big refactor of waveform protocol.

This commit is contained in:
2026-05-03 01:20:21 -04:00
parent d758825c67
commit 45e61fbcaf
7 changed files with 409 additions and 278 deletions
+67 -40
View File
@@ -1345,6 +1345,11 @@ def _decode_waveform_record_into(data: bytes, event: Event) -> None:
event.timestamp = Timestamp.from_continuous_record(data)
except Exception as exc:
log.warning("continuous record timestamp decode failed: %s", exc)
elif event.record_type == "Waveform (Short)":
try:
event.timestamp = Timestamp.from_short_record(data)
except Exception as exc:
log.warning("short record timestamp decode failed: %s", exc)
# ── Peak values (per-channel PPV + Peak Vector Sum) ───────────────────────
try:
@@ -1636,51 +1641,73 @@ def _decode_a5_waveform(
}
def _detect_record_format(data: bytes) -> Optional[str]:
"""
Detect which timestamp-header format a 210-byte 0C waveform record uses.
THREE formats observed on BE11529 firmware S338.17:
"single_shot" — 9-byte header:
[day] [0x10] [month] [year_BE:2] [unknown] [hour] [min] [sec]
sub_code=0x10 at byte [1]. Year at [3:5].
"continuous" — 10-byte header:
[0x10] [day] [0x10] [month] [year_BE:2] [unknown] [hour] [min] [sec]
marker 0x10 at byte [0] AND byte [2]. Year at [4:6].
"short" — 8-byte header (NEW 2026-05-01):
[day] [month] [year_BE:2] [unknown] [hour] [min] [sec]
No marker bytes. Year at [2:4].
Each format has the year (uint16 BE) at a UNIQUE byte position, so we can
disambiguate by scanning each candidate position and picking the one
where the year falls in a sane range (2015..2050).
Returns "single_shot" / "continuous" / "short" or None if no format matches.
"""
if len(data) < 8:
return None
def _sane_year(hi: int, lo: int) -> bool:
y = (hi << 8) | lo
return 2015 <= y <= 2050
# Order matters: prefer formats with stronger marker-byte evidence first.
if data[1] == 0x10 and len(data) >= 9 and _sane_year(data[3], data[4]):
return "single_shot"
if (data[0] == 0x10 and data[2] == 0x10
and len(data) >= 10 and _sane_year(data[4], data[5])):
return "continuous"
if _sane_year(data[2], data[3]):
return "short"
return None
def _extract_record_type(data: bytes) -> Optional[str]:
"""
Detect the waveform record format by inspecting the first 3 bytes of the
210-byte record returned by SUB 0C.
Return a human-readable name for the waveform record format detected
in the first bytes of a 210-byte 0C record.
Two formats exist (confirmed from BE11529 captures and CLAUDE.md docs):
Single-shot mode — 9-byte header:
data[0] = day
data[1] = 0x10 ← sub_code marker
data[2] = month
data[3:5] = year (BE)
...
Continuous mode — 10-byte header:
data[0] = 0x10 ← marker A
data[1] = day ← variable (NOT 0x10)
data[2] = 0x10 ← marker B
data[3] = month
data[4:6] = year (BE)
...
Disambiguate by checking BOTH data[0] and data[2]:
- data[0]==0x10 AND data[2]==0x10 → Continuous (10-byte header)
- data[1]==0x10 → Single-shot (9-byte header)
- otherwise → Unknown
Previous logic only checked data[1] and so mis-classified continuous-mode
records as "Unknown(0xXX)" wherever day != 0x10 — see filename
M5290000.000 regression report (2026-05-01 SFM log).
Maps to the format codes returned by _detect_record_format():
"single_shot""Waveform"
"continuous""Waveform (Continuous)"
"short""Waveform (Short)"
None → "Unknown(XX.YY.ZZ)"
"""
if len(data) < 3:
return None
# 10-byte continuous format: 0x10 markers at byte 0 AND byte 2
if data[0] == 0x10 and data[2] == 0x10:
return "Waveform (Continuous)"
# 9-byte single-shot format: 0x10 sub_code marker at byte 1
if data[1] == 0x10:
fmt = _detect_record_format(data)
if fmt == "single_shot":
return "Waveform"
log.warning(
"_extract_record_type: unrecognized header: data[0:3]=%02X %02X %02X",
data[0], data[1], data[2],
)
return f"Unknown({data[0]:02X}.{data[1]:02X}.{data[2]:02X})"
if fmt == "continuous":
return "Waveform (Continuous)"
if fmt == "short":
return "Waveform (Short)"
if len(data) >= 3:
log.warning(
"_extract_record_type: unrecognized header: data[0:3]=%02X %02X %02X",
data[0], data[1], data[2],
)
return f"Unknown({data[0]:02X}.{data[1]:02X}.{data[2]:02X})"
return None
def _extract_peak_floats(data: bytes) -> Optional[PeakValues]:
"""