Compare commits

...

2 Commits

Author SHA1 Message Date
serversdwn
154a11d057 Add s3_analyzer.py for live protocol analysis of Instantel MiniMate Plus RS-232
- Implement functionality to read and parse raw_s3.bin and raw_bw.bin files.
- Define protocol constants and mappings for various command and response identifiers.
- Create data structures for frames, sessions, and diffs to facilitate analysis.
- Develop functions for annotating frames, splitting sessions, and generating reports.
- Include live mode for continuous monitoring and reporting of protocol frames.
- Add command-line interface for user interaction and configuration.
2026-03-10 05:00:55 -04:00
serversdwn
faa869d03b doc: protocol ref updated to v0.20 2026-03-09 19:02:53 -04:00
5 changed files with 2383 additions and 136 deletions

.gitignore — 24 additions

@@ -1 +1,25 @@
/bridges/captures/
# Python bytecode
__pycache__/
*.py[cod]
# Virtual environments
.venv/
venv/
env/
# Editor / OS
.vscode/
*.swp
.DS_Store
Thumbs.db
# Analyzer outputs
*.report
claude_export_*.md
# Frame database
*.db
*.db-wal
*.db-shm


@@ -40,14 +40,16 @@
| 2026-03-02 | §14 Open Questions | `0x082A` hypothesis refined: 2090 decimal. At 1024 sps, 2 sec record = 2048 samples. Possible that 0x082A = total samples including 0.25s pre-trigger (256 samples) at some adjusted rate. Needs capture with different record time. |
| 2026-03-02 | §14 Open Questions | **NEW items added:** Trigger sample width (default=2), Auto Window (1-9 sec), Aux Trigger (enabled/disabled) — all confirmed settings from operator manual not yet mapped in protocol. |
| 2026-03-02 | §14 Open Questions | Monitoring LCD Cycle resolved — removed from open questions. |
| 2026-03-03 | §2 Frame Structure | **UPDATED:** Documented BW/S3 framing asymmetry. BW uses bare STX (`0x02`); S3 uses DLE+STX (`0x10 0x02`). ETX initially believed symmetric — see correction below. |
| 2026-03-03 | §2 Frame Structure | **CORRECTED:** ETX is also asymmetric. BW uses bare ETX (`0x03`); S3 uses DLE+ETX (`0x10 0x03`). Confirmed via checksum validation: 91/98 BW frames pass with bare `0x03` as terminator. All `10 03` sequences in `raw_bw.bin` are in-payload data, never followed by `41 02` (next frame start). Full confirmed grammar: BW=`02`...`03`, S3=`10 02`...`10 03`. Both sides stuff literal `0x10` as `10 10`. This is the formally confirmed link-layer grammar. |
| 2026-03-03 | Appendix A | **CORRECTED:** Previous entry stated logger strips DLE from ETX. This was wrong — applied to older logger only. `s3_bridge v0.5.0+` is lossless. Section rewritten to reflect current flat raw dump format. |
| 2026-03-02 | Appendix A | **CORRECTED:** Previous entry stated logger strips DLE from ETX. This was wrong — it applied to an older logger version. `s3_bridge v0.5.0` confirmed to preserve raw wire bytes including `0x10 0x03` intact. HxD inspection of new capture confirmed `10 03` present in S3→BW record payloads. |
| 2026-03-02 | Appendix A | **UPDATED:** New capture architecture: two flat raw wire dumps per session (`raw_s3.bin`, `raw_bw.bin`), one per direction, no record wrapper. Replaces structured `.bin` format for parser input. |
| 2026-03-02 | Appendix A | **PARSER:** Deterministic DLE state machine implemented (`s3_parser.py`). Three states: `IDLE → IN_FRAME → AFTER_DLE`. Replaces heuristic global scanning. Properly handles DLE stuffing (`10 10` → literal `10`). Only complete STX→ETX pairs counted as frames. |
| 2026-03-02 | Appendix A | **VALIDATED:** `raw_bw.bin` yields 7 complete frames via state machine. `raw_s3.bin` contains large structured responses (first frame payload ~3922 bytes). Both files confirmed lossless. BW bare `0x02` pattern confirmed as asymmetric framing (BW sends bare STX, S3 sends DLE+STX). |
| 2026-03-03 | §2, §10, Appendix C | **MILESTONE — Link-layer grammar formally confirmed.** BW start marker is `41 02` (ACK+STX as a unit — bare `02` alone is not sufficient). BW frame boundary is structural sequence `03 41 02`. ETX lookahead: bare `03` only accepted as ETX when followed by `41 02` or at EOF. Checksum confirmed split: small frames use SUM8, large frames use unknown algorithm. Checksum is semantic, not a framing primitive. `s3_parser.py` v0.2.2 implements dual `--mode s3/bw`. |
| 2026-03-09 | §7.6, Appendix B | **CONFIRMED:** Record time located in SUB E5 data page2 at payload offset `+0x28` as **float32 BE**. Confirmed via two controlled captures: 7 sec = `40 E0 00 00`, 13 sec = `41 50 00 00`. Geo range (only 1.25 or 10.0 in/s) eliminates ambiguity — 7 and 13 are not valid geo range values. |
| 2026-03-09 | §7.5, §14 | **CORRECTED:** The byte `0x0A` appearing after the "Extended Notes" null-padded label in the E5 payload is **NOT** record time. It is an unknown field that equals 10 and does not change when record time changes. False lead closed. |
| 2026-03-09 | §14 | **RESOLVED:** `0x082A` mystery closed — confirmed as fixed-size E5 payload length (2090 bytes), not a record-time-derived sample count. Value is constant regardless of record time or other settings. |
| 2026-03-09 | §7.8, §14, Appendix B | **NEW — Trigger Sample Width confirmed:** Located in BW→S3 write frame SUB `0x82`, destuffed payload offset `[22]`, uint8. Confirmed via BW-side capture (`raw_bw.bin`) diffing two sessions: Width=4 → `0x04`, Width=3 → `0x03`. Setting is **transmitted only on BW→S3 write** (SUB `0x82`), invisible in S3-side compliance dumps. |
| 2026-03-09 | §14, Appendix B | **CONFIRMED — Mode gating is a real protocol behavior:** Several settings are only transmitted (and possibly only interpreted by the device) when the required mode is active. Trigger Sample Width is only sent when in Compliance/Single-Shot/Fixed Record Time mode. Auto Window is only relevant when Record Stop Mode = Auto — attempting to capture it in Fixed mode produced no change on the wire (F7 and D1 blocks identical before/after). This is an architectural property, not a gap in the capture methodology. Future capture attempts for mode-gated settings must first activate the appropriate mode. |
| 2026-03-09 | §14 | **UPDATED — Auto Window:** Capture attempted (Auto Window 3→9) in Fixed record time mode. No change observed in any S3-side frame (F7, D1, E5 all identical). Confirmed mode-gated behind Record Stop Mode = Auto. Not capturable without switching modes — deferred. |
---
@@ -66,77 +68,38 @@
## 2. Frame Structure
> ⚠️ **2026-02-26 — CORRECTED:** Previous version incorrectly identified `0x41` as STX and `0x02`/`0x03` as bare frame delimiters. The protocol uses proper **DLE framing**. See below.
> ⚠️ **2026-03-03 — UPDATED:** Frame start AND end are both asymmetric by direction. See confirmed grammar below.
### Confirmed Link-Layer Grammar ✅ CONFIRMED — 2026-03-03
The two sides of the connection use **fully asymmetric framing**. DLE stuffing applies on both sides.
| Direction | Start marker | End marker | Stuffing | Notes |
|---|---|---|---|---|
| S3 → BW (device) | `10 02` (DLE+STX) | `10 03` (DLE+ETX) | `10` → `10 10` | Full DLE framing |
| BW → S3 (Blastware) | `41 02` (ACK+STX) | `03` (bare ETX) | `10` → `10 10` | ACK is part of start marker |
**BW start marker:** `41 02` is treated as a single two-byte start signature. Bare `02` alone is **not** sufficient to start a BW frame — it must be preceded by `41` (ACK). This prevents false triggering on `02` bytes inside payload data.
**BW ETX rule:** Bare `03` is accepted as frame end **only** when followed by `41 02` (next frame's ACK+STX) or at EOF. The structural boundary pattern is:
```
... [payload] 03 41 02 [next payload] ...
```
In-payload `03` bytes are preserved as data when not followed by `41 02`.
**Evidence:**
- 98/98 BW frames extracted from `raw_bw.bin` using `41 02` start + `03 41 02` structural boundary
- 91/98 small BW frames validate SUM8 checksum; 7 large config/write frames do not match any known checksum algorithm
- All `10 03` sequences in `raw_bw.bin` confirmed as in-payload data (none followed by `41 02`)
- `s3_parser.py v0.2.2` implements both modes; BW ETX lookahead confirmed working
**Checksum is NOT a framing primitive:**
- Small frames (e.g. keepalive SUB `5B`): SUM8 validates consistently
- Large frames (e.g. SUB `71` config writes): checksum algorithm unknown — does not match SUM8, CRC-16/IBM, CRC-16/CCITT-FALSE, or CRC-16/X25
- Frame boundaries are determined structurally; checksum validation is a semantic-layer concern only
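The SUM8 split described above can be sketched as a small validator. This is an illustrative sketch only: it assumes the checksum is the final byte of the de-stuffed frame body (per §2), and it is meaningful only for the small frames known to use SUM8, since the large-frame algorithm is still unidentified.

```python
def sum8(payload: bytes) -> int:
    """8-bit additive checksum: sum of de-stuffed payload bytes, modulo 256."""
    return sum(payload) & 0xFF

def frame_checksum_ok(destuffed_body: bytes) -> bool:
    """Treat the last de-stuffed byte as the checksum and validate the rest.

    Sketch under the small-frame (SUM8) assumption. Large BW config/write
    frames use an unknown algorithm and will legitimately fail this check.
    """
    if len(destuffed_body) < 2:
        return False
    payload, check = destuffed_body[:-1], destuffed_body[-1]
    return sum8(payload) == check
```

Because checksum validation is semantic rather than structural, a parser should report a failed check but still emit the frame.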
### Frame Structure by Direction
**S3 → BW (device responses):**
```
[ACK]   [DLE+STX]   [PAYLOAD...]   [CHECKSUM]   [DLE+ETX]
0x41    0x10 0x02   N bytes        1 byte       0x10 0x03
```
**BW → S3 (Blastware commands):**
```
[ACK] [STX] [PAYLOAD...] [CHECKSUM] [ETX]
0x41 0x02 N bytes 1 byte 0x03
```
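The two layouts above can be summarized as a transmit-side frame builder. This is a sketch, not a verified encoder: it assumes the SUM8 checksum (known valid only for small frames, see §11) and assumes a checksum byte of `0x10` is itself DLE-stuffed, which has not been confirmed on the wire.

```python
ACK, DLE, STX, ETX = 0x41, 0x10, 0x02, 0x03

def build_frame(payload: bytes, s3_side: bool) -> bytes:
    """Assemble a frame per the direction-specific layouts above.

    Assumptions (not confirmed by capture): SUM8 checksum applies, and the
    checksum byte participates in DLE stuffing like any payload byte.
    """
    body = payload + bytes([sum(payload) & 0xFF])   # append SUM8 checksum
    body = body.replace(b'\x10', b'\x10\x10')       # DLE stuffing (both sides)
    if s3_side:
        # Device side: DLE+STX ... DLE+ETX
        return bytes([ACK, DLE, STX]) + body + bytes([DLE, ETX])
    # Blastware side: bare STX ... bare ETX
    return bytes([ACK, STX]) + body + bytes([ETX])
```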
### Special Byte Definitions
| Token | Raw Bytes | Meaning | Certainty |
|---|---|---|---|
| ACK | `0x41` (ASCII `'A'`) | Acknowledgment / ready token. Standalone single byte. Sent before every frame by both sides. | ✅ CONFIRMED |
| DLE | `0x10` | Data Link Escape. Used for stuffing on both sides; also prefixes STX/ETX on S3 side only. | ✅ CONFIRMED — 2026-02-26 |
| STX (S3) | `0x10 0x02` | DLE+STX = Start of frame sent by device | ✅ CONFIRMED — 2026-02-26 |
| STX (BW) | `0x02` | Bare STX = Start of frame sent by Blastware | ✅ CONFIRMED — 2026-03-03 |
| ETX (S3) | `0x10 0x03` | DLE+ETX = End of frame sent by device | ✅ CONFIRMED — 2026-02-26 |
| ETX (BW) | `0x03` | Bare ETX = End of frame sent by Blastware | ✅ CONFIRMED — 2026-03-03 |
| CHECKSUM | 1 byte | 8-bit sum of de-stuffed payload bytes, modulo 256. Sits between payload and ETX. | ✅ CONFIRMED |
### DLE Byte Stuffing Rule
> ✅ CONFIRMED — 2026-02-26
Any `0x10` byte appearing **naturally in the payload data** is escaped by doubling it: `0x10` → `0x10 0x10`. This applies on **both sides** of the connection.
- **Transmit:** Replace every `0x10` in payload with `0x10 0x10`
- **Receive:** Replace every `0x10 0x10` in the frame body with a single `0x10`
| Sequence on wire | S3 context | BW context |
|---|---|---|
| `0x10 0x02` | Frame START | Stuffed `0x10` + payload `0x02` |
| `0x10 0x03` | Frame END | Stuffed `0x10` + payload `0x03` |
| `0x10 0x10` | Escaped literal `0x10` | Escaped literal `0x10` |
| `0x02` | Payload byte | Frame START |
| `0x03` | Payload byte | Frame END |
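The transmit/receive rules above can be sketched in two lines. Note the caveat: plain byte-string replacement is only safe on a frame body that a state machine has already isolated between the start and end markers, because `replace` alone cannot distinguish control sequences from stuffed data mid-stream.

```python
def dle_stuff(payload: bytes) -> bytes:
    # Transmit direction: double every literal 0x10 in the payload.
    return payload.replace(b'\x10', b'\x10\x10')

def dle_unstuff(frame_body: bytes) -> bytes:
    # Receive direction: collapse each 10 10 pair back to a single 0x10.
    # Only valid on a body already extracted by the framing state machine.
    return frame_body.replace(b'\x10\x10', b'\x10')
```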
### Frame Parser Notes
@@ -498,7 +461,23 @@ The SUB `1A` read response (`E5`) and SUB `71` write block contain per-channel t
Values are stored natively in **imperial units (in/s)** — unit strings `"in."` and `"/s"` embedded inline confirm this regardless of display locale.
### 7.6.1 Record Time (SUB E5 data page2 `+0x28`)
> ✅ **CONFIRMED — 2026-03-09** from two controlled captures (record time 7→13 sec, raw_s3-3-9-26_2.bin and raw_s3-3-9-26_3.bin).
Record time is stored as a **32-bit IEEE 754 float, big-endian** at offset `+0x28` from the start of the E5 data page2 payload.
| Record Time | float32 BE bytes | Decoded |
|---|---|---|
| 7 seconds | `40 E0 00 00` | 7.0 |
| 10 seconds | `41 20 00 00` | 10.0 |
| 13 seconds | `41 50 00 00` | 13.0 |
**Disambiguation note:** Max geo range (also a float in this block) only takes values 1.25 or 10.0 in/s. The values 7 and 13 are not valid geo range selections, confirming this field is record time.
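The confirmed byte patterns decode directly with big-endian float unpacking; a minimal sketch:

```python
import struct

def read_record_time(e5_page2: bytes) -> float:
    """Decode record time: float32 big-endian at offset +0x28 of the E5
    data page2 payload."""
    return struct.unpack_from('>f', e5_page2, 0x28)[0]

# The three confirmed captures decode as expected:
assert struct.unpack('>f', bytes.fromhex('40E00000'))[0] == 7.0
assert struct.unpack('>f', bytes.fromhex('41200000'))[0] == 10.0
assert struct.unpack('>f', bytes.fromhex('41500000'))[0] == 13.0
```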
**`0x0A` after "Extended Notes" label:** The byte `0x0A` that appears after the null-padded "Extended Notes" string in the E5 payload is **not** record time. It is an unknown field that equals 10 and is invariant across record time changes. Do not confuse it with the record time float at `+0x28`.
> ✅ **`0x082A` (= 2090) — RESOLVED:** This value is the fixed payload length of the E5 response block (2090 bytes). It is constant regardless of record time, trigger level, or any other setting. It appears in the E5 frame header as the declared data length for the paged read, not as a settings field.
---
@@ -561,6 +540,71 @@ offset size type value (Tran example) meaning
---
### 7.8 Trigger / Advanced Config Write Frame (BW→S3 SUB `0x82`)
> ✅ **CONFIRMED — 2026-03-09** from controlled BW-side capture diff (Trigger Sample Width 4→3).
SUB `0x82` is the BW→S3 write command for the advanced trigger configuration block. It is the write counterpart to the S3→BW read response SUB `0xD1` (`0xFF − 0x82` = `0x7D` is a separate sub; the D1/2E read pair is distinct). The `0x82` write frame is only visible in `raw_bw.bin` — it does not appear in S3-side compliance dumps.
**Destuffed BW write frame layout (47 raw bytes → 46 destuffed):**
```
offset value meaning
[00] 0x10 addr (literal 0x10 after destuffing)
[01] 0x00 unknown
[02] 0x82 SUB: advanced config write
[03] 0x00 unknown
[04] 0x00 unknown
[05] 0x1C length = 28 bytes (payload size)
[06..10] 00.. header/padding
[11..16] 00.. header/padding
[17] 0x1A unknown (constant 26 = 0x1A)
[18] 0xD5 unknown (constant)
[19] 0x00 unknown
[20] 0x00 unknown
[21] 0x10 literal 0x10 (stuffed in raw frame as 10 10)
[22] 0x04/0x03 Trigger Sample Width ✅ CONFIRMED (uint8, samples)
[23] 0x0A unknown (constant 10; NOT Auto Window)
[24..43] 0xFF.. padding
[44] 0x00 unknown
[45] checksum
```
**Confirmed Trigger Sample Width values:**
| Width setting | Byte [22] |
|---|---|
| 4 samples | `0x04` |
| 3 samples | `0x03` |
| 2 samples (default) | `0x02` (expected — not yet captured) |
**Known constants in this frame:** `[17]=0x1A`, `[18]=0xD5`, `[23]=0x0A`. These do not change with Trigger Sample Width changes. Byte `[23]` = 10 was initially a candidate for Auto Window (range 1–9) but cannot be Auto Window because 10 is outside the valid range.
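Extracting the confirmed field from a destuffed frame is a one-line index per the layout above; a sketch with a hypothetical helper name (`trigger_sample_width` is not part of `s3_analyzer.py`):

```python
def trigger_sample_width(destuffed_frame: bytes):
    """Return Trigger Sample Width from a destuffed BW->S3 SUB 0x82 write
    frame, or None if the frame does not match the 46-byte layout above."""
    if len(destuffed_frame) != 46 or destuffed_frame[2] != 0x82:
        return None
    return destuffed_frame[22]  # uint8, samples — confirmed offset
```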
**Mode gating:** This write frame is only transmitted when Blastware performs a Send To Unit operation in Compliance / Single-Shot / Fixed Record Time mode. The frame is absent from other session types.
---
### 7.9 Mode Gating — Protocol Architecture Note
> ✅ **CONFIRMED — 2026-03-09** from controlled captures and null-change experiments.
Several settings are **mode-gated**: the device only transmits (reads) or accepts (writes) certain fields when the appropriate operating mode is active. This is an architectural property of the protocol, not a gap in capture methodology.
**Observed mode gating:**
| Setting | Gate Condition | Evidence |
|---|---|---|
| Trigger Sample Width | Compliance / Single-Shot / Fixed Record Time mode | Not visible in S3-side reads; only in BW write frame (SUB `0x82`) when mode is active |
| Auto Window | Record Stop Mode = Auto | Capture of 3→9 change in Fixed mode produced zero wire change in all frames (F7, D1, E5 all identical) |
**Implication for captures:** To map a mode-gated setting, you must first activate the gating mode on the device, then perform the compliance dump or write capture. Changing the setting value while in the wrong mode will produce no observable wire change.
**Suspected mode-gated settings not yet captured:**
- Auto Window (requires Record Stop Mode = Auto)
- Auxiliary Trigger (unknown gate condition)
---
### 7.5 Full Waveform Record (SUB F3) — 0xD2 bytes × 2 pages
Peak values as IEEE 754 big-endian floats (restored section header):
@@ -638,17 +682,14 @@ Timestamps are 6-byte sequences appearing in event headers and waveform keys.
## 10. DLE Byte Stuffing
> ✅ **CONFIRMED — 2026-02-26** (previously ❓ SPECULATIVE)
This protocol uses standard **DLE (Data Link Escape) byte stuffing**, a classical technique used in protocols like IBM BISYNC dating to the 1970s. Both sides stuff literal `0x10` bytes as `0x10 0x10`. The framing delimiters differ by direction — see §2.
### Parser State Machine — S3→BW direction (device responses)
Trigger on DLE+STX, terminate on DLE+ETX.
```
IDLE:
receive 0x41 → emit ACK event, stay IDLE
receive 0x10 → goto WAIT_STX
receive anything → discard, stay IDLE
WAIT_STX:
receive 0x02 → frame started, goto IN_FRAME
@@ -661,32 +702,10 @@ IN_FRAME:
ESCAPE:
receive 0x03 → frame complete — validate checksum, process buffer, goto IDLE
receive 0x10 → append single 0x10 to buffer, goto IN_FRAME (stuffed literal)
receive anything → append DLE + byte to buffer (recovery), goto IN_FRAME
```
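The S3-side pseudocode maps directly onto a small Python function. A sketch, not the actual `s3_parser.py` implementation (checksum validation is omitted here since it is a semantic-layer concern):

```python
def parse_s3_frames(data: bytes):
    """Extract de-stuffed S3->BW frame bodies (payload + checksum byte)
    using the IDLE/WAIT_STX/IN_FRAME/ESCAPE machine above."""
    IDLE, WAIT_STX, IN_FRAME, ESCAPE = range(4)
    state, buf, frames = IDLE, bytearray(), []
    for b in data:
        if state == IDLE:
            if b == 0x10:
                state = WAIT_STX            # possible DLE+STX
            # 0x41 (ACK) and any other byte: discarded at top level
        elif state == WAIT_STX:
            if b == 0x02:
                buf = bytearray()           # frame started
                state = IN_FRAME
            else:
                state = IDLE
        elif state == IN_FRAME:
            if b == 0x10:
                state = ESCAPE              # DLE: qualifier decides
            else:
                buf.append(b)
        else:                               # ESCAPE
            if b == 0x03:
                frames.append(bytes(buf))   # DLE+ETX: frame complete
                state = IDLE
            elif b == 0x10:
                buf.append(0x10)            # stuffed literal 0x10
                state = IN_FRAME
            else:
                buf.extend((0x10, b))       # recovery: keep DLE + byte
                state = IN_FRAME
    return frames
```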
### Parser State Machine — BW→S3 direction (Blastware commands)
Trigger on `41 02` (ACK+STX as a unit). ETX accepted only when followed by `41 02` or at EOF.
```
IDLE:
receive 0x41 + next==0x02 → frame started (consume both), goto IN_FRAME
receive anything → discard, stay IDLE
IN_FRAME:
receive 0x10 → goto ESCAPE
receive 0x03 + lookahead==0x41 0x02, or EOF
→ frame complete — validate checksum, process buffer, goto IDLE
receive 0x03 (no lookahead) → append to buffer (in-payload 03), stay IN_FRAME
receive any byte → append to buffer, stay IN_FRAME
ESCAPE:
receive 0x10 → append single 0x10 to buffer, goto IN_FRAME (stuffed literal)
receive anything → append DLE + byte to buffer (recovery), goto IN_FRAME
```
**Architectural note:** Checksum validation is optional and informational only. Frame boundaries are determined structurally via the `03 41 02` sequence — never by checksum gating.
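The BW-side rules, including the `03 41 02` structural boundary and EOF acceptance, can be sketched as a lookahead scanner over a whole capture file. A sketch under the grammar above, not the `s3_parser.py` implementation:

```python
def parse_bw_frames(data: bytes):
    """Extract de-stuffed BW->S3 frame bodies: start on the 41 02 (ACK+STX)
    signature, end on bare 03 only when followed by 41 02 or at EOF.
    In-payload 03 bytes are preserved; 10 10 collapses to a literal 10."""
    frames, i, n = [], 0, len(data)
    while i < n - 1:
        if data[i] == 0x41 and data[i + 1] == 0x02:      # ACK+STX start
            buf, j = bytearray(), i + 2
            while j < n:
                b = data[j]
                if b == 0x10 and j + 1 < n and data[j + 1] == 0x10:
                    buf.append(0x10)                     # stuffed literal
                    j += 2
                elif b == 0x03 and (j == n - 1 or data[j + 1:j + 3] == b'\x41\x02'):
                    frames.append(bytes(buf))            # structural ETX
                    j += 1
                    break
                else:
                    buf.append(b)                        # payload (incl. bare 03)
                    j += 1
            i = j
        else:
            i += 1                                       # scan for next signature
    return frames
```

Note how the in-payload `03` in the first test frame survives because it is not followed by `41 02`.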
---
## 11. Checksum Reference Implementation
@@ -825,25 +844,23 @@ Build in this order — each step is independently testable:
--- ---
## Appendix A — s3_bridge Capture Format
> ⚠️ **This section describes tooling behavior, not protocol semantics.**
> **2026-03-03 — CORRECTED:** Previous version of this section incorrectly stated that `s3_bridge` strips DLE from ETX. This applied to an older logger version only. `s3_bridge v0.5.0+` is confirmed lossless. See Appendix C for full validation details.
### Current Format (v0.5.0+) ✅ CONFIRMED — 2026-03-03
As of `s3_bridge v0.5.0`, captures are produced as **two flat raw wire dump files per session**:
| File | Contents |
|---|---|
| `raw_s3.bin` | All bytes transmitted by S3 (device → Blastware), in order |
| `raw_bw.bin` | All bytes transmitted by BW (Blastware → device), in order |
Every byte on the wire is written verbatim — no modification, no record headers, no timestamps. `0x10 0x03` (DLE+ETX) is preserved intact.
**Practical impact for parsing:**
- `raw_s3.bin`: trigger on `10 02`, terminate on `10 03` (state-machine-aware)
- `raw_bw.bin`: trigger on `41 02` (ACK+STX as a unit), terminate on `03` only when followed by `41 02` or at EOF
- Both: handle `10 10` → literal `10` unstuffing
- Use `s3_parser.py --mode s3` and `--mode bw` respectively
---
@@ -851,7 +868,6 @@ Every byte on the wire is written verbatim — no modification, no record header
| Question | Priority | Added | Notes |
|---|---|---|---|
| **Large BW frame checksum algorithm** — Small frames (SUB `5B` keepalive etc.) validate with SUM8. Large config/write frames (SUB `71`, `68`, `69` etc.) do not match SUM8, CRC-16/IBM, CRC-16/CCITT-FALSE, or CRC-16/X25 in either endianness. Unknown whether it covers full payload or excludes header bytes, or whether different SUB types use different algorithms. | MEDIUM | 2026-03-03 | NEW |
| Byte at timestamp offset 3 — hours, minutes, or padding? | MEDIUM | 2026-02-26 | |
| `trail[0]` in serial number response — unit-specific byte, derivation unknown. `trail[1]` resolved as firmware minor version. | MEDIUM | 2026-02-26 | |
| Full channel ID mapping in SUB `5A` stream (01/02/03/04 → which sensor?) | MEDIUM | 2026-02-26 | |
@@ -859,13 +875,14 @@ Every byte on the wire is written verbatim — no modification, no record header
| Purpose of SUB `09` / response `F6` — 202-byte read block | MEDIUM | 2026-02-26 | |
| Purpose of SUB `2E` / response `D1` — 26-byte read block | MEDIUM | 2026-02-26 | |
| Full field mapping of SUB `1A` / response `E5` — channel scaling / compliance config block | MEDIUM | 2026-02-26 | |
| `0x082A` in channel config block — not trigger, alarm, or record time directly. **RESOLVED: fixed E5 payload length (2090 bytes).** Constant regardless of all settings. | RESOLVED | 2026-03-01 | Resolved 2026-03-09 |
| **Record time in wire protocol** — float32 BE at E5 data page2 `+0x28`. **RESOLVED.** See §7.6.1. | RESOLVED | 2026-03-09 | Confirmed via 7→13 sec captures |
| Unknown uint16 fields at channel block +0A (=80), +0C (=15), +0E (=40), +10 (=21) — manual describes "Sensitive (Gain=8) / Normal (Gain=1)" per-channel range; 80/15/40/21 might encode gain, sensitivity, or ADC config. | LOW | 2026-03-01 | |
| Full trigger configuration field mapping (SUB `1C` / write `82`) | LOW | 2026-02-26 | |
| Whether SUB `24`/`25` are distinct from SUB `5A` or redundant | LOW | 2026-02-26 | |
| Meaning of `0x07 E7` field in config block | LOW | 2026-02-26 | |
| **Trigger Sample Width** — **RESOLVED:** BW→S3 write frame SUB `0x82`, destuffed payload offset `[22]`, uint8. Width=4 → `0x04`, Width=3 → `0x03`. Confirmed via BW-side capture diff. Only visible in `raw_bw.bin` write traffic, not in S3-side compliance reads. | RESOLVED | 2026-03-02 | Confirmed 2026-03-09 |
| **Auto Window** — "1 to 9 seconds" per manual (§3.13.1b). **Mode-gated:** only transmitted/active when Record Stop Mode = Auto. Capture attempted in Fixed mode (3→9 change) — no wire change observed in any frame. Deferred pending mode switch. | LOW | 2026-03-02 | Updated 2026-03-09 |
| **Auxiliary Trigger** — Enabled/Disabled per manual (§3.13.1d). Location in protocol not yet mapped. | LOW | 2026-03-02 | NEW |
| **Max Geo Range float 6.2061 in/s** — NOT a user-selectable range (manual only shows 1.25 and 10.0 in/s). Likely internal ADC full-scale constant or hardware range ceiling. Not worth capturing. | LOW | 2026-02-26 | Downgraded 2026-03-02 |
| MicL channel units — **RESOLVED: psi**, confirmed from `.set` file unit string `"psi\0"` | RESOLVED | 2026-03-01 | |
@@ -890,13 +907,13 @@ Every byte on the wire is written verbatim — no modification, no record header
| Alarm Level (Geo) | §3.9.9 | Channel block, float | float32 BE | higher than trigger level |
| Trigger Level (Mic) | §3.8.6 | Channel block, float | float32 BE | 100–148 dB in 1 dB steps |
| Alarm Level (Mic) | §3.9.10 | Channel block, float | float32 BE | higher than mic trigger |
| Record Time | §3.8.9 | E5 data page2 `+0x28` (wire); `.set` +16 (file) | float32 BE (wire); uint32 LE (file) | 1–105 seconds (menu label `<105`); confirmed 7→`40E00000`, 10→`41200000`, 13→`41500000` |
| Max Geo Range | §3.8.4 | Channel block, float | float32 BE | 1.25 or 10.0 in/s (user); 6.2061 in protocol = internal constant |
| Microphone Units | §3.9.7 | Inline unit string | char[4] | `"psi\0"`, `"pa.\0"`, `"dB\0\0"` |
| Sample Rate | §3.8.2 | Unknown — needs capture | — | 1024, 2048, 4096 (compliance); up to 65536 (advanced) |
| Record Mode | §3.8.1 | Unknown | — | Single Shot, Continuous, Manual, Histogram, Histogram Combo |
| Trigger Sample Width | §3.13.1h | BW→S3 SUB `0x82` write frame, destuffed `[22]` | uint8 | Default=2; confirmed 4=`0x04`, 3=`0x03`. **BW-side write only** — not visible in S3 compliance reads. Mode-gated: only sent in Compliance/Single-Shot/Fixed mode. |
| Auto Window | §3.13.1b | **Mode-gated — NOT YET MAPPED** | uint8? | 1–9 seconds; only active when Record Stop Mode = Auto. Capture in Fixed mode produced no wire change. |
| Auxiliary Trigger | §3.13.1d | **NOT YET MAPPED** | bool | Enabled/Disabled | | Auxiliary Trigger | §3.13.1d | **NOT YET MAPPED** | bool | Enabled/Disabled |
| Password | §3.13.1c | Unknown | — | 4-key sequence | | Password | §3.13.1c | Unknown | — | 4-key sequence |
| Serial Connection | §3.9.11 | Unknown | — | Direct / Via Modem | | Serial Connection | §3.9.11 | Unknown | — | Direct / Via Modem |
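The three confirmed Record Time wire values in the table can be round-tripped directly with `struct`; a minimal sketch (helper names are illustrative, not part of the analyzer):

```python
import struct

def record_time_to_wire(seconds: float) -> str:
    """Encode a record time as the 4-byte big-endian float32 seen at E5 page2 +0x28."""
    return struct.pack(">f", seconds).hex().upper()

def wire_to_record_time(hex_bytes: str) -> float:
    """Decode the 4 wire bytes back to seconds."""
    return struct.unpack(">f", bytes.fromhex(hex_bytes))[0]
```

This reproduces the confirmed pairs: 7→`40E00000`, 10→`41200000`, 13→`41500000`.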
@@ -917,9 +934,9 @@ Every byte on the wire is written verbatim — no modification, no record header
The earlier stripping behavior applied to a previous logger version. v0.5.0 is confirmed lossless with respect to wire bytes. The earlier stripping behavior applied to a previous logger version. v0.5.0 is confirmed lossless with respect to wire bytes.
**Confirmed wire framing:** **Confirmed wire framing:**
- S3→BW: frame start `0x10 0x02`, frame end `0x10 0x03` - Frame start: `0x10 0x02` (DLE STX)
- BW→S3: frame start `0x02`, frame end `0x03` - Frame end: `0x10 0x03` (DLE ETX)
- Both sides: DLE stuffing `0x10 0x10` = literal `0x10` - DLE stuffing: `0x10 0x10` in payload = literal `0x10`
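The framing rules above can be sketched as a one-line encoder (an illustrative helper, assuming the S3→BW DLE-framed direction):

```python
DLE, STX, ETX = 0x10, 0x02, 0x03

def frame_payload(payload: bytes) -> bytes:
    """Wrap a payload as DLE STX ... DLE ETX, doubling any 0x10 bytes (DLE stuffing)."""
    stuffed = payload.replace(b"\x10", b"\x10\x10")  # literal 0x10 -> 0x10 0x10
    return bytes([DLE, STX]) + stuffed + bytes([DLE, ETX])
```

For example, payload `01 10 02` goes on the wire as `10 02 01 10 10 02 10 03`.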
### C.2 Capture Architecture (Current) ### C.2 Capture Architecture (Current)
@@ -932,7 +949,7 @@ As of 2026-03-02 the capture pipeline produces two flat raw wire dump files per
No record headers, no timestamps, no framing logic applied by the dumper. Files are flat concatenations of `serial.read()` chunks. Frame boundaries must be recovered by the parser. No record headers, no timestamps, no framing logic applied by the dumper. Files are flat concatenations of `serial.read()` chunks. Frame boundaries must be recovered by the parser.
### C.3 Parser Design — Dual-Mode State Machine (`s3_parser.py v0.2.2`) ### C.3 Parser Design — DLE State Machine
A deterministic state machine replaces all prior heuristic scanning. A deterministic state machine replaces all prior heuristic scanning.
@@ -946,8 +963,6 @@ STATE_AFTER_DLE — last byte was 0x10, awaiting qualifier
**Transitions:** **Transitions:**
**S3→BW parser states:**
| Current State | Byte | Action | Next State | | Current State | Byte | Action | Next State |
|---|---|---|---| |---|---|---|---|
| IDLE | `10 02` | Begin new frame | IN_FRAME | | IDLE | `10 02` | Begin new frame | IN_FRAME |
@@ -956,20 +971,7 @@ STATE_AFTER_DLE — last byte was 0x10, awaiting qualifier
| IN_FRAME | `10` | — | AFTER_DLE | | IN_FRAME | `10` | — | AFTER_DLE |
| AFTER_DLE | `10` | Append literal `0x10` | IN_FRAME | | AFTER_DLE | `10` | Append literal `0x10` | IN_FRAME |
| AFTER_DLE | `03` | Frame complete, emit | IDLE | | AFTER_DLE | `03` | Frame complete, emit | IDLE |
| AFTER_DLE | other | Append DLE + byte (recovery) | IN_FRAME | | AFTER_DLE | other | Treat as payload (recovery) | IN_FRAME |
**BW→S3 parser states:**
| Current State | Condition | Action | Next State |
|---|---|---|---|
| IDLE | byte==`41` AND next==`02` | Begin new frame (consume both) | IN_FRAME |
| IDLE | any | Discard | IDLE |
| IN_FRAME | byte==`03` AND (next two==`41 02` OR at EOF) | Frame complete, emit | IDLE |
| IN_FRAME | byte==`03` (no lookahead match) | Append `03` to payload | IN_FRAME |
| IN_FRAME | byte==`10` | — | AFTER_DLE |
| IN_FRAME | other | Append to payload | IN_FRAME |
| AFTER_DLE | byte==`10` | Append literal `10` | IN_FRAME |
| AFTER_DLE | other | Append DLE + byte (recovery) | IN_FRAME |
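The C.3 transition table can be implemented as a small generator; a minimal sketch, using one reasonable reading of the recovery row (DLE and the following byte are both kept as payload):

```python
from enum import Enum, auto

class St(Enum):
    IDLE = auto()
    IN_FRAME = auto()
    AFTER_DLE = auto()

def parse_frames(data: bytes):
    """Walk a flat wire dump per the DLE state machine; yield destuffed payloads."""
    state, buf = St.IDLE, bytearray()
    i = 0
    while i < len(data):
        b = data[i]
        if state is St.IDLE:
            # Begin a frame only on the full 10 02 signature; discard noise.
            if b == 0x10 and i + 1 < len(data) and data[i + 1] == 0x02:
                state, buf = St.IN_FRAME, bytearray()
                i += 2
                continue
        elif state is St.IN_FRAME:
            if b == 0x10:
                state = St.AFTER_DLE
            else:
                buf.append(b)
        else:  # AFTER_DLE
            if b == 0x10:
                buf.append(0x10)              # stuffed literal DLE
                state = St.IN_FRAME
            elif b == 0x03:
                yield bytes(buf)              # frame complete
                state = St.IDLE
            else:
                buf.extend((0x10, b))         # recovery: keep DLE + byte
                state = St.IN_FRAME
        i += 1
```

EOF while in `IN_FRAME`/`AFTER_DLE` simply drops the incomplete trailing frame, matching lesson 3 in §C.5.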
**Properties:** **Properties:**
- Does not scan globally for `10 02` - Does not scan globally for `10 02`
@@ -980,10 +982,9 @@ STATE_AFTER_DLE — last byte was 0x10, awaiting qualifier
### C.4 Observed Traffic (Validation Captures) ### C.4 Observed Traffic (Validation Captures)
**`raw_bw.bin`** (Blastware → S3): **`raw_bw.bin`** (Blastware → S3):
- 98 complete frames via `41 02` start + `03 41 02` structural boundary detection - 7 complete frames via state machine
- 91/98 small frames validate SUM8 checksum; 7 large config/write frames fail all known checksum algorithms - Mostly small command/control frames, several zero-length payloads
- `41 02` confirmed as two-byte start signature; bare `02` alone is insufficient - Bare `0x02` used as STX (asymmetric — BW does not use DLE STX)
- Bare `03` ETX confirmed; in-payload `03` bytes correctly preserved via lookahead rule
- Contains project metadata strings: `"Standard Recording Setup.set"`, `"Claude test2"`, `"Location #1 - Brians House"` - Contains project metadata strings: `"Standard Recording Setup.set"`, `"Claude test2"`, `"Location #1 - Brians House"`
**`raw_s3.bin`** (S3 → Blastware): **`raw_s3.bin`** (S3 → Blastware):
@@ -997,10 +998,7 @@ STATE_AFTER_DLE — last byte was 0x10, awaiting qualifier
1. **Global byte counting ≠ frame counting.** `0x10 0x02` appears inside payloads. Only state machine transitions produce valid frame boundaries. 1. **Global byte counting ≠ frame counting.** `0x10 0x02` appears inside payloads. Only state machine transitions produce valid frame boundaries.
2. **STX count ≠ frame count.** Only STX→ETX pairs within proper state transitions count. 2. **STX count ≠ frame count.** Only STX→ETX pairs within proper state transitions count.
3. **EOF mid-frame is normal.** Capture termination during active traffic produces an incomplete trailing frame. Not an error. 3. **EOF mid-frame is normal.** Capture termination during active traffic produces an incomplete trailing frame. Not an error.
4. **Start marker must be the full signature.** In BW mode, `41 02` is the start marker — not bare `02`. Bare `02` appears in payload data and would cause phantom frames. 4. **Layer separation.** The parser extracts frames only. Decoding block IDs, validating checksums, and interpreting semantics are responsibilities of a separate protocol decoder layer above it.
5. **ETX lookahead prevents false termination.** In BW mode, `03` is only a frame terminator when followed by `41 02` or at EOF. In-payload `03` bytes are common in large config frames.
6. **Framing is structural. Checksum is semantic.** Frame boundaries are determined by grammar patterns — never by checksum validation. Checksum belongs to the protocol decoder layer, not the framing layer.
7. **Layer separation.** The parser extracts frames only. Decoding block IDs, validating checksums, and interpreting semantics are responsibilities of a separate protocol decoder layer above it.
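Lesson 1 is easy to demonstrate: because a stuffed literal `0x10` followed by a payload `0x02` puts the byte pair `10 02` inside a frame body, a naive global scan over-counts candidate frame starts (a minimal sketch):

```python
# Payload b"\x10\x02" framed with DLE stuffing:
#   10 02 | 10 10 | 02 | 10 03
wire = bytes.fromhex("10021010021003")
starts = [i for i in range(len(wire) - 1) if wire[i:i + 2] == b"\x10\x02"]
print(starts)  # → [0, 3] — only offset 0 is a real frame start
```

Only the state machine, which consumes the stuffed `10 10` pair before seeing the `02`, resolves the second hit as payload.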
### C.6 Parser Layer Architecture ### C.6 Parser Layer Architecture

parsers/frame_db.py Normal file

@@ -0,0 +1,337 @@
#!/usr/bin/env python3
"""
frame_db.py — SQLite frame database for Instantel protocol captures.
Schema:
captures — one row per ingested capture pair (deduped by SHA256)
frames — one row per parsed frame
byte_values — one row per (frame, offset, value) for fast indexed queries
Usage:
db = FrameDB() # opens default DB at ~/.seismo_lab/frames.db
db = FrameDB(path) # custom path
cap_id = db.ingest(sessions, s3_path, bw_path)
rows = db.query_frames(sub=0xF7, direction="S3")
rows = db.query_by_byte(offset=85, value=0x0A)
"""
from __future__ import annotations
import hashlib
import os
import sqlite3
import struct
from pathlib import Path
from typing import Optional
# ─────────────────────────────────────────────────────────────────────────────
# DB location
# ─────────────────────────────────────────────────────────────────────────────
DEFAULT_DB_DIR = Path.home() / ".seismo_lab"
DEFAULT_DB_PATH = DEFAULT_DB_DIR / "frames.db"
# ─────────────────────────────────────────────────────────────────────────────
# Schema
# ─────────────────────────────────────────────────────────────────────────────
_DDL = """
PRAGMA journal_mode=WAL;
PRAGMA foreign_keys=ON;
CREATE TABLE IF NOT EXISTS captures (
id INTEGER PRIMARY KEY AUTOINCREMENT,
timestamp TEXT NOT NULL, -- ISO-8601 ingest time
s3_path TEXT,
bw_path TEXT,
capture_hash TEXT NOT NULL UNIQUE, -- SHA256 of s3_blob+bw_blob
notes TEXT DEFAULT ''
);
CREATE TABLE IF NOT EXISTS frames (
id INTEGER PRIMARY KEY AUTOINCREMENT,
capture_id INTEGER NOT NULL REFERENCES captures(id) ON DELETE CASCADE,
session_idx INTEGER NOT NULL,
direction TEXT NOT NULL, -- 'BW' or 'S3'
sub INTEGER, -- NULL if malformed
page_key INTEGER,
sub_name TEXT,
payload BLOB NOT NULL,
payload_len INTEGER NOT NULL,
checksum_ok INTEGER -- 1/0/NULL
);
CREATE INDEX IF NOT EXISTS idx_frames_capture ON frames(capture_id);
CREATE INDEX IF NOT EXISTS idx_frames_sub ON frames(sub);
CREATE INDEX IF NOT EXISTS idx_frames_page_key ON frames(page_key);
CREATE INDEX IF NOT EXISTS idx_frames_dir ON frames(direction);
CREATE TABLE IF NOT EXISTS byte_values (
id INTEGER PRIMARY KEY AUTOINCREMENT,
frame_id INTEGER NOT NULL REFERENCES frames(id) ON DELETE CASCADE,
offset INTEGER NOT NULL,
value INTEGER NOT NULL
);
CREATE INDEX IF NOT EXISTS idx_bv_frame ON byte_values(frame_id);
CREATE INDEX IF NOT EXISTS idx_bv_offset ON byte_values(offset);
CREATE INDEX IF NOT EXISTS idx_bv_value ON byte_values(value);
CREATE INDEX IF NOT EXISTS idx_bv_off_val ON byte_values(offset, value);
"""
# ─────────────────────────────────────────────────────────────────────────────
# Helpers
# ─────────────────────────────────────────────────────────────────────────────
def _sha256_blobs(s3_blob: bytes, bw_blob: bytes) -> str:
h = hashlib.sha256()
h.update(s3_blob)
h.update(bw_blob)
return h.hexdigest()
def _interp_bytes(data: bytes, offset: int) -> dict:
"""
    Return a multi-interpretation dict for the 1–4 bytes starting at offset.
Used in the GUI's byte interpretation panel.
"""
result: dict = {}
remaining = len(data) - offset
if remaining <= 0:
return result
b1 = data[offset]
result["uint8"] = b1
result["int8"] = b1 if b1 < 128 else b1 - 256
if remaining >= 2:
u16be = struct.unpack_from(">H", data, offset)[0]
u16le = struct.unpack_from("<H", data, offset)[0]
result["uint16_be"] = u16be
result["uint16_le"] = u16le
if remaining >= 4:
f32be = struct.unpack_from(">f", data, offset)[0]
f32le = struct.unpack_from("<f", data, offset)[0]
u32be = struct.unpack_from(">I", data, offset)[0]
u32le = struct.unpack_from("<I", data, offset)[0]
result["float32_be"] = round(f32be, 6)
result["float32_le"] = round(f32le, 6)
result["uint32_be"] = u32be
result["uint32_le"] = u32le
return result
# ─────────────────────────────────────────────────────────────────────────────
# FrameDB class
# ─────────────────────────────────────────────────────────────────────────────
class FrameDB:
def __init__(self, path: Optional[Path] = None) -> None:
if path is None:
path = DEFAULT_DB_PATH
path = Path(path)
path.parent.mkdir(parents=True, exist_ok=True)
self.path = path
self._con = sqlite3.connect(str(path), check_same_thread=False)
self._con.row_factory = sqlite3.Row
self._init_schema()
def _init_schema(self) -> None:
self._con.executescript(_DDL)
self._con.commit()
def close(self) -> None:
self._con.close()
# ── Ingest ────────────────────────────────────────────────────────────
def ingest(
self,
sessions: list, # list[Session] from s3_analyzer
s3_path: Optional[Path],
bw_path: Optional[Path],
notes: str = "",
) -> Optional[int]:
"""
Ingest a list of sessions into the DB.
Returns capture_id, or None if already ingested (duplicate hash).
"""
import datetime
s3_blob = s3_path.read_bytes() if s3_path and s3_path.exists() else b""
bw_blob = bw_path.read_bytes() if bw_path and bw_path.exists() else b""
cap_hash = _sha256_blobs(s3_blob, bw_blob)
# Dedup check
row = self._con.execute(
"SELECT id FROM captures WHERE capture_hash=?", (cap_hash,)
).fetchone()
if row:
return None # already in DB
ts = datetime.datetime.now().isoformat(timespec="seconds")
cur = self._con.execute(
"INSERT INTO captures (timestamp, s3_path, bw_path, capture_hash, notes) "
"VALUES (?, ?, ?, ?, ?)",
(ts, str(s3_path) if s3_path else None,
str(bw_path) if bw_path else None,
cap_hash, notes)
)
cap_id = cur.lastrowid
for sess in sessions:
for af in sess.all_frames:
frame_id = self._insert_frame(cap_id, af)
self._insert_byte_values(frame_id, af.frame.payload)
self._con.commit()
return cap_id
def _insert_frame(self, cap_id: int, af) -> int:
"""Insert one AnnotatedFrame; return its rowid."""
sub = af.header.sub if af.header else None
page_key = af.header.page_key if af.header else None
chk_ok = None
if af.frame.checksum_valid is True:
chk_ok = 1
elif af.frame.checksum_valid is False:
chk_ok = 0
cur = self._con.execute(
"INSERT INTO frames "
"(capture_id, session_idx, direction, sub, page_key, sub_name, payload, payload_len, checksum_ok) "
"VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?)",
(cap_id, af.session_idx, af.source,
sub, page_key, af.sub_name,
af.frame.payload, len(af.frame.payload), chk_ok)
)
return cur.lastrowid
def _insert_byte_values(self, frame_id: int, payload: bytes) -> None:
"""Insert one row per byte in payload into byte_values."""
rows = [(frame_id, i, b) for i, b in enumerate(payload)]
self._con.executemany(
"INSERT INTO byte_values (frame_id, offset, value) VALUES (?, ?, ?)",
rows
)
# ── Queries ───────────────────────────────────────────────────────────
def list_captures(self) -> list[sqlite3.Row]:
return self._con.execute(
"SELECT id, timestamp, s3_path, bw_path, notes, "
" (SELECT COUNT(*) FROM frames WHERE capture_id=captures.id) AS frame_count "
"FROM captures ORDER BY id DESC"
).fetchall()
def query_frames(
self,
capture_id: Optional[int] = None,
direction: Optional[str] = None, # "BW" or "S3"
sub: Optional[int] = None,
page_key: Optional[int] = None,
limit: int = 500,
) -> list[sqlite3.Row]:
"""
Query frames table with optional filters.
Returns rows with: id, capture_id, session_idx, direction, sub, page_key,
sub_name, payload, payload_len, checksum_ok
"""
clauses = []
params = []
if capture_id is not None:
clauses.append("capture_id=?"); params.append(capture_id)
if direction is not None:
clauses.append("direction=?"); params.append(direction)
if sub is not None:
clauses.append("sub=?"); params.append(sub)
if page_key is not None:
clauses.append("page_key=?"); params.append(page_key)
where = ("WHERE " + " AND ".join(clauses)) if clauses else ""
sql = f"SELECT * FROM frames {where} ORDER BY id LIMIT ?"
params.append(limit)
return self._con.execute(sql, params).fetchall()
def query_by_byte(
self,
offset: int,
value: Optional[int] = None,
capture_id: Optional[int] = None,
direction: Optional[str] = None,
sub: Optional[int] = None,
limit: int = 500,
) -> list[sqlite3.Row]:
"""
Return frames that have a specific byte at a specific offset.
Joins byte_values -> frames for indexed lookup.
"""
clauses = ["bv.offset=?"]
params = [offset]
if value is not None:
clauses.append("bv.value=?"); params.append(value)
if capture_id is not None:
clauses.append("f.capture_id=?"); params.append(capture_id)
if direction is not None:
clauses.append("f.direction=?"); params.append(direction)
if sub is not None:
clauses.append("f.sub=?"); params.append(sub)
where = "WHERE " + " AND ".join(clauses)
sql = (
f"SELECT f.*, bv.offset AS q_offset, bv.value AS q_value "
f"FROM byte_values bv "
f"JOIN frames f ON f.id=bv.frame_id "
f"{where} "
f"ORDER BY f.id LIMIT ?"
)
params.append(limit)
return self._con.execute(sql, params).fetchall()
def get_frame_payload(self, frame_id: int) -> Optional[bytes]:
row = self._con.execute(
"SELECT payload FROM frames WHERE id=?", (frame_id,)
).fetchone()
return bytes(row["payload"]) if row else None
def get_distinct_subs(self, capture_id: Optional[int] = None) -> list[int]:
if capture_id is not None:
rows = self._con.execute(
"SELECT DISTINCT sub FROM frames WHERE capture_id=? AND sub IS NOT NULL ORDER BY sub",
(capture_id,)
).fetchall()
else:
rows = self._con.execute(
"SELECT DISTINCT sub FROM frames WHERE sub IS NOT NULL ORDER BY sub"
).fetchall()
return [r[0] for r in rows]
def get_distinct_offsets(self, capture_id: Optional[int] = None) -> list[int]:
if capture_id is not None:
rows = self._con.execute(
"SELECT DISTINCT bv.offset FROM byte_values bv "
"JOIN frames f ON f.id=bv.frame_id WHERE f.capture_id=? ORDER BY bv.offset",
(capture_id,)
).fetchall()
else:
rows = self._con.execute(
"SELECT DISTINCT offset FROM byte_values ORDER BY offset"
).fetchall()
return [r[0] for r in rows]
def interpret_offset(self, payload: bytes, offset: int) -> dict:
"""Return multi-format interpretation of bytes starting at offset."""
return _interp_bytes(payload, offset)
def get_stats(self) -> dict:
captures = self._con.execute("SELECT COUNT(*) FROM captures").fetchone()[0]
frames = self._con.execute("SELECT COUNT(*) FROM frames").fetchone()[0]
bv_rows = self._con.execute("SELECT COUNT(*) FROM byte_values").fetchone()[0]
return {"captures": captures, "frames": frames, "byte_value_rows": bv_rows}

parsers/gui_analyzer.py Normal file

@@ -0,0 +1,940 @@
#!/usr/bin/env python3
"""
gui_analyzer.py — Tkinter GUI for s3_analyzer.
Layout:
┌─────────────────────────────────────────────────────────┐
│ [S3 file: ___________ Browse] [BW file: ___ Browse] │
│ [Analyze] [Live mode toggle] Status: Idle │
├──────────────────┬──────────────────────────────────────┤
│ Session list │ Detail panel (tabs) │
│ ─ Session 0 │ Inventory | Hex Dump | Diff │
│ └ POLL (BW) │ │
│ └ POLL_RESP │ (content of selected tab) │
│ ─ Session 1 │ │
│ └ ... │ │
└──────────────────┴──────────────────────────────────────┘
│ Status bar │
└─────────────────────────────────────────────────────────┘
"""
from __future__ import annotations
import queue
import sys
import threading
import time
import tkinter as tk
from pathlib import Path
from tkinter import filedialog, font, messagebox, ttk
from typing import Optional
sys.path.insert(0, str(Path(__file__).parent))
from s3_analyzer import ( # noqa: E402
AnnotatedFrame,
FrameDiff,
Session,
annotate_frames,
diff_sessions,
format_hex_dump,
parse_bw,
parse_s3,
render_session_report,
split_into_sessions,
write_claude_export,
)
from frame_db import FrameDB, DEFAULT_DB_PATH # noqa: E402
# ──────────────────────────────────────────────────────────────────────────────
# Colour palette (dark-ish terminal feel)
# ──────────────────────────────────────────────────────────────────────────────
BG = "#1e1e1e"
BG2 = "#252526"
BG3 = "#2d2d30"
FG = "#d4d4d4"
FG_DIM = "#6a6a6a"
ACCENT = "#569cd6"
ACCENT2 = "#4ec9b0"
RED = "#f44747"
YELLOW = "#dcdcaa"
GREEN = "#4caf50"
ORANGE = "#ce9178"
COL_BW = "#9cdcfe" # BW frames
COL_S3 = "#4ec9b0" # S3 frames
COL_DIFF = "#f44747" # Changed bytes
COL_KNOW = "#4caf50" # Known-field annotations
COL_HEAD = "#569cd6" # Section headers
MONO = ("Consolas", 9)
MONO_SM = ("Consolas", 8)
# ──────────────────────────────────────────────────────────────────────────────
# State container
# ──────────────────────────────────────────────────────────────────────────────
class AnalyzerState:
def __init__(self) -> None:
self.sessions: list[Session] = []
self.diffs: list[Optional[list[FrameDiff]]] = [] # diffs[i] = diff of session i vs i-1
self.s3_path: Optional[Path] = None
self.bw_path: Optional[Path] = None
self.last_capture_id: Optional[int] = None
# ──────────────────────────────────────────────────────────────────────────────
# Main GUI
# ──────────────────────────────────────────────────────────────────────────────
class AnalyzerGUI(tk.Tk):
def __init__(self) -> None:
super().__init__()
self.title("S3 Protocol Analyzer")
self.configure(bg=BG)
self.minsize(1050, 600)
self.state = AnalyzerState()
self._live_thread: Optional[threading.Thread] = None
self._live_stop = threading.Event()
self._live_q: queue.Queue[str] = queue.Queue()
self._db = FrameDB()
self._build_widgets()
self._poll_live_queue()
# ── widget construction ────────────────────────────────────────────────
def _build_widgets(self) -> None:
self._build_toolbar()
self._build_panes()
self._build_statusbar()
def _build_toolbar(self) -> None:
bar = tk.Frame(self, bg=BG2, pady=4)
bar.pack(side=tk.TOP, fill=tk.X)
pad = {"padx": 5, "pady": 2}
# S3 file
tk.Label(bar, text="S3 raw:", bg=BG2, fg=FG, font=MONO).pack(side=tk.LEFT, **pad)
self.s3_var = tk.StringVar()
tk.Entry(bar, textvariable=self.s3_var, width=28, bg=BG3, fg=FG,
insertbackground=FG, relief="flat", font=MONO).pack(side=tk.LEFT, **pad)
tk.Button(bar, text="Browse", bg=BG3, fg=FG, relief="flat",
activebackground=ACCENT, cursor="hand2",
command=lambda: self._browse_file(self.s3_var, "raw_s3.bin")
).pack(side=tk.LEFT, **pad)
tk.Label(bar, text=" BW raw:", bg=BG2, fg=FG, font=MONO).pack(side=tk.LEFT, **pad)
self.bw_var = tk.StringVar()
tk.Entry(bar, textvariable=self.bw_var, width=28, bg=BG3, fg=FG,
insertbackground=FG, relief="flat", font=MONO).pack(side=tk.LEFT, **pad)
tk.Button(bar, text="Browse", bg=BG3, fg=FG, relief="flat",
activebackground=ACCENT, cursor="hand2",
command=lambda: self._browse_file(self.bw_var, "raw_bw.bin")
).pack(side=tk.LEFT, **pad)
# Buttons
tk.Frame(bar, bg=BG2, width=10).pack(side=tk.LEFT)
self.analyze_btn = tk.Button(bar, text="Analyze", bg=ACCENT, fg="#ffffff",
relief="flat", padx=10, cursor="hand2",
font=("Consolas", 9, "bold"),
command=self._run_analyze)
self.analyze_btn.pack(side=tk.LEFT, **pad)
self.live_btn = tk.Button(bar, text="Live: OFF", bg=BG3, fg=FG,
relief="flat", padx=10, cursor="hand2",
font=MONO, command=self._toggle_live)
self.live_btn.pack(side=tk.LEFT, **pad)
self.export_btn = tk.Button(bar, text="Export for Claude", bg=ORANGE, fg="#000000",
relief="flat", padx=10, cursor="hand2",
font=("Consolas", 9, "bold"),
command=self._run_export, state="disabled")
self.export_btn.pack(side=tk.LEFT, **pad)
self.status_var = tk.StringVar(value="Idle")
tk.Label(bar, textvariable=self.status_var, bg=BG2, fg=FG_DIM,
font=MONO, anchor="w").pack(side=tk.LEFT, padx=10)
def _build_panes(self) -> None:
pane = tk.PanedWindow(self, orient=tk.HORIZONTAL, bg=BG,
sashwidth=4, sashrelief="flat")
pane.pack(fill=tk.BOTH, expand=True, padx=0, pady=0)
# ── Left: session/frame tree ──────────────────────────────────────
left = tk.Frame(pane, bg=BG2, width=260)
pane.add(left, minsize=200)
tk.Label(left, text="Sessions", bg=BG2, fg=ACCENT,
font=("Consolas", 9, "bold"), anchor="w", padx=6).pack(fill=tk.X)
tree_frame = tk.Frame(left, bg=BG2)
tree_frame.pack(fill=tk.BOTH, expand=True)
style = ttk.Style()
style.theme_use("clam")
style.configure("Treeview",
background=BG2, foreground=FG, fieldbackground=BG2,
font=MONO_SM, rowheight=18, borderwidth=0)
style.configure("Treeview.Heading",
background=BG3, foreground=ACCENT, font=MONO_SM)
style.map("Treeview", background=[("selected", BG3)],
foreground=[("selected", "#ffffff")])
self.tree = ttk.Treeview(tree_frame, columns=("info",), show="tree headings",
selectmode="browse")
self.tree.heading("#0", text="Frame")
self.tree.heading("info", text="Info")
self.tree.column("#0", width=160, stretch=True)
self.tree.column("info", width=80, stretch=False)
vsb = ttk.Scrollbar(tree_frame, orient="vertical", command=self.tree.yview)
self.tree.configure(yscrollcommand=vsb.set)
vsb.pack(side=tk.RIGHT, fill=tk.Y)
self.tree.pack(fill=tk.BOTH, expand=True)
self.tree.tag_configure("session", foreground=ACCENT, font=("Consolas", 9, "bold"))
self.tree.tag_configure("bw_frame", foreground=COL_BW)
self.tree.tag_configure("s3_frame", foreground=COL_S3)
self.tree.tag_configure("bad_chk", foreground=RED)
self.tree.tag_configure("malformed", foreground=RED)
self.tree.bind("<<TreeviewSelect>>", self._on_tree_select)
# ── Right: detail notebook ────────────────────────────────────────
right = tk.Frame(pane, bg=BG)
pane.add(right, minsize=600)
style.configure("TNotebook", background=BG2, borderwidth=0)
style.configure("TNotebook.Tab", background=BG3, foreground=FG,
font=MONO, padding=[8, 2])
style.map("TNotebook.Tab", background=[("selected", BG)],
foreground=[("selected", ACCENT)])
self.nb = ttk.Notebook(right)
self.nb.pack(fill=tk.BOTH, expand=True)
# Tab: Inventory
self.inv_text = self._make_text_tab("Inventory")
# Tab: Hex Dump
self.hex_text = self._make_text_tab("Hex Dump")
# Tab: Diff
self.diff_text = self._make_text_tab("Diff")
# Tab: Full Report (raw text)
self.report_text = self._make_text_tab("Full Report")
# Tab: Query (DB)
self._build_query_tab()
# Tag colours for rich text in all tabs
for w in (self.inv_text, self.hex_text, self.diff_text, self.report_text):
w.tag_configure("head", foreground=COL_HEAD, font=("Consolas", 9, "bold"))
w.tag_configure("bw", foreground=COL_BW)
w.tag_configure("s3", foreground=COL_S3)
w.tag_configure("changed", foreground=COL_DIFF)
w.tag_configure("known", foreground=COL_KNOW)
w.tag_configure("dim", foreground=FG_DIM)
w.tag_configure("normal", foreground=FG)
w.tag_configure("warn", foreground=YELLOW)
w.tag_configure("addr", foreground=ORANGE)
def _make_text_tab(self, title: str) -> tk.Text:
frame = tk.Frame(self.nb, bg=BG)
self.nb.add(frame, text=title)
w = tk.Text(frame, bg=BG, fg=FG, font=MONO, state="disabled",
relief="flat", wrap="none", insertbackground=FG,
selectbackground=BG3, selectforeground="#ffffff")
vsb = ttk.Scrollbar(frame, orient="vertical", command=w.yview)
hsb = ttk.Scrollbar(frame, orient="horizontal", command=w.xview)
w.configure(yscrollcommand=vsb.set, xscrollcommand=hsb.set)
vsb.pack(side=tk.RIGHT, fill=tk.Y)
hsb.pack(side=tk.BOTTOM, fill=tk.X)
w.pack(fill=tk.BOTH, expand=True)
return w
def _build_query_tab(self) -> None:
"""Build the Query tab: filter controls + results table + interpretation panel."""
frame = tk.Frame(self.nb, bg=BG)
self.nb.add(frame, text="Query DB")
# ── Filter row ────────────────────────────────────────────────────
filt = tk.Frame(frame, bg=BG2, pady=4)
filt.pack(side=tk.TOP, fill=tk.X)
pad = {"padx": 4, "pady": 2}
# Capture filter
tk.Label(filt, text="Capture:", bg=BG2, fg=FG, font=MONO_SM).grid(row=0, column=0, sticky="e", **pad)
self._q_capture_var = tk.StringVar(value="All")
self._q_capture_cb = ttk.Combobox(filt, textvariable=self._q_capture_var,
width=18, font=MONO_SM, state="readonly")
self._q_capture_cb.grid(row=0, column=1, sticky="w", **pad)
# Direction filter
tk.Label(filt, text="Dir:", bg=BG2, fg=FG, font=MONO_SM).grid(row=0, column=2, sticky="e", **pad)
self._q_dir_var = tk.StringVar(value="All")
self._q_dir_cb = ttk.Combobox(filt, textvariable=self._q_dir_var,
values=["All", "BW", "S3"],
width=6, font=MONO_SM, state="readonly")
self._q_dir_cb.grid(row=0, column=3, sticky="w", **pad)
# SUB filter
tk.Label(filt, text="SUB:", bg=BG2, fg=FG, font=MONO_SM).grid(row=0, column=4, sticky="e", **pad)
self._q_sub_var = tk.StringVar(value="All")
self._q_sub_cb = ttk.Combobox(filt, textvariable=self._q_sub_var,
width=12, font=MONO_SM, state="readonly")
self._q_sub_cb.grid(row=0, column=5, sticky="w", **pad)
# Byte offset filter
tk.Label(filt, text="Offset:", bg=BG2, fg=FG, font=MONO_SM).grid(row=0, column=6, sticky="e", **pad)
self._q_offset_var = tk.StringVar(value="")
tk.Entry(filt, textvariable=self._q_offset_var, width=8, bg=BG3, fg=FG,
font=MONO_SM, insertbackground=FG, relief="flat").grid(row=0, column=7, sticky="w", **pad)
# Value filter
tk.Label(filt, text="Value:", bg=BG2, fg=FG, font=MONO_SM).grid(row=0, column=8, sticky="e", **pad)
self._q_value_var = tk.StringVar(value="")
tk.Entry(filt, textvariable=self._q_value_var, width=8, bg=BG3, fg=FG,
font=MONO_SM, insertbackground=FG, relief="flat").grid(row=0, column=9, sticky="w", **pad)
# Run / Refresh buttons
tk.Button(filt, text="Run Query", bg=ACCENT, fg="#ffffff", relief="flat",
padx=8, cursor="hand2", font=("Consolas", 8, "bold"),
command=self._run_db_query).grid(row=0, column=10, padx=8)
tk.Button(filt, text="Refresh dropdowns", bg=BG3, fg=FG, relief="flat",
padx=6, cursor="hand2", font=MONO_SM,
command=self._refresh_query_dropdowns).grid(row=0, column=11, padx=4)
# DB stats label
self._q_stats_var = tk.StringVar(value="DB: —")
tk.Label(filt, textvariable=self._q_stats_var, bg=BG2, fg=FG_DIM,
font=MONO_SM).grid(row=0, column=12, padx=12, sticky="w")
# ── Results table ─────────────────────────────────────────────────
res_frame = tk.Frame(frame, bg=BG)
res_frame.pack(side=tk.TOP, fill=tk.BOTH, expand=True)
# Results treeview
cols = ("cap", "sess", "dir", "sub", "sub_name", "page", "len", "chk")
self._q_tree = ttk.Treeview(res_frame, columns=cols,
show="headings", selectmode="browse")
col_cfg = [
("cap", "Cap", 40),
("sess", "Sess", 40),
("dir", "Dir", 40),
("sub", "SUB", 50),
("sub_name", "Name", 160),
("page", "Page", 60),
("len", "Len", 50),
("chk", "Chk", 50),
]
for cid, heading, width in col_cfg:
self._q_tree.heading(cid, text=heading, anchor="w")
self._q_tree.column(cid, width=width, stretch=(cid == "sub_name"))
q_vsb = ttk.Scrollbar(res_frame, orient="vertical", command=self._q_tree.yview)
q_hsb = ttk.Scrollbar(res_frame, orient="horizontal", command=self._q_tree.xview)
self._q_tree.configure(yscrollcommand=q_vsb.set, xscrollcommand=q_hsb.set)
q_vsb.pack(side=tk.RIGHT, fill=tk.Y)
q_hsb.pack(side=tk.BOTTOM, fill=tk.X)
self._q_tree.pack(side=tk.LEFT, fill=tk.BOTH, expand=True)
self._q_tree.tag_configure("bw_row", foreground=COL_BW)
self._q_tree.tag_configure("s3_row", foreground=COL_S3)
self._q_tree.tag_configure("bad_row", foreground=RED)
# ── Interpretation panel (below results) ──────────────────────────
interp_frame = tk.Frame(frame, bg=BG2, height=120)
interp_frame.pack(side=tk.BOTTOM, fill=tk.X)
interp_frame.pack_propagate(False)
tk.Label(interp_frame, text="Byte interpretation (click a row, enter offset):",
bg=BG2, fg=ACCENT, font=MONO_SM, anchor="w", padx=6).pack(fill=tk.X)
interp_inner = tk.Frame(interp_frame, bg=BG2)
interp_inner.pack(fill=tk.X, padx=6, pady=2)
tk.Label(interp_inner, text="Offset:", bg=BG2, fg=FG, font=MONO_SM).pack(side=tk.LEFT)
self._interp_offset_var = tk.StringVar(value="5")
tk.Entry(interp_inner, textvariable=self._interp_offset_var,
width=6, bg=BG3, fg=FG, font=MONO_SM,
insertbackground=FG, relief="flat").pack(side=tk.LEFT, padx=4)
tk.Button(interp_inner, text="Interpret", bg=BG3, fg=FG, relief="flat",
cursor="hand2", font=MONO_SM,
command=self._run_interpret).pack(side=tk.LEFT, padx=4)
self._interp_text = tk.Text(interp_frame, bg=BG2, fg=FG, font=MONO_SM,
height=4, relief="flat", state="disabled",
insertbackground=FG)
self._interp_text.pack(fill=tk.X, padx=6, pady=2)
self._interp_text.tag_configure("label", foreground=FG_DIM)
self._interp_text.tag_configure("value", foreground=YELLOW)
# Store frame rows by tree iid -> db row
self._q_rows: dict[str, object] = {}
self._q_capture_rows: list = [None]
self._q_sub_values: list = [None]
self._q_tree.bind("<<TreeviewSelect>>", self._on_q_select)
# Init dropdowns
self._refresh_query_dropdowns()
def _refresh_query_dropdowns(self) -> None:
"""Reload capture and SUB dropdowns from the DB."""
try:
captures = self._db.list_captures()
cap_labels = ["All"] + [
f"#{r['id']} {r['timestamp'][:16]} ({r['frame_count']} frames)"
for r in captures
]
self._q_capture_cb["values"] = cap_labels
self._q_capture_rows = [None] + [r["id"] for r in captures]
subs = self._db.get_distinct_subs()
sub_labels = ["All"] + [f"0x{s:02X}" for s in subs]
self._q_sub_cb["values"] = sub_labels
self._q_sub_values = [None] + subs
stats = self._db.get_stats()
self._q_stats_var.set(
f"DB: {stats['captures']} captures | {stats['frames']} frames"
)
except Exception as exc:
self._q_stats_var.set(f"DB error: {exc}")
def _parse_hex_or_int(self, s: str) -> Optional[int]:
"""Parse '0x1F', '31', or '' into int or None."""
s = s.strip()
if not s:
return None
try:
return int(s, 0)
except ValueError:
return None
def _run_db_query(self) -> None:
"""Execute query with current filter values and populate results tree."""
# Resolve capture_id
cap_idx = self._q_capture_cb.current()
cap_id = self._q_capture_rows[cap_idx] if cap_idx > 0 else None
# Direction
dir_val = self._q_dir_var.get()
direction = dir_val if dir_val != "All" else None
# SUB
sub_idx = self._q_sub_cb.current()
sub = self._q_sub_values[sub_idx] if sub_idx > 0 else None
# Offset / value
offset = self._parse_hex_or_int(self._q_offset_var.get())
value = self._parse_hex_or_int(self._q_value_var.get())
try:
if offset is not None:
rows = self._db.query_by_byte(
offset=offset, value=value,
capture_id=cap_id, direction=direction, sub=sub
)
else:
rows = self._db.query_frames(
capture_id=cap_id, direction=direction, sub=sub
)
except Exception as exc:
messagebox.showerror("Query error", str(exc))
return
# Populate tree
self._q_tree.delete(*self._q_tree.get_children())
self._q_rows.clear()
for row in rows:
sub_hex = f"0x{row['sub']:02X}" if row["sub"] is not None else ""
page_hex = f"0x{row['page_key']:04X}" if row["page_key"] is not None else ""
chk_str = {1: "OK", 0: "BAD", None: ""}.get(row["checksum_ok"], "")
tag = "bw_row" if row["direction"] == "BW" else "s3_row"
if row["checksum_ok"] == 0:
tag = "bad_row"
iid = str(row["id"])
self._q_tree.insert("", tk.END, iid=iid, tags=(tag,), values=(
row["capture_id"],
row["session_idx"],
row["direction"],
sub_hex,
row["sub_name"] or "",
page_hex,
row["payload_len"],
chk_str,
))
self._q_rows[iid] = row
self.sb_var.set(f"Query returned {len(rows)} rows")
def _on_q_select(self, _event: tk.Event) -> None:
"""When a DB result row is selected, auto-run interpret at current offset."""
self._run_interpret()
def _run_interpret(self) -> None:
"""Show multi-format byte interpretation for the selected row + offset."""
sel = self._q_tree.selection()
if not sel:
return
iid = sel[0]
row = self._q_rows.get(iid)
if row is None:
return
offset = self._parse_hex_or_int(self._interp_offset_var.get())
if offset is None:
return
payload = bytes(row["payload"])
interp = self._db.interpret_offset(payload, offset)
w = self._interp_text
w.configure(state="normal")
w.delete("1.0", tk.END)
sub_hex = f"0x{row['sub']:02X}" if row["sub"] is not None else "??"
w.insert(tk.END, f"Frame #{row['id']} [{row['direction']}] SUB={sub_hex} "
f"offset={offset} (0x{offset:04X})\n", "label")
label_order = [
("uint8", "uint8 "),
("int8", "int8 "),
("uint16_be", "uint16 BE "),
("uint16_le", "uint16 LE "),
("uint32_be", "uint32 BE "),
("uint32_le", "uint32 LE "),
("float32_be", "float32 BE "),
("float32_le", "float32 LE "),
]
line = ""
for key, label in label_order:
if key in interp:
val = interp[key]
if isinstance(val, float):
val_str = f"{val:.6g}"
else:
val_str = str(val)
if key.startswith("uint") or key.startswith("int"):
val_str += f" (0x{int(val) & 0xFFFFFFFF:X})"
chunk = f"{label}: {val_str}"
line += f" {chunk:<30}"
if len(line) > 80:
w.insert(tk.END, line + "\n", "value")
line = ""
if line:
w.insert(tk.END, line + "\n", "value")
w.configure(state="disabled")
def _build_statusbar(self) -> None:
bar = tk.Frame(self, bg=BG3, height=20)
bar.pack(side=tk.BOTTOM, fill=tk.X)
self.sb_var = tk.StringVar(value="Ready")
tk.Label(bar, textvariable=self.sb_var, bg=BG3, fg=FG_DIM,
font=MONO_SM, anchor="w", padx=6).pack(fill=tk.X)
# ── file picking ───────────────────────────────────────────────────────
def _browse_file(self, var: tk.StringVar, default_name: str) -> None:
path = filedialog.askopenfilename(
title=f"Select {default_name}",
filetypes=[("Binary files", "*.bin"), ("All files", "*.*")],
initialfile=default_name,
)
if path:
var.set(path)
# ── analysis ──────────────────────────────────────────────────────────
def _run_analyze(self) -> None:
s3_path = Path(self.s3_var.get().strip()) if self.s3_var.get().strip() else None
bw_path = Path(self.bw_var.get().strip()) if self.bw_var.get().strip() else None
if not s3_path or not bw_path:
messagebox.showerror("Missing files", "Please select both S3 and BW raw files.")
return
if not s3_path.exists():
messagebox.showerror("File not found", f"S3 file not found:\n{s3_path}")
return
if not bw_path.exists():
messagebox.showerror("File not found", f"BW file not found:\n{bw_path}")
return
self.state.s3_path = s3_path
self.state.bw_path = bw_path
self._do_analyze(s3_path, bw_path)
def _run_export(self) -> None:
if not self.state.sessions:
messagebox.showinfo("Export", "Run Analyze first.")
return
outdir = self.state.s3_path.parent if self.state.s3_path else Path(".")
out_path = write_claude_export(
self.state.sessions,
self.state.diffs,
outdir,
self.state.s3_path,
self.state.bw_path,
)
self.sb_var.set(f"Exported: {out_path.name}")
if messagebox.askyesno(
"Export complete",
f"Saved to:\n{out_path}\n\nOpen the folder?",
):
import subprocess
# Windows-only: opens the folder in Explorer (use xdg-open / open elsewhere)
subprocess.Popen(["explorer", str(out_path.parent)])
def _do_analyze(self, s3_path: Path, bw_path: Path) -> None:
self.status_var.set("Parsing...")
self.update_idletasks()
s3_blob = s3_path.read_bytes()
bw_blob = bw_path.read_bytes()
s3_frames = annotate_frames(parse_s3(s3_blob, trailer_len=0), "S3")
bw_frames = annotate_frames(parse_bw(bw_blob, trailer_len=0, validate_checksum=True), "BW")
sessions = split_into_sessions(bw_frames, s3_frames)
diffs: list[Optional[list[FrameDiff]]] = [None]
for i in range(1, len(sessions)):
diffs.append(diff_sessions(sessions[i - 1], sessions[i]))
self.state.sessions = sessions
self.state.diffs = diffs
n_s3 = sum(len(s.s3_frames) for s in sessions)
n_bw = sum(len(s.bw_frames) for s in sessions)
self.status_var.set(
f"{len(sessions)} sessions | BW: {n_bw} frames S3: {n_s3} frames"
)
self.sb_var.set(f"Loaded: {s3_path.name} + {bw_path.name}")
self.export_btn.configure(state="normal")
self._rebuild_tree()
# Auto-ingest into DB (deduped by SHA256 — fast no-op on re-analyze)
try:
cap_id = self._db.ingest(sessions, s3_path, bw_path)
if cap_id is not None:
self.state.last_capture_id = cap_id
self._refresh_query_dropdowns()
# Pre-select this capture in the Query tab
cap_labels = list(self._q_capture_cb["values"])
# Find label that starts with #<cap_id>
for i, lbl in enumerate(cap_labels):
if lbl.startswith(f"#{cap_id} "):
self._q_capture_cb.current(i)
break
# else: already ingested — no change to dropdown selection
except Exception as exc:
self.sb_var.set(f"DB ingest error: {exc}")
# ── tree building ──────────────────────────────────────────────────────
def _rebuild_tree(self) -> None:
self.tree.delete(*self.tree.get_children())
for sess in self.state.sessions:
is_complete = any(
af.header is not None and af.header.sub == 0x74
for af in sess.bw_frames
)
label = f"Session {sess.index}"
if not is_complete:
label += " [partial]"
n_diff = len(self.state.diffs[sess.index] or [])
diff_info = f"{n_diff} changes" if n_diff > 0 else ""
sess_id = self.tree.insert("", tk.END, text=label,
values=(diff_info,), tags=("session",))
for af in sess.all_frames:
src_tag = "bw_frame" if af.source == "BW" else "s3_frame"
sub_hex = f"{af.header.sub:02X}" if af.header else "??"
label_text = f"[{af.source}] {sub_hex} {af.sub_name}"
extra = ""
tags = (src_tag,)
if af.frame.checksum_valid is False:
extra = "BAD CHK"
tags = ("bad_chk",)
elif af.header is None:
tags = ("malformed",)
label_text = f"[{af.source}] MALFORMED"
self.tree.insert(sess_id, tk.END, text=label_text,
values=(extra,), tags=tags,
iid=f"frame_{sess.index}_{af.frame.index}_{af.source}")
# Expand all sessions
for item in self.tree.get_children():
self.tree.item(item, open=True)
# ── tree selection → detail panel ─────────────────────────────────────
def _on_tree_select(self, _event: tk.Event) -> None:
sel = self.tree.selection()
if not sel:
return
iid = sel[0]
# Determine if it's a session node or a frame node
if iid.startswith("frame_"):
# frame_<sessidx>_<frameidx>_<source>
parts = iid.split("_")
sess_idx = int(parts[1])
frame_idx = int(parts[2])
source = parts[3]
self._show_frame_detail(sess_idx, frame_idx, source)
else:
# Session node — show session summary
# Find session index from text
text = self.tree.item(iid, "text")
try:
idx = int(text.split()[1])
self._show_session_detail(idx)
except (IndexError, ValueError):
pass
def _find_frame(self, sess_idx: int, frame_idx: int, source: str) -> Optional[AnnotatedFrame]:
if sess_idx >= len(self.state.sessions):
return None
sess = self.state.sessions[sess_idx]
pool = sess.bw_frames if source == "BW" else sess.s3_frames
for af in pool:
if af.frame.index == frame_idx:
return af
return None
# ── detail renderers ──────────────────────────────────────────────────
def _clear_all_tabs(self) -> None:
for w in (self.inv_text, self.hex_text, self.diff_text, self.report_text):
self._text_clear(w)
def _show_session_detail(self, sess_idx: int) -> None:
if sess_idx >= len(self.state.sessions):
return
sess = self.state.sessions[sess_idx]
diffs = self.state.diffs[sess_idx]
self._clear_all_tabs()
# ── Inventory tab ────────────────────────────────────────────────
w = self.inv_text
self._text_clear(w)
self._tw(w, f"SESSION {sess.index}", "head"); self._tn(w)
n_bw, n_s3 = len(sess.bw_frames), len(sess.s3_frames)
self._tw(w, f"Frames: {n_bw + n_s3} (BW: {n_bw}, S3: {n_s3})\n", "normal")
if n_bw != n_s3:
self._tw(w, " WARNING: BW/S3 count mismatch\n", "warn")
self._tn(w)
for seq_i, af in enumerate(sess.all_frames):
src_tag = "bw" if af.source == "BW" else "s3"
sub_hex = f"{af.header.sub:02X}" if af.header else "??"
page_str = f" (page {af.header.page_key:04X})" if af.header and af.header.page_key != 0 else ""
chk = ""
if af.frame.checksum_valid is False:
chk = " [BAD CHECKSUM]"
elif af.frame.checksum_valid is True:
chk = f" [{af.frame.checksum_type}]"
self._tw(w, f" [{af.source}] #{seq_i:<3} ", src_tag)
self._tw(w, f"SUB={sub_hex} ", "addr")
self._tw(w, f"{af.sub_name:<30}", src_tag)
self._tw(w, f"{page_str} len={len(af.frame.payload)}", "dim")
if chk:
self._tw(w, chk, "warn" if af.frame.checksum_valid is False else "dim")
self._tn(w)
# ── Diff tab ─────────────────────────────────────────────────────
w = self.diff_text
self._text_clear(w)
if diffs is None:
self._tw(w, "(No previous session to diff against)\n", "dim")
elif not diffs:
self._tw(w, f"DIFF vs SESSION {sess_idx - 1}\n", "head"); self._tn(w)
self._tw(w, " No changes detected.\n", "dim")
else:
self._tw(w, f"DIFF vs SESSION {sess_idx - 1}\n", "head"); self._tn(w)
for fd in diffs:
page_str = f" (page {fd.page_key:04X})" if fd.page_key != 0 else ""
self._tw(w, f"\n SUB {fd.sub:02X} ({fd.sub_name}){page_str}:\n", "addr")
for bd in fd.diffs:
before_s = f"{bd.before:02x}" if bd.before >= 0 else "--"
after_s = f"{bd.after:02x}" if bd.after >= 0 else "--"
self._tw(w, f" [{bd.payload_offset:3d}] 0x{bd.payload_offset:04X}: ", "dim")
self._tw(w, f"{before_s} -> {after_s}", "changed")
if bd.field_name:
self._tw(w, f" [{bd.field_name}]", "known")
self._tn(w)
# ── Full Report tab ───────────────────────────────────────────────
report_text = render_session_report(sess, diffs, sess_idx - 1 if sess_idx > 0 else None)
w = self.report_text
self._text_clear(w)
self._tw(w, report_text, "normal")
# Switch to Inventory tab
self.nb.select(0)
def _show_frame_detail(self, sess_idx: int, frame_idx: int, source: str) -> None:
af = self._find_frame(sess_idx, frame_idx, source)
if af is None:
return
self._clear_all_tabs()
src_tag = "bw" if source == "BW" else "s3"
sub_hex = f"{af.header.sub:02X}" if af.header else "??"
# ── Inventory tab — single frame summary ─────────────────────────
w = self.inv_text
self._tw(w, f"[{af.source}] Frame #{af.frame.index}\n", src_tag)
self._tw(w, f"Session {sess_idx} | ", "dim")
self._tw(w, f"SUB={sub_hex} {af.sub_name}\n", "addr")
if af.header:
self._tw(w, f" OFFSET: {af.header.page_key:04X} ", "dim")
self._tw(w, f"CMD={af.header.cmd:02X} FLAGS={af.header.flags:02X}\n", "dim")
self._tn(w)
self._tw(w, f"Payload bytes: {len(af.frame.payload)}\n", "dim")
if af.frame.checksum_valid is False:
self._tw(w, " BAD CHECKSUM\n", "warn")
elif af.frame.checksum_valid is True:
self._tw(w, f" Checksum: {af.frame.checksum_type} {af.frame.checksum_hex}\n", "dim")
self._tn(w)
# Protocol header breakdown
p = af.frame.payload
if len(p) >= 5:
self._tw(w, "Header breakdown:\n", "head")
self._tw(w, f" [0] CMD = {p[0]:02x}\n", "dim")
self._tw(w, f" [1] ? = {p[1]:02x}\n", "dim")
self._tw(w, f" [2] SUB = {p[2]:02x} ({af.sub_name})\n", src_tag)
self._tw(w, f" [3] OFFSET_HI = {p[3]:02x}\n", "dim")
self._tw(w, f" [4] OFFSET_LO = {p[4]:02x}\n", "dim")
if len(p) > 5:
self._tw(w, f" [5..] data = {len(p) - 5} bytes\n", "dim")
# ── Hex Dump tab ─────────────────────────────────────────────────
w = self.hex_text
self._tw(w, f"[{af.source}] SUB={sub_hex} {af.sub_name}\n", src_tag)
self._tw(w, f"Payload ({len(af.frame.payload)} bytes):\n", "dim")
self._tn(w)
dump_lines = format_hex_dump(af.frame.payload, indent=" ")
self._tw(w, "\n".join(dump_lines) + "\n", "normal")
# Annotate known field offsets within this frame
diffs_for_sess = self.state.diffs[sess_idx] if sess_idx < len(self.state.diffs) else None
if diffs_for_sess and af.header:
page_key = af.header.page_key
matching = [fd for fd in diffs_for_sess
if fd.sub == af.header.sub and fd.page_key == page_key]
if matching:
self._tn(w)
self._tw(w, "Changed bytes in this frame (vs prev session):\n", "head")
for bd in matching[0].diffs:
before_s = f"{bd.before:02x}" if bd.before >= 0 else "--"
after_s = f"{bd.after:02x}" if bd.after >= 0 else "--"
self._tw(w, f" [{bd.payload_offset:3d}] 0x{bd.payload_offset:04X}: ", "dim")
self._tw(w, f"{before_s} -> {after_s}", "changed")
if bd.field_name:
self._tw(w, f" [{bd.field_name}]", "known")
self._tn(w)
# Switch to Hex Dump tab for frame selection
self.nb.select(1)
# ── live mode ─────────────────────────────────────────────────────────
def _toggle_live(self) -> None:
if self._live_thread and self._live_thread.is_alive():
self._live_stop.set()
self.live_btn.configure(text="Live: OFF", bg=BG3, fg=FG)
self.status_var.set("Live stopped")
else:
s3_path = Path(self.s3_var.get().strip()) if self.s3_var.get().strip() else None
bw_path = Path(self.bw_var.get().strip()) if self.bw_var.get().strip() else None
if not s3_path or not bw_path:
messagebox.showerror("Missing files", "Select both raw files before starting live mode.")
return
self.state.s3_path = s3_path
self.state.bw_path = bw_path
self._live_stop.clear()
self._live_thread = threading.Thread(
target=self._live_worker, args=(s3_path, bw_path), daemon=True)
self._live_thread.start()
self.live_btn.configure(text="Live: ON", bg=GREEN, fg="#000000")
self.status_var.set("Live mode running...")
def _live_worker(self, s3_path: Path, bw_path: Path) -> None:
# Buffers only detect file growth; on each "refresh" signal the poller
# re-parses the full files via _do_analyze.
s3_buf = bytearray()
bw_buf = bytearray()
s3_pos = bw_pos = 0
while not self._live_stop.is_set():
changed = False
if s3_path.exists():
with s3_path.open("rb") as fh:
fh.seek(s3_pos)
nb = fh.read()
if nb:
s3_buf.extend(nb); s3_pos += len(nb); changed = True
if bw_path.exists():
with bw_path.open("rb") as fh:
fh.seek(bw_pos)
nb = fh.read()
if nb:
bw_buf.extend(nb); bw_pos += len(nb); changed = True
if changed:
self._live_q.put("refresh")
time.sleep(0.1)
def _poll_live_queue(self) -> None:
try:
while True:
msg = self._live_q.get_nowait()
if msg == "refresh" and self.state.s3_path and self.state.bw_path:
self._do_analyze(self.state.s3_path, self.state.bw_path)
except queue.Empty:
pass
finally:
self.after(150, self._poll_live_queue)
# ── text helpers ──────────────────────────────────────────────────────
def _text_clear(self, w: tk.Text) -> None:
w.configure(state="normal")
w.delete("1.0", tk.END)
# leave enabled for further inserts
def _tw(self, w: tk.Text, text: str, tag: str = "normal") -> None:
"""Insert text with a colour tag."""
w.configure(state="normal")
w.insert(tk.END, text, tag)
def _tn(self, w: tk.Text) -> None:
"""Insert newline."""
w.configure(state="normal")
w.insert(tk.END, "\n")
w.configure(state="disabled")
# ──────────────────────────────────────────────────────────────────────────────
# Entry point
# ──────────────────────────────────────────────────────────────────────────────
def main() -> None:
app = AnalyzerGUI()
app.mainloop()
if __name__ == "__main__":
main()

parsers/s3_analyzer.py Normal file
@@ -0,0 +1,948 @@
#!/usr/bin/env python3
"""
s3_analyzer.py — Live protocol analysis tool for Instantel MiniMate Plus RS-232.
Reads raw_s3.bin and raw_bw.bin (produced by s3_bridge.py), parses DLE frames,
groups into sessions, auto-diffs consecutive sessions, and annotates known fields.
Usage:
python s3_analyzer.py --s3 raw_s3.bin --bw raw_bw.bin [--live] [--outdir DIR]
"""
from __future__ import annotations
import argparse
import sys
import time
from dataclasses import dataclass
from pathlib import Path
from typing import Optional
# Allow running from any working directory
sys.path.insert(0, str(Path(__file__).parent))
from s3_parser import Frame, parse_bw, parse_s3 # noqa: E402
__version__ = "0.1.0"
# ──────────────────────────────────────────────────────────────────────────────
# Protocol constants
# ──────────────────────────────────────────────────────────────────────────────
# SUB_TABLE: sub_byte → (name, direction, notes)
# direction: "BW→S3", "S3→BW", or "both"
SUB_TABLE: dict[int, tuple[str, str, str]] = {
# BW→S3 read requests
0x5B: ("POLL", "BW→S3", "Keepalive / device discovery"),
0x01: ("FULL_CONFIG_READ", "BW→S3", "~0x98 bytes; firmware, model, serial, channel config"),
0x06: ("CHANNEL_CONFIG_READ", "BW→S3", "0x24 bytes; channel configuration block"),
0x08: ("EVENT_INDEX_READ", "BW→S3", "0x58 bytes; event count and record pointers"),
0x0A: ("WAVEFORM_HEADER_READ", "BW→S3", "0x30 bytes/page; waveform header keyed by timestamp"),
0x0C: ("FULL_WAVEFORM_READ", "BW→S3", "0xD2 bytes/page × 2; project strings, PPV floats"),
0x1C: ("TRIGGER_CONFIG_READ", "BW→S3", "0x2C bytes; trigger settings block"),
0x09: ("UNKNOWN_READ_A", "BW→S3", "0xCA bytes response (F6); purpose unknown"),
0x1A: ("COMPLIANCE_CONFIG_READ", "BW→S3", "Large block (E5); trigger/alarm floats, unit strings"),
0x2E: ("UNKNOWN_READ_B", "BW→S3", "0x1A bytes response (D1); purpose unknown"),
# BW→S3 write commands
0x68: ("EVENT_INDEX_WRITE", "BW→S3", "Mirrors SUB 08 read; event count and timestamps"),
0x69: ("WAVEFORM_DATA_WRITE", "BW→S3", "0xCA bytes; mirrors SUB 09"),
0x71: ("COMPLIANCE_STRINGS_WRITE", "BW→S3", "Compliance config + all project string fields"),
0x72: ("WRITE_CONFIRM_A", "BW→S3", "Short frame; commit step after 0x71"),
0x73: ("WRITE_CONFIRM_B", "BW→S3", "Short frame"),
0x74: ("WRITE_CONFIRM_C", "BW→S3", "Short frame; final session-close confirm"),
0x82: ("TRIGGER_CONFIG_WRITE", "BW→S3", "0x1C bytes; trigger config block; mirrors SUB 1C"),
0x83: ("TRIGGER_WRITE_CONFIRM", "BW→S3", "Short frame; commit step after 0x82"),
# S3→BW responses
0xA4: ("POLL_RESPONSE", "S3→BW", "Response to SUB 5B poll"),
0xFE: ("FULL_CONFIG_RESPONSE", "S3→BW", "Response to SUB 01"),
0xF9: ("CHANNEL_CONFIG_RESPONSE", "S3→BW", "Response to SUB 06"),
0xF7: ("EVENT_INDEX_RESPONSE", "S3→BW", "Response to SUB 08; contains backlight/power-save"),
0xF5: ("WAVEFORM_HEADER_RESPONSE", "S3→BW", "Response to SUB 0A"),
0xF3: ("FULL_WAVEFORM_RESPONSE", "S3→BW", "Response to SUB 0C; project strings, PPV floats"),
0xE3: ("TRIGGER_CONFIG_RESPONSE", "S3→BW", "Response to SUB 1C; contains timestamps"),
0xF6: ("UNKNOWN_RESPONSE_A", "S3→BW", "Response to SUB 09; 0xCA bytes"),
0xE5: ("COMPLIANCE_CONFIG_RESPONSE","S3→BW", "Response to SUB 1A; record time in page 2"),
0xD1: ("UNKNOWN_RESPONSE_B", "S3→BW", "Response to SUB 2E; 0x1A bytes"),
0xEA: ("SERIAL_NUMBER_RESPONSE", "S3→BW", "0x0A bytes; serial number + firmware minor version"),
# Short ack responses to writes (0xFF - write_sub)
0x8E: ("WRITE_CONFIRM_RESPONSE_71", "S3→BW", "Ack for SUB 71 COMPLIANCE_STRINGS_WRITE"),
0x8D: ("WRITE_CONFIRM_RESPONSE_72", "S3→BW", "Ack for SUB 72 WRITE_CONFIRM_A"),
0x8C: ("WRITE_CONFIRM_RESPONSE_73", "S3→BW", "Ack for SUB 73 WRITE_CONFIRM_B"),
0x8B: ("WRITE_CONFIRM_RESPONSE_74", "S3→BW", "Ack for SUB 74 WRITE_CONFIRM_C"),
0x97: ("WRITE_CONFIRM_RESPONSE_68", "S3→BW", "Ack for SUB 68 EVENT_INDEX_WRITE"),
0x96: ("WRITE_CONFIRM_RESPONSE_69", "S3→BW", "Ack for SUB 69 WAVEFORM_DATA_WRITE"),
0x7D: ("WRITE_CONFIRM_RESPONSE_82", "S3→BW", "Ack for SUB 82 TRIGGER_CONFIG_WRITE"),
0x7C: ("WRITE_CONFIRM_RESPONSE_83", "S3→BW", "Ack for SUB 83 TRIGGER_WRITE_CONFIRM"),
}
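The short-ack convention noted above (ack SUB = 0xFF minus the write SUB) can be checked mechanically. A minimal sketch, assuming only the write/ack pairings listed in SUB_TABLE:

```python
# Sketch: verify the observed ack relationship ack_sub = 0xFF - write_sub
# for the write commands in SUB_TABLE (assumption: it holds for all writes).
WRITE_SUBS = [0x68, 0x69, 0x71, 0x72, 0x73, 0x74, 0x82, 0x83]

def ack_for(write_sub: int) -> int:
    """Ack SUB observed for a given write SUB (ones' complement of the byte)."""
    return 0xFF - write_sub

pairs = {w: ack_for(w) for w in WRITE_SUBS}
# e.g. 0x71 -> 0x8E, 0x68 -> 0x97, 0x82 -> 0x7D
```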
# SUBs whose data-section bytes 0-5 are known timestamps (suppress in diffs)
NOISY_SUBS: set[int] = {0xE3, 0xF7, 0xF5}
# E5 page 2 key: the OFFSET_HI:OFFSET_LO that identifies the data page
# E5 page 1 (length probe) has offset 0x0000; page 2 has offset 0x082A
E5_PAGE2_KEY = 0x082A
# FieldEntry: (sub, page_key_or_none, payload_offset, field_name, type_hint, notes)
# payload_offset = offset from start of Frame.payload (not data section, not wire)
# Exception: for SUB 0x82, offset [22] is from full de-stuffed payload[0] per protocol ref.
@dataclass(frozen=True)
class FieldEntry:
sub: int
page_key: Optional[int] # None = any / all pages
payload_offset: int # offset from frame.payload[0]
name: str
type_hint: str
notes: str
FIELD_MAP: list[FieldEntry] = [
# F7 (EVENT_INDEX_RESPONSE) — data section starts at payload[5]
# Protocol ref: backlight at data+0x4B = payload[5+0x4B] = payload[80]
FieldEntry(0xF7, None, 5 + 0x4B, "backlight_on_time", "uint8", "seconds; 0=off"),
FieldEntry(0xF7, None, 5 + 0x53, "power_save_timeout", "uint8", "minutes; 0=disabled"),
FieldEntry(0xF7, None, 5 + 0x54, "monitoring_lcd_cycle", "uint16 BE","65500=disabled"),
# E5 page 2 (COMPLIANCE_CONFIG_RESPONSE) — record time at data+0x28
FieldEntry(0xE5, E5_PAGE2_KEY, 5 + 0x28, "record_time", "float32 BE", "seconds; 7s=40E00000, 13s=41500000"),
# SUB 0x82 (TRIGGER_CONFIG_WRITE) — BW→S3 write
# Protocol ref offset [22] is from the de-stuffed payload[0], confirmed from raw_bw.bin
FieldEntry(0x82, None, 22, "trigger_sample_width", "uint8", "samples; mode-gated, BW-side write only"),
]
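The `record_time` entry above is a big-endian IEEE-754 float32; the two encodings quoted in its notes decode directly with `struct`. A self-contained sketch:

```python
import struct

def decode_f32_be(raw: bytes) -> float:
    """Decode a big-endian IEEE-754 float32, as used by record_time."""
    return struct.unpack(">f", raw)[0]

# The two encodings noted in FIELD_MAP:
# 40E00000 -> 7.0 s, 41500000 -> 13.0 s
seven = decode_f32_be(bytes.fromhex("40E00000"))
thirteen = decode_f32_be(bytes.fromhex("41500000"))
```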
# ──────────────────────────────────────────────────────────────────────────────
# Data structures
# ──────────────────────────────────────────────────────────────────────────────
@dataclass
class FrameHeader:
cmd: int
sub: int
offset_hi: int
offset_lo: int
flags: int
@property
def page_key(self) -> int:
return (self.offset_hi << 8) | self.offset_lo
@dataclass
class AnnotatedFrame:
frame: Frame
source: str # "BW" or "S3"
header: Optional[FrameHeader] # None if payload < 7 bytes (malformed/short)
sub_name: str
session_idx: int = -1
@dataclass
class Session:
index: int
bw_frames: list[AnnotatedFrame]
s3_frames: list[AnnotatedFrame]
@property
def all_frames(self) -> list[AnnotatedFrame]:
"""Interleave BW/S3 in synchronous protocol order: BW[0], S3[0], BW[1], S3[1]..."""
result: list[AnnotatedFrame] = []
for i in range(max(len(self.bw_frames), len(self.s3_frames))):
if i < len(self.bw_frames):
result.append(self.bw_frames[i])
if i < len(self.s3_frames):
result.append(self.s3_frames[i])
return result
@dataclass
class ByteDiff:
payload_offset: int
before: int
after: int
field_name: Optional[str]
@dataclass
class FrameDiff:
sub: int
page_key: int
sub_name: str
diffs: list[ByteDiff]
# ──────────────────────────────────────────────────────────────────────────────
# Parsing helpers
# ──────────────────────────────────────────────────────────────────────────────
def extract_header(payload: bytes) -> Optional[FrameHeader]:
"""
Extract protocol header from de-stuffed payload.
After de-stuffing, the actual observed layout is 5 bytes:
[0] CMD -- 0x10 for BW requests, 0x00 for S3 responses
[1] ? -- 0x00 for BW, 0x10 for S3 (DLE/ADDR byte that survives de-stuffing)
[2] SUB -- the actual command/response identifier
[3] OFFSET_HI
[4] OFFSET_LO
Data section begins at payload[5].
Note: The protocol reference describes a 7-byte header with CMD/DLE/ADDR/FLAGS/SUB/...,
but DLE+ADDR (both 0x10 on wire) are de-stuffed into single bytes by parse_bw/parse_s3,
collapsing the observable header to 5 bytes.
"""
if len(payload) < 5:
return None
return FrameHeader(
cmd=payload[0],
sub=payload[2],
offset_hi=payload[3],
offset_lo=payload[4],
flags=payload[1],
)
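The 5-byte de-stuffed layout described in the docstring can be exercised on a synthetic payload. A minimal standalone sketch mirroring extract_header (the example bytes are illustrative, not from a real capture):

```python
from typing import NamedTuple, Optional

class Header(NamedTuple):
    cmd: int
    flags: int
    sub: int
    page_key: int

def parse_header(payload: bytes) -> Optional[Header]:
    # Mirrors extract_header(): 5-byte de-stuffed header, data at payload[5:].
    if len(payload) < 5:
        return None
    return Header(payload[0], payload[1], payload[2],
                  (payload[3] << 8) | payload[4])

# Hypothetical BW request addressed at the E5 page-2 key (offset 0x082A):
h = parse_header(bytes([0x10, 0x00, 0x1A, 0x08, 0x2A]))
```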
def annotate_frame(frame: Frame, source: str) -> AnnotatedFrame:
header = extract_header(frame.payload)
if header is not None:
entry = SUB_TABLE.get(header.sub)
sub_name = entry[0] if entry else f"UNKNOWN_{header.sub:02X}"
else:
sub_name = "MALFORMED"
return AnnotatedFrame(frame=frame, source=source, header=header, sub_name=sub_name)
def annotate_frames(frames: list[Frame], source: str) -> list[AnnotatedFrame]:
return [annotate_frame(f, source) for f in frames]
def load_and_annotate(s3_path: Path, bw_path: Path) -> tuple[list[AnnotatedFrame], list[AnnotatedFrame]]:
"""Parse both raw files and return annotated frame lists."""
s3_blob = s3_path.read_bytes() if s3_path.exists() else b""
bw_blob = bw_path.read_bytes() if bw_path.exists() else b""
s3_frames = parse_s3(s3_blob, trailer_len=0)
bw_frames = parse_bw(bw_blob, trailer_len=0, validate_checksum=True)
return annotate_frames(s3_frames, "S3"), annotate_frames(bw_frames, "BW")
# ──────────────────────────────────────────────────────────────────────────────
# Session detection
# ──────────────────────────────────────────────────────────────────────────────
# BW SUB that marks the end of a compliance write session
SESSION_CLOSE_SUB = 0x74
def split_into_sessions(
bw_annotated: list[AnnotatedFrame],
s3_annotated: list[AnnotatedFrame],
) -> list[Session]:
"""
Split frames into sessions. A session ends on BW SUB 0x74 (WRITE_CONFIRM_C).
New session starts at the stream beginning and after each 0x74.
The protocol is synchronous: BW[i] request → S3[i] response. S3 frame i
belongs to the same session as BW frame i.
"""
if not bw_annotated and not s3_annotated:
return []
sessions: list[Session] = []
session_idx = 0
bw_start = 0
# Track where we are in S3 frames — they mirror BW frame count per session
s3_cursor = 0
i = 0
while i < len(bw_annotated):
frame = bw_annotated[i]
i += 1
is_close = (
frame.header is not None and frame.header.sub == SESSION_CLOSE_SUB
)
if is_close:
bw_slice = bw_annotated[bw_start:i]
# S3 frames in this session match BW frame count (synchronous protocol)
n_s3 = len(bw_slice)
s3_slice = s3_annotated[s3_cursor : s3_cursor + n_s3]
s3_cursor += n_s3
sess = Session(index=session_idx, bw_frames=bw_slice, s3_frames=s3_slice)
for f in sess.all_frames:
f.session_idx = session_idx
sessions.append(sess)
session_idx += 1
bw_start = i
# Remaining frames (in-progress / no closing 0x74 yet)
if bw_start < len(bw_annotated) or s3_cursor < len(s3_annotated):
bw_slice = bw_annotated[bw_start:]
n_s3 = len(bw_slice)
s3_slice = s3_annotated[s3_cursor : s3_cursor + n_s3]
# also grab any extra S3 frames beyond expected pairing
if s3_cursor + n_s3 < len(s3_annotated):
s3_slice = s3_annotated[s3_cursor:]
if bw_slice or s3_slice:
sess = Session(index=session_idx, bw_frames=bw_slice, s3_frames=s3_slice)
for f in sess.all_frames:
f.session_idx = session_idx
sessions.append(sess)
return sessions
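Stripped of the S3 pairing, the splitting rule above reduces to "cut the BW stream after each close SUB, keep any in-progress tail". A toy sketch on bare SUB bytes:

```python
def split_subs(bw_subs: list[int], close: int = 0x74) -> list[list[int]]:
    """Toy session rule: a session ends on (and includes) the close SUB."""
    sessions, cur = [], []
    for s in bw_subs:
        cur.append(s)
        if s == close:
            sessions.append(cur)
            cur = []
    if cur:  # trailing in-progress session (no closing 0x74 yet)
        sessions.append(cur)
    return sessions

# Two complete sessions plus one in-progress tail:
demo = split_subs([0x5B, 0x71, 0x74, 0x01, 0x74, 0x5B])
```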
# ──────────────────────────────────────────────────────────────────────────────
# Diff engine
# ──────────────────────────────────────────────────────────────────────────────
def _mask_noisy(sub: int, data: bytes) -> bytearray:
"""
Zero out known-noisy byte ranges before diffing.
For NOISY_SUBS: mask bytes 0-5 of the data section (timestamps).
"""
buf = bytearray(data)
if sub in NOISY_SUBS and len(buf) >= 6:
for k in range(6):
buf[k] = 0x00
return buf
HEADER_LEN = 5 # Observed de-stuffed header size: CMD + ? + SUB + OFFSET_HI + OFFSET_LO
def _get_data_section(af: AnnotatedFrame) -> bytes:
"""
Return the data section of the frame (after the 5-byte protocol header).
For S3 frames, payload still contains a trailing SUM8 byte — exclude it.
For BW frames, parse_bw with validate_checksum=True already stripped it.
"""
payload = af.frame.payload
if len(payload) < HEADER_LEN:
return b""
data = payload[HEADER_LEN:]
if af.source == "S3" and len(data) >= 1:
# SUM8 is still present at end of S3 frame payload
data = data[:-1]
return data
def lookup_field_name(sub: int, page_key: int, payload_offset: int) -> Optional[str]:
"""Return field name if the given payload offset matches a known field, else None."""
for entry in FIELD_MAP:
if entry.sub != sub:
continue
if entry.page_key is not None and entry.page_key != page_key:
continue
if entry.payload_offset == payload_offset:
return entry.name
return None
def diff_sessions(sess_a: Session, sess_b: Session) -> list[FrameDiff]:
"""
Compare two sessions frame-by-frame, matched by (sub, page_key).
Returns a list of FrameDiff for SUBs where bytes changed.
"""
# Build lookup: (sub, page_key) → AnnotatedFrame for each session
def index_session(sess: Session) -> dict[tuple[int, int], AnnotatedFrame]:
idx: dict[tuple[int, int], AnnotatedFrame] = {}
for af in sess.all_frames:
if af.header is None:
continue
key = (af.header.sub, af.header.page_key)
# Keep first occurrence per key (or we could keep all — for now, first)
if key not in idx:
idx[key] = af
return idx
idx_a = index_session(sess_a)
idx_b = index_session(sess_b)
results: list[FrameDiff] = []
# Only compare SUBs present in both sessions
common_keys = set(idx_a.keys()) & set(idx_b.keys())
for key in sorted(common_keys):
sub, page_key = key
af_a = idx_a[key]
af_b = idx_b[key]
data_a = _mask_noisy(sub, _get_data_section(af_a))
data_b = _mask_noisy(sub, _get_data_section(af_b))
if data_a == data_b:
continue
# Compare byte by byte up to the shorter length
diffs: list[ByteDiff] = []
max_len = max(len(data_a), len(data_b))
for offset in range(max_len):
byte_a = data_a[offset] if offset < len(data_a) else None
byte_b = data_b[offset] if offset < len(data_b) else None
if byte_a != byte_b:
# payload_offset = data_section_offset + HEADER_LEN
payload_off = offset + HEADER_LEN
field = lookup_field_name(sub, page_key, payload_off)
diffs.append(ByteDiff(
payload_offset=payload_off,
before=byte_a if byte_a is not None else -1,
after=byte_b if byte_b is not None else -1,
field_name=field,
))
if diffs:
entry = SUB_TABLE.get(sub)
sub_name = entry[0] if entry else f"UNKNOWN_{sub:02X}"
results.append(FrameDiff(sub=sub, page_key=page_key, sub_name=sub_name, diffs=diffs))
return results
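The byte-level comparison inside diff_sessions is plain positional diffing, with -1 standing in for a byte missing on one side. A self-contained sketch of just that inner loop:

```python
def byte_diffs(a: bytes, b: bytes) -> list[tuple[int, int, int]]:
    """(offset, before, after) per differing position; -1 marks a missing byte."""
    out = []
    for i in range(max(len(a), len(b))):
        x = a[i] if i < len(a) else -1
        y = b[i] if i < len(b) else -1
        if x != y:
            out.append((i, x, y))
    return out

# Hypothetical data sections: one changed byte plus a length difference.
d = byte_diffs(bytes([0x00, 0x07, 0xFF]), bytes([0x00, 0x0D]))
```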
# ──────────────────────────────────────────────────────────────────────────────
# Report rendering
# ──────────────────────────────────────────────────────────────────────────────
def format_hex_dump(data: bytes, indent: str = " ") -> list[str]:
"""Compact 16-bytes-per-line hex dump. Returns list of lines."""
lines = []
for row_start in range(0, len(data), 16):
chunk = data[row_start:row_start + 16]
hex_part = " ".join(f"{b:02x}" for b in chunk)
lines.append(f"{indent}{row_start:04x}: {hex_part}")
return lines
def render_session_report(
session: Session,
diffs: Optional[list[FrameDiff]],
prev_session_index: Optional[int],
) -> str:
lines: list[str] = []
n_bw = len(session.bw_frames)
n_s3 = len(session.s3_frames)
total = n_bw + n_s3
is_complete = any(
af.header is not None and af.header.sub == SESSION_CLOSE_SUB
for af in session.bw_frames
)
status = "" if is_complete else " [IN PROGRESS]"
lines.append(f"{'='*72}")
lines.append(f"SESSION {session.index}{status}")
lines.append(f"{'='*72}")
lines.append(f"Frames: {total} (BW: {n_bw}, S3: {n_s3})")
if n_bw != n_s3:
lines.append(f" WARNING: BW/S3 frame count mismatch — protocol sync issue?")
lines.append("")
# ── Frame inventory ──────────────────────────────────────────────────────
lines.append("FRAME INVENTORY")
for seq_i, af in enumerate(session.all_frames):
if af.header is not None:
sub_hex = f"{af.header.sub:02X}"
page_str = f" (page {af.header.page_key:04X})" if af.header.page_key != 0 else ""
else:
sub_hex = "??"
page_str = ""
chk = ""
if af.frame.checksum_valid is False:
chk = " [BAD CHECKSUM]"
elif af.frame.checksum_valid is True:
chk = f" [{af.frame.checksum_type}]"
lines.append(
f" [{af.source}] #{seq_i:<3} SUB={sub_hex} {af.sub_name:<30}{page_str}"
f" len={len(af.frame.payload)}{chk}"
)
lines.append("")
# ── Hex dumps ────────────────────────────────────────────────────────────
lines.append("HEX DUMPS")
for seq_i, af in enumerate(session.all_frames):
sub_hex = f"{af.header.sub:02X}" if af.header else "??"
lines.append(f" [{af.source}] #{seq_i} SUB={sub_hex} {af.sub_name}")
dump_lines = format_hex_dump(af.frame.payload, indent=" ")
if dump_lines:
lines.extend(dump_lines)
else:
lines.append(" (empty payload)")
lines.append("")
# ── Diff section ─────────────────────────────────────────────────────────
if diffs is not None:
if prev_session_index is not None:
lines.append(f"DIFF vs SESSION {prev_session_index}")
else:
lines.append("DIFF")
if not diffs:
lines.append(" (no changes)")
else:
for fd in diffs:
page_str = f" (page {fd.page_key:04X})" if fd.page_key != 0 else ""
lines.append(f" SUB {fd.sub:02X} ({fd.sub_name}){page_str}:")
for bd in fd.diffs:
field_str = f" [{bd.field_name}]" if bd.field_name else ""
before_str = f"{bd.before:02x}" if bd.before >= 0 else "--"
after_str = f"{bd.after:02x}" if bd.after >= 0 else "--"
lines.append(
f" offset [{bd.payload_offset:3d}] 0x{bd.payload_offset:04X}: "
f"{before_str} -> {after_str}{field_str}"
)
lines.append("")
return "\n".join(lines) + "\n"
def write_report(session: Session, report_text: str, outdir: Path) -> Path:
    """Write a session report to <outdir>/session_NNN.report and return its path."""
    outdir.mkdir(parents=True, exist_ok=True)
out_path = outdir / f"session_{session.index:03d}.report"
out_path.write_text(report_text, encoding="utf-8")
return out_path
# ──────────────────────────────────────────────────────────────────────────────
# Claude export
# ──────────────────────────────────────────────────────────────────────────────
def _hex_block(data: bytes, bytes_per_row: int = 16) -> list[str]:
"""Hex dump with offset + hex + ASCII columns."""
lines = []
for row in range(0, len(data), bytes_per_row):
chunk = data[row:row + bytes_per_row]
hex_col = " ".join(f"{b:02x}" for b in chunk)
hex_col = f"{hex_col:<{bytes_per_row * 3 - 1}}"
asc_col = "".join(chr(b) if 32 <= b < 127 else "." for b in chunk)
lines.append(f" {row:04x} {hex_col} |{asc_col}|")
return lines
def render_claude_export(
sessions: list[Session],
diffs: list[Optional[list[FrameDiff]]],
s3_path: Optional[Path] = None,
bw_path: Optional[Path] = None,
) -> str:
"""
Produce a single self-contained Markdown file suitable for pasting into
a Claude conversation for protocol reverse-engineering assistance.
Structure:
1. Context block — what this is, protocol background, field map
2. Capture summary — session count, frame counts, what changed
3. Per-diff section — one section per session pair that had changes:
a. Diff table (before/after bytes, known field labels)
b. Full hex dumps of ONLY the frames that changed
4. Full hex dumps of all frames in sessions with no prior comparison
(session 0 baseline)
"""
import datetime
lines: list[str] = []
now = datetime.datetime.now().strftime("%Y-%m-%d %H:%M")
s3_name = s3_path.name if s3_path else "raw_s3.bin"
bw_name = bw_path.name if bw_path else "raw_bw.bin"
# ── 1. Context block ──────────────────────────────────────────────────
lines += [
        "# Instantel MiniMate Plus — Protocol Capture Analysis",
f"Generated: {now} | Source: `{s3_name}` + `{bw_name}`",
"",
"## Protocol Background",
"",
"This file contains parsed RS-232 captures from an Instantel MiniMate Plus",
"seismograph communicating with Blastware PC software at 38400 baud 8N1.",
"",
"**Frame structure (de-stuffed payload):**",
"```",
" [0] CMD 0x10 = BW request, 0x00 = S3 response",
" [1] ? 0x00 (BW) or 0x10 (S3)",
" [2] SUB Command/response identifier (key field)",
" [3] OFFSET_HI Page offset high byte",
" [4] OFFSET_LO Page offset low byte",
" [5+] DATA Payload data section",
"```",
"",
"**Response SUB rule:** response_SUB = 0xFF - request_SUB (confirmed, no exceptions observed)",
"",
"**Known field map** (offsets from payload[0]):",
"```",
" SUB F7 (EVENT_INDEX_RESPONSE):",
" [80] 0x52 backlight_on_time uint8 seconds",
" [88] 0x58 power_save_timeout uint8 minutes",
" [89] 0x59 monitoring_lcd_cycle uint16BE 65500=disabled",
" SUB E5 page 0x082A (COMPLIANCE_CONFIG_RESPONSE):",
" [45] 0x2D record_time float32BE seconds (7s=40E00000, 13s=41500000)",
" SUB 82 (TRIGGER_CONFIG_WRITE, BW-side only):",
" [22] trigger_sample_width uint8 samples",
"```",
"",
"**Session boundary:** a compliance session ends when BW sends SUB 0x74 (WRITE_CONFIRM_C).",
"Sessions are numbered from 0. The diff compares consecutive complete sessions.",
"",
]
# ── 2. Capture summary ────────────────────────────────────────────────
lines += ["## Capture Summary", ""]
lines.append(f"Sessions found: {len(sessions)}")
for sess in sessions:
is_complete = any(
af.header is not None and af.header.sub == SESSION_CLOSE_SUB
for af in sess.bw_frames
)
status = "complete" if is_complete else "partial/in-progress"
n_bw, n_s3 = len(sess.bw_frames), len(sess.s3_frames)
changed = len(diffs[sess.index] or []) if sess.index < len(diffs) else 0
changed_str = f" ({changed} SUBs changed vs prev)" if sess.index > 0 else " (baseline)"
lines.append(f" Session {sess.index} [{status}]: BW={n_bw} S3={n_s3} frames{changed_str}")
lines.append("")
# ── 3. Per-diff sections ──────────────────────────────────────────────
any_diffs = False
for sess in sessions:
sess_diffs = diffs[sess.index] if sess.index < len(diffs) else None
if sess_diffs is None or sess.index == 0:
continue
any_diffs = True
prev_idx = sess.index - 1
lines += [
            "---",
f"## Diff: Session {prev_idx} -> Session {sess.index}",
"",
]
if not sess_diffs:
lines.append("_No byte changes detected between these sessions._")
lines.append("")
continue
# Build index of changed frames for this session (and prev)
prev_sess = sessions[prev_idx] if prev_idx < len(sessions) else None
for fd in sess_diffs:
page_str = f" page 0x{fd.page_key:04X}" if fd.page_key != 0 else ""
lines += [
                f"### SUB {fd.sub:02X} {fd.sub_name}{page_str}",
"",
]
# Diff table
known_count = sum(1 for bd in fd.diffs if bd.field_name)
unknown_count = sum(1 for bd in fd.diffs if not bd.field_name)
lines.append(
f"Changed bytes: **{len(fd.diffs)}** total "
f"({known_count} known fields, {unknown_count} unknown)"
)
lines.append("")
            lines.append(f"| Offset | Hex | Dec | Session {prev_idx} | Session {sess.index} | Field |")
            lines.append("|--------|-----|-----|------------|------------|-------|")
for bd in fd.diffs:
before_s = f"`{bd.before:02x}`" if bd.before >= 0 else "`--`"
after_s = f"`{bd.after:02x}`" if bd.after >= 0 else "`--`"
before_d = str(bd.before) if bd.before >= 0 else "--"
after_d = str(bd.after) if bd.after >= 0 else "--"
field = f"`{bd.field_name}`" if bd.field_name else "**UNKNOWN**"
lines.append(
f"| [{bd.payload_offset}] 0x{bd.payload_offset:04X} "
f"| {before_s}->{after_s} | {before_d}->{after_d} "
f"| {before_s} | {after_s} | {field} |"
)
lines.append("")
# Hex dumps of the changed frame in both sessions
def _find_af(target_sess: Session, sub: int, page_key: int) -> Optional[AnnotatedFrame]:
                """Locate the frame matching (sub, page_key) in a session, if present."""
for af in target_sess.all_frames:
if af.header and af.header.sub == sub and af.header.page_key == page_key:
return af
return None
            af_prev = _find_af(prev_sess, fd.sub, fd.page_key) if prev_sess else None
af_curr = _find_af(sess, fd.sub, fd.page_key)
lines.append("**Hex dumps (full de-stuffed payload):**")
lines.append("")
for label, af in [(f"Session {prev_idx} (before)", af_prev),
(f"Session {sess.index} (after)", af_curr)]:
if af is None:
lines.append(f"_{label}: frame not found_")
lines.append("")
continue
lines.append(f"_{label}_ — {len(af.frame.payload)} bytes:")
lines.append("```")
lines += _hex_block(af.frame.payload)
lines.append("```")
lines.append("")
if not any_diffs:
lines += [
"---",
"## Diffs",
"",
"_Only one session found — no diff available. "
"Run a second capture with changed settings to see what moves._",
"",
]
# ── 4. Baseline hex dumps (session 0, all frames) ─────────────────────
if sessions:
baseline = sessions[0]
lines += [
"---",
            "## Baseline — Session 0 (all frames)",
"",
"Full hex dump of every frame in the first session.",
"Use this to map field positions from known values.",
"",
]
for seq_i, af in enumerate(baseline.all_frames):
sub_hex = f"{af.header.sub:02X}" if af.header else "??"
page_str = f" page 0x{af.header.page_key:04X}" if af.header and af.header.page_key != 0 else ""
chk_str = f" [{af.frame.checksum_type}]" if af.frame.checksum_valid else ""
lines.append(
                f"### [{af.source}] #{seq_i} SUB {sub_hex} {af.sub_name}{page_str}{chk_str}"
)
lines.append(f"_{len(af.frame.payload)} bytes_")
lines.append("```")
lines += _hex_block(af.frame.payload)
lines.append("```")
lines.append("")
lines += [
"---",
"_End of analysis. To map an unknown field: change exactly one setting in Blastware,_",
"_capture again, run the analyzer, and look for the offset that moved._",
]
return "\n".join(lines) + "\n"
def write_claude_export(
sessions: list[Session],
diffs: list[Optional[list[FrameDiff]]],
outdir: Path,
s3_path: Optional[Path] = None,
bw_path: Optional[Path] = None,
) -> Path:
    """Render the Claude export and write it to a timestamped Markdown file."""
import datetime
outdir.mkdir(parents=True, exist_ok=True)
stamp = datetime.datetime.now().strftime("%Y%m%d_%H%M%S")
out_path = outdir / f"claude_export_{stamp}.md"
out_path.write_text(
render_claude_export(sessions, diffs, s3_path, bw_path),
encoding="utf-8"
)
return out_path
# ──────────────────────────────────────────────────────────────────────────────
# Post-processing mode
# ──────────────────────────────────────────────────────────────────────────────
def run_postprocess(s3_path: Path, bw_path: Path, outdir: Path, export: bool = False) -> None:
    """One-shot analysis: parse both captures, split into sessions, diff, write reports."""
    print(f"s3_analyzer v{__version__}")
print(f" S3 file : {s3_path}")
print(f" BW file : {bw_path}")
print(f" Out dir : {outdir}")
print()
s3_frames, bw_frames = load_and_annotate(s3_path, bw_path)
print(f"Parsed: {len(s3_frames)} S3 frames, {len(bw_frames)} BW frames")
sessions = split_into_sessions(bw_frames, s3_frames)
print(f"Sessions: {len(sessions)}")
print()
    # One entry per session, aligned by session index; the loop below appends None
    # for session 0 (baseline), so start empty — seeding with [None] would shift
    # every diff by one and misalign render_claude_export's diffs[sess.index] lookup.
    all_diffs: list[Optional[list[FrameDiff]]] = []
prev_session: Optional[Session] = None
for sess in sessions:
sess_diffs: Optional[list[FrameDiff]] = None
prev_idx: Optional[int] = None
if prev_session is not None:
sess_diffs = diff_sessions(prev_session, sess)
prev_idx = prev_session.index
all_diffs.append(sess_diffs)
report = render_session_report(sess, sess_diffs, prev_idx)
out_path = write_report(sess, report, outdir)
n_diffs = len(sess_diffs) if sess_diffs else 0
print(f" Session {sess.index}: {len(sess.all_frames)} frames, {n_diffs} changed SUBs -> {out_path.name}")
prev_session = sess
if export:
export_path = write_claude_export(sessions, all_diffs, outdir, s3_path, bw_path)
print(f"\n Claude export -> {export_path.name}")
print()
print(f"Reports written to: {outdir}")
# ──────────────────────────────────────────────────────────────────────────────
# Live mode
# ──────────────────────────────────────────────────────────────────────────────
def live_loop(
s3_path: Path,
bw_path: Path,
outdir: Path,
poll_interval: float = 0.05,
) -> None:
"""
Tail both raw files continuously, re-parsing on new bytes.
Emits a session report as soon as BW SUB 0x74 is detected.
"""
print(f"s3_analyzer v{__version__} — LIVE MODE")
print(f" S3 file : {s3_path}")
print(f" BW file : {bw_path}")
print(f" Out dir : {outdir}")
print(f" Poll : {poll_interval*1000:.0f}ms")
print("Waiting for frames... (Ctrl+C to stop)")
print()
s3_buf = bytearray()
bw_buf = bytearray()
s3_pos = 0
bw_pos = 0
last_s3_count = 0
last_bw_count = 0
sessions: list[Session] = []
prev_complete_session: Optional[Session] = None
try:
while True:
# Read new bytes from both files
changed = False
if s3_path.exists():
with s3_path.open("rb") as fh:
fh.seek(s3_pos)
new_bytes = fh.read()
if new_bytes:
s3_buf.extend(new_bytes)
s3_pos += len(new_bytes)
changed = True
if bw_path.exists():
with bw_path.open("rb") as fh:
fh.seek(bw_pos)
new_bytes = fh.read()
if new_bytes:
bw_buf.extend(new_bytes)
bw_pos += len(new_bytes)
changed = True
            if changed:
                # Full re-parse of both buffers on every change: simple, and cheap
                # enough at 38400-baud capture sizes.
                s3_frames_raw = parse_s3(bytes(s3_buf), trailer_len=0)
                bw_frames_raw = parse_bw(bytes(bw_buf), trailer_len=0, validate_checksum=True)
s3_annotated = annotate_frames(s3_frames_raw, "S3")
bw_annotated = annotate_frames(bw_frames_raw, "BW")
new_s3 = len(s3_annotated) - last_s3_count
new_bw = len(bw_annotated) - last_bw_count
if new_s3 > 0 or new_bw > 0:
last_s3_count = len(s3_annotated)
last_bw_count = len(bw_annotated)
print(f"[+] S3:{len(s3_annotated)} BW:{len(bw_annotated)} frames", end="")
# Annotate newest BW frame
if bw_annotated:
latest_bw = bw_annotated[-1]
sub_str = f"SUB={latest_bw.header.sub:02X}" if latest_bw.header else "SUB=??"
print(f" latest BW {sub_str} {latest_bw.sub_name}", end="")
print()
# Check for session close
all_sessions = split_into_sessions(bw_annotated, s3_annotated)
# A complete session has the closing 0x74
complete_sessions = [
s for s in all_sessions
if any(
af.header is not None and af.header.sub == SESSION_CLOSE_SUB
for af in s.bw_frames
)
]
# Emit reports for newly completed sessions
for sess in complete_sessions[len(sessions):]:
diffs: Optional[list[FrameDiff]] = None
prev_idx: Optional[int] = None
if prev_complete_session is not None:
diffs = diff_sessions(prev_complete_session, sess)
prev_idx = prev_complete_session.index
report = render_session_report(sess, diffs, prev_idx)
out_path = write_report(sess, report, outdir)
n_diffs = len(diffs) if diffs else 0
print(f"\n [+] Session {sess.index} complete: {len(sess.all_frames)} frames, "
f"{n_diffs} changed SUBs -> {out_path.name}\n")
prev_complete_session = sess
sessions = complete_sessions
time.sleep(poll_interval)
except KeyboardInterrupt:
print("\nStopped.")
# Emit any in-progress (incomplete) session as a partial report
if s3_buf or bw_buf:
s3_frames_raw = parse_s3(bytes(s3_buf), trailer_len=0)
bw_frames_raw = parse_bw(bytes(bw_buf), trailer_len=0, validate_checksum=True)
s3_annotated = annotate_frames(s3_frames_raw, "S3")
bw_annotated = annotate_frames(bw_frames_raw, "BW")
all_sessions = split_into_sessions(bw_annotated, s3_annotated)
incomplete = [
s for s in all_sessions
if not any(
af.header is not None and af.header.sub == SESSION_CLOSE_SUB
for af in s.bw_frames
)
]
for sess in incomplete:
report = render_session_report(sess, diffs=None, prev_session_index=None)
out_path = write_report(sess, report, outdir)
print(f" Partial session {sess.index} written -> {out_path.name}")
# ──────────────────────────────────────────────────────────────────────────────
# CLI
# ──────────────────────────────────────────────────────────────────────────────
def main() -> None:
ap = argparse.ArgumentParser(
description="s3_analyzer — Instantel MiniMate Plus live protocol analyzer"
)
ap.add_argument("--s3", type=Path, required=True, help="Path to raw_s3.bin (S3→BW raw capture)")
ap.add_argument("--bw", type=Path, required=True, help="Path to raw_bw.bin (BW→S3 raw capture)")
ap.add_argument("--live", action="store_true", help="Live mode: tail files as they grow")
ap.add_argument("--export", action="store_true", help="Also write a claude_export_<ts>.md file for Claude analysis")
ap.add_argument("--outdir", type=Path, default=None, help="Output directory for .report files (default: same as input)")
ap.add_argument("--poll", type=float, default=0.05, help="Live mode poll interval in seconds (default: 0.05)")
args = ap.parse_args()
outdir = args.outdir
if outdir is None:
outdir = args.s3.parent
if args.live:
live_loop(args.s3, args.bw, outdir, poll_interval=args.poll)
else:
if not args.s3.exists():
print(f"ERROR: S3 file not found: {args.s3}", file=sys.stderr)
sys.exit(1)
if not args.bw.exists():
print(f"ERROR: BW file not found: {args.bw}", file=sys.stderr)
sys.exit(1)
run_postprocess(args.s3, args.bw, outdir, export=args.export)
if __name__ == "__main__":
main()