feat: implement reliable event counting via 1E/1F chain and update device info

This commit is contained in:
Brian Harrison
2026-04-03 16:02:10 -04:00
parent 7cd8fda5e8
commit 2cb95cd45e
2 changed files with 49 additions and 6 deletions

View File

@@ -163,6 +163,43 @@ class MiniMateClient:
log.info("connect: %s", device_info)
return device_info
def count_events(self) -> int:
"""
Count stored events by iterating the 1E → 1F key chain.
This is the only reliable way to get the true event count. The SUB 08
event index payload has a field that was assumed to be an event count
(uint32 BE at offset +3) but empirically always returns 1 regardless of
how many events are stored — Blastware appears to download one event per
TCP session, so the index may reflect session-scoped state rather than
device-wide storage.
This method issues 1E (first key) then 1F repeatedly until the null key
b'\\x00\\x00\\x00\\x00', counting as it goes. No 0A/0C/5A reads are
performed, so it is much faster than get_events().
Returns:
Number of stored waveform events (0 if device is empty).
"""
proto = self._require_proto()
try:
key4, _ = proto.read_event_first()
except ProtocolError as exc:
log.warning("count_events: 1E failed: %s — returning 0", exc)
return 0
count = 0
while key4 != b"\x00\x00\x00\x00":
count += 1
try:
key4 = proto.advance_event()
except ProtocolError as exc:
log.warning("count_events: 1F failed after %d events: %s", count, exc)
break
log.info("count_events: %d event(s) found via 1E/1F chain", count)
return count
def get_events(self, full_waveform: bool = False, debug: bool = False) -> list[Event]:
"""
Download all stored events from the device using the confirmed
@@ -475,11 +512,13 @@ def _decode_event_count(data: bytes) -> int:
log.warning("event index payload too short (%d bytes), assuming 0 events", len(data))
return 0
# Log the raw bytes so we can verify this decode against known event counts
log.warning(
"event_index raw (first 16 bytes): %s",
" ".join(f"{b:02x}" for b in data[:16]),
)
# Log the full payload so we can reverse-engineer the format
log.warning("event_index raw (%d bytes total):", len(data))
for off in range(0, len(data), 16):
chunk = data[off:off+16]
hex_part = " ".join(f"{b:02x}" for b in chunk)
asc_part = "".join(chr(b) if 0x20 <= b < 0x7f else "." for b in chunk)
log.warning(" [%04x]: %-47s %s", off, hex_part, asc_part)
# Try the uint32 at +3 first
count = struct.unpack_from(">I", data, 3)[0]