feat: add erase-all protocol and browse helpers to protocol/client layer

protocol.py:
- SUB_ERASE_ALL_BEGIN = 0xA3, SUB_ERASE_ALL_CONFIRM = 0xA2 (confirmed 4-11-26 MITM)
- SUB_CHANNEL_CONFIG (0x06) data length = 0x24 (36 bytes) in DATA_LENGTHS
- begin_erase_all()              — single frame, token=0xFE, response 0x5C
- confirm_erase_all()            — single frame, token=0xFE, response 0x5D
- read_event_storage_range()     — two-step read (probe+data), token=0xFE
  Response last 8 bytes = first/last stored event key; both 0x01110000 after erase

client.py:
- list_event_keys()              — browse-mode 1E→0A→1F walk, no waveform download;
  returns list of hex key strings; used as fast pre-check before get_events()
- get_events(skip_waveform_for_keys=set())
  — for already-seen keys: only 0A+1F(browse), skips 1E-arm/0C/POLL×3/5A entirely
- delete_all_events()            — orchestrates the confirmed erase sequence:
  0xA3 → 0x1C → 0x06 → 0xA2; logs first/last key from storage range response

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
This commit is contained in:
2026-04-11 01:14:37 -04:00
committed by serversdown
parent 4921b0489a
commit a3b8d10fa8
2 changed files with 200 additions and 2 deletions
+120 -1
View File
@@ -252,7 +252,98 @@ class MiniMateClient:
log.info("count_events: %d event(s) found via 1E/1F chain", count)
return count
def get_events(self, full_waveform: bool = False, debug: bool = False, stop_after_index: Optional[int] = None) -> list[Event]:
def list_event_keys(self) -> list[str]:
"""
Return the hex key strings for all stored events without downloading
any waveform data. Uses the same browse-mode 1E -> 0A -> 1F walk as
count_events() but collects the key at each step.
Returns:
List of 8-char lowercase hex strings, e.g. ["01110000", "0111245a"].
Empty list if device has no stored events or 1E fails.
"""
proto = self._require_proto()
try:
key4, data8 = proto.read_event_first()
except ProtocolError as exc:
log.warning("list_event_keys: 1E failed: %s -- returning []", exc)
return []
if data8[4:8] == b"\x00\x00\x00\x00":
log.info("list_event_keys: device is empty")
return []
keys: list[str] = []
while data8[4:8] != b"\x00\x00\x00\x00":
keys.append(key4.hex())
try:
proto.read_waveform_header(key4)
except ProtocolError as exc:
log.warning(
"list_event_keys: 0A failed for key=%s: %s -- stopping",
key4.hex(), exc,
)
break
try:
key4, data8 = proto.advance_event(browse=True)
log.debug(
"list_event_keys: 1F -> key=%s trailing=%s",
key4.hex(), data8[4:8].hex(),
)
except ProtocolError as exc:
log.warning(
"list_event_keys: 1F failed after %d event(s): %s -- stopping",
len(keys), exc,
)
break
log.info("list_event_keys: %d key(s): %s", len(keys), keys)
return keys
def delete_all_events(self) -> None:
"""
Erase all stored events from the device memory.
This performs the complete erase sequence confirmed from the 4-11-26
MITM capture of a Blastware ACH session:
1. SUB 0xA3 (begin_erase_all) — initiate erase, token=0xFE
2. SUB 0x1C (read_monitor_status) — status read between erase commands
3. SUB 0x06 (read_event_storage_range) — verify storage state, token=0xFE
4. SUB 0xA2 (confirm_erase_all) — commit erase, token=0xFE
After this call the device's event memory is empty. The unit returns to
its normal operating state automatically (no restart-monitoring call needed).
Raises:
ProtocolError: on timeout or unexpected device response.
"""
proto = self._require_proto()
log.info("delete_all_events: step 1/4 — begin erase (SUB 0xA3)")
proto.begin_erase_all()
log.debug("delete_all_events: 0xA3 ack received")
log.info("delete_all_events: step 2/4 — monitor status read (SUB 0x1C)")
proto.read_monitor_status()
log.debug("delete_all_events: 0x1C read complete")
log.info("delete_all_events: step 3/4 — event storage range read (SUB 0x06)")
rng = proto.read_event_storage_range()
if len(rng.data) >= 8:
first_key = rng.data[-8:-4].hex()
last_key = rng.data[-4:].hex()
log.info(
"delete_all_events: storage range — first=%s last=%s",
first_key, last_key,
)
log.debug("delete_all_events: 0x06 read complete")
log.info("delete_all_events: step 4/4 — confirm erase (SUB 0xA2)")
proto.confirm_erase_all()
log.info("delete_all_events: erase confirmed — device memory cleared")
def get_events(self, full_waveform: bool = False, debug: bool = False, stop_after_index: Optional[int] = None, skip_waveform_for_keys: Optional[set] = None) -> list[Event]:
"""
Download all stored events from the device using the confirmed
1E → 0A → 0C → 5A → 1F event-iterator protocol.
@@ -303,6 +394,34 @@ class MiniMateClient:
while data8[4:8] != b"\x00\x00\x00\x00":
cur_key = key4 # key for this event's 0A/1E-arm/0C/5A calls
log.info("get_events: record %d key=%s", idx, cur_key.hex())
# Fast-advance path: if this key is already downloaded, skip
# 1E-arm/0C/POLL/5A entirely. Only 0A + 1F(browse) are needed
# to advance the device's internal pointer to the next event.
# This is identical to the browse-mode walk in count_events().
if skip_waveform_for_keys and cur_key.hex() in skip_waveform_for_keys:
log.debug("get_events: key=%s already seen -- fast-advance only", cur_key.hex())
try:
proto.read_waveform_header(cur_key)
except ProtocolError as exc:
log.warning(
"get_events: 0A failed for key=%s (skip path): %s -- stopping",
cur_key.hex(), exc,
)
break
try:
key4, data8 = proto.advance_event(browse=True)
except ProtocolError as exc:
log.warning(
"get_events: 1F failed for key=%s (skip path): %s -- stopping",
cur_key.hex(), exc,
)
break
idx += 1
if stop_after_index is not None and idx > stop_after_index:
break
continue
ev = Event(index=idx)
ev._waveform_key = cur_key