fix: enhance event record handling to include MonitorLog sub_code and adjust timestamp parsing logic

This commit is contained in:
Brian Harrison
2026-04-03 18:58:46 -04:00
parent 755050b347
commit 2286d2ccf8
2 changed files with 32 additions and 14 deletions

View File

@@ -268,7 +268,7 @@ Anchor: `b'\x01\x2c\x00\x00\xbe\x80\x00\x00\x00\x00'`, search `cfg[0:150]`
| Offset | Field | Type | | Offset | Field | Type |
|---|---|---| |---|---|---|
| 0 | day | uint8 | | 0 | day | uint8 |
| 1 | sub_code | uint8 (`0x10` = Waveform) | | 1 | sub_code | uint8 (`0x10` = Waveform, `0x03` = MonitorLog) |
| 2 | month | uint8 | | 2 | month | uint8 |
| 34 | year | uint16 BE | | 34 | year | uint16 BE |
| 5 | unknown | uint8 (always 0) | | 5 | unknown | uint8 (always 0) |

View File

@@ -351,8 +351,17 @@ class MiniMateClient:
key4.hex(), exc, key4.hex(), exc,
) )
# Include all full records regardless of sub_code / record_type.
# Partial records (proceed=False, rec_len < 0x30 or 0A failed) are
# the only thing we skip — we have no data to decode for those.
if proceed:
events.append(ev) events.append(ev)
idx += 1 idx += 1
else:
log.info(
"get_events: key=%s — skipping partial/failed record (rec_len < 0x30)",
key4.hex(),
)
# SUB 1F — advance to the next record key. # SUB 1F — advance to the next record key.
# Uses browse mode (all-zero params) — empirically confirmed to work on # Uses browse mode (all-zero params) — empirically confirmed to work on
@@ -606,20 +615,25 @@ def _decode_waveform_record_into(data: bytes, event: Event) -> None:
Modifies event in-place. Modifies event in-place.
""" """
# ── Timestamp ─────────────────────────────────────────────────────────────
# 9-byte format: [day][sub_code][month][year:2 BE][unknown][hour][min][sec]
try:
event.timestamp = Timestamp.from_waveform_record(data)
except Exception as exc:
log.warning("waveform record timestamp decode failed: %s", exc)
# ── Record type ─────────────────────────────────────────────────────────── # ── Record type ───────────────────────────────────────────────────────────
# Decoded from byte[1] (sub_code), not from ASCII string search # Decoded from byte[1] (sub_code) first so we can gate timestamp parsing.
try: try:
event.record_type = _extract_record_type(data) event.record_type = _extract_record_type(data)
except Exception as exc: except Exception as exc:
log.warning("waveform record type decode failed: %s", exc) log.warning("waveform record type decode failed: %s", exc)
# ── Timestamp ─────────────────────────────────────────────────────────────
# 9-byte format for sub_code=0x10 Waveform records:
# [day][sub_code][month][year:2 BE][unknown][hour][min][sec]
# MonitorLog (sub_code=0x03) records have a different byte layout — applying
# the waveform layout produces garbage (year=1031, month=16). Leave timestamp
# None for non-Waveform records until the correct layout is confirmed.
if event.record_type == "Waveform":
try:
event.timestamp = Timestamp.from_waveform_record(data)
except Exception as exc:
log.warning("waveform record timestamp decode failed: %s", exc)
# ── Peak values (per-channel PPV + Peak Vector Sum) ─────────────────────── # ── Peak values (per-channel PPV + Peak Vector Sum) ───────────────────────
try: try:
peak_values = _extract_peak_floats(data) peak_values = _extract_peak_floats(data)
@@ -921,10 +935,14 @@ def _extract_record_type(data: bytes) -> Optional[str]:
code = data[1] code = data[1]
if code == 0x10: if code == 0x10:
return "Waveform" return "Waveform"
# Unknown code — log it so we can identify histogram/noise sub_codes from real captures if code == 0x03:
log.warning("_extract_record_type: unknown sub_code=0x%02X — returning raw string", code) # Monitor log record — Instantel's periodic time-weighted average record.
# Appears in BW's event list but NOT on the physical device display.
# The byte layout differs from 0x10 waveform records: the timestamp fields
# decode as garbage under the waveform layout and should not be trusted.
return "MonitorLog"
log.warning("_extract_record_type: unknown sub_code=0x%02X", code)
return f"Unknown(0x{code:02X})" return f"Unknown(0x{code:02X})"
return None
def _extract_peak_floats(data: bytes) -> Optional[PeakValues]: def _extract_peak_floats(data: bytes) -> Optional[PeakValues]: