429c6ac87a
test: add regression tests for v0.14.x SUB 5A protocol fixes refactor(logging): change warning logs to debug for less verbosity in write_blastware_file
975 lines
40 KiB
Python
975 lines
40 KiB
Python
"""
|
||
blastware_file.py — Blastware binary file codec for bidirectional interoperability.
|
||
|
||
Reads and writes the proprietary Instantel/Blastware file formats:
|
||
Waveform events (.CE0W, .VM0H, .440, .7M0, etc.) (extension encoding UNKNOWN — see below)
|
||
.MLG — Monitor log (monitoring session history)
|
||
|
||
All waveform formats share a common 22-byte file header prefix and identical
|
||
internal binary structure (same type tag 00 12 03 00, same STRT record layout).
|
||
Blastware identifies the file type by extension, not by a magic marker.
|
||
|
||
EXTENSION ENCODING — V10.72 firmware FULLY CONFIRMED 2026-04-22:
|
||
|
||
Direct / manual download: AB0 (3-char, no type character)
|
||
Call-home (ACH) download: AB0W or AB0H (4-char, W=waveform H=histogram)
|
||
|
||
AB = 2-char base-36 of (total_seconds % 1296), where
|
||
total_seconds = (event_local_time − 1985-01-01T00:00:00_local).
|
||
0 = always literal digit zero.
|
||
Verified against 3,248 call-home files from a 10-year production archive.
|
||
|
||
The 10-year archive contains only ACH files (all end in W or H).
|
||
Manual Blastware downloads produce 3-char AB0 extensions — same encoding
|
||
but without the trailing type character.
|
||
|
||
Old firmware (S338, 3-char extensions): encoding unknown / same as manual?
|
||
Micromate Series 4 uses a different scheme (literal datetime in filename).
|
||
|
||
─── File structure overview ─────────────────────────────────────────────────────
|
||
|
||
Waveform file structure (confirmed from example-events/4-3-26-multi/M529LIY6 (example event)):
|
||
|
||
[22B header] [21B STRT record] [body bytes] [26B footer]
|
||
|
||
Header (22 bytes):
|
||
10 00 01 80 00 00 — fixed prefix
|
||
49 6e 73 74 61 6e 74 65 6c 00 — b'Instantel\x00'
|
||
07 2c — fixed
|
||
00 12 03 00 — waveform file type tag (shared by all waveform extensions)
|
||
|
||
STRT record (21 bytes, immediately follows header):
|
||
53 54 52 54 — b'STRT'
|
||
ff fe — fixed (2 bytes)
|
||
[key4] — 4-byte waveform event key
|
||
[key4] — 4-byte waveform event key (repeated)
|
||
[zeros] — 7 bytes padding
|
||
[rectime] — uint8 record time in seconds
|
||
|
||
Body (variable — reconstructed from A5 frame data):
|
||
The body bytes are derived from the raw A5 frame wire content, specifically
|
||
from the DLE-decoded representation of each frame's contribution. See the
|
||
_frame_body_bytes() helper for the exact algorithm.
|
||
|
||
Footer (26 bytes):
|
||
0e 08
|
||
[ts1: 8B big-endian timestamp] — start timestamp
|
||
[ts2: 8B big-endian timestamp] — stop timestamp
|
||
00 01 00 02 00 00
|
||
[crc: 2B] — CRC (algorithm unconfirmed; written as 0x00 0x00 placeholder)
|
||
|
||
Timestamp format (big-endian, 8 bytes):
|
||
[day] [month] [year_HI] [year_LO] [0x00] [hour] [min] [sec]
|
||
|
||
MLG (monitor log, confirmed from example-events/4-3-26-multi/BE11529.MLG):
|
||
|
||
[308B header] [N × 292B records]
|
||
|
||
Header (308 bytes):
|
||
Offset 0x00: 10 00 01 80 00 00 Instantel\x00 07 2c 22 01 0e a0 — fixed (16B)
|
||
Offset 0x10: ... (unknown structure, written as zeros + serial)
|
||
Offset 0x2A: serial number (8 bytes, null-padded ASCII, e.g. "BE11529")
|
||
... zero-padded to 308 bytes total
|
||
|
||
Record (292 bytes each):
|
||
[2B CRC] — unknown algorithm; written as 0x00 0x00
|
||
22 01 0e 80 — record marker
|
||
[ts1: 8B big-endian timestamp] — start time
|
||
[ts2: 8B big-endian timestamp] — stop time (zeros if no stop)
|
||
[4B flags] — see MLG_FLAGS_* constants below
|
||
[10B serial] — null-padded serial number ASCII
|
||
[text] — for trigger records: [0x08][8B ts1_copy] then ASCII "Geo: X.XXX in/s"
|
||
for monitoring records: b'' (or minimal separator)
|
||
[zero-padded to 292 bytes]
|
||
|
||
─── Critical implementation notes ──────────────────────────────────────────────
|
||
|
||
Waveform body reconstruction algorithm (confirmed 2026-04-21 from verification against
|
||
M529LIY6 (example event) using raw_s3_20260403_153508.bin capture):
|
||
|
||
The waveform body bytes come from the A5 frame content, stripped of DLE-framing
|
||
artifacts. Each A5 frame contributes a different slice of its data section,
|
||
with DLE+{0x02,0x03,0x04} byte pairs stripped.
|
||
|
||
Skip amounts per frame index (offsets into frame.data):
|
||
A5[0] (probe): data[strt_pos + 21 + 7] (skip header + STRT record)
|
||
strt_pos found by searching frame.data[7:] for b'STRT';
|
||
the contribution starts at strt_pos + 21 within data[7:]
|
||
which equals strt_pos + 21 + 7 within frame.data.
|
||
A5[1]: data[13] (skip 7-byte frame.data prefix + 6 header bytes)
|
||
A5[2..N]: data[12] (skip 7-byte frame.data prefix + 5 header bytes)
|
||
Terminator A5: data[11] (1 byte less than chunk frames; terminator inner header
|
||
is 4 bytes instead of 5 — confirmed 2026-04-21)
|
||
|
||
DLE strip rule (applied AFTER slicing):
|
||
Strip any 0x10 byte that is immediately followed by 0x02, 0x03, or 0x04.
|
||
This undoes the DLE-escape that S3FrameParser preserves as literal pairs.
|
||
Applied to frame.data[skip:] + bytes([frame.chk_byte]) together, then
|
||
conditionally exclude the trailing chk_byte from the output.
|
||
|
||
chk_byte absorption:
|
||
When frame.data[-1] == 0x10 AND frame.chk_byte ∈ {0x02, 0x03, 0x04},
|
||
the last byte of frame.data is the DLE prefix of a split DLE+chk pair.
|
||
Including chk_byte in the strip buffer allows the pair to be stripped as
|
||
a unit. After stripping, the trailing chk_byte is ALWAYS removed — because
|
||
_strip_inner_frame_dles keeps the byte after the DLE (the chk_byte value),
|
||
and that value is the checksum, never payload. This applies to all three
|
||
cases (chk ∈ {0x02, 0x03, 0x04}) identically.
|
||
|
||
MLG CRC:
|
||
The algorithm that produces the 2-byte CRC at the start of each MLG record
|
||
is unknown. All examined records use non-zero values that do not match
|
||
CRC-16/CCITT, CRC-16/IBM, CRC-32 (truncated), word sums, XOR variants, or
|
||
any of the 40+ polynomial/init combinations tested. The writer emits 0x0000.
|
||
This produces files that Blastware may reject or display without the CRC check —
|
||
the exact impact on BW import is unknown (TODO: test).
|
||
|
||
─── Public API ──────────────────────────────────────────────────────────────────
|
||
|
||
blastware_filename(event, serial)
|
||
Return the correct Blastware filename for an event (e.g. "M529LIY6.CE0W").
|
||
Full AB0T extension encoding confirmed 2026-04-22 against 3,248 archive files.
|
||
Extension matches what Blastware itself would generate for the same event.
|
||
|
||
write_blastware_file(event, a5_frames, path)
|
||
Create a Blastware waveform file from an Event and the full A5 frame list.
|
||
All waveform extensions share the same binary format — the extension is set
|
||
by blastware_filename() based on the event timestamp and type.
|
||
|
||
read_blastware_file(path) → Event
|
||
Parse a Blastware waveform file into an Event object with waveform data populated.
|
||
(Not yet implemented — placeholder raises NotImplementedError.)
|
||
|
||
write_mlg(entries, serial, path)
|
||
Create a .MLG file from a list of MonitorLogEntry objects.
|
||
|
||
read_mlg(path) → list[MonitorLogEntry]
|
||
Parse a .MLG file into MonitorLogEntry objects.
|
||
(Not yet implemented — placeholder raises NotImplementedError.)
|
||
"""
|
||
|
||
from __future__ import annotations
|
||
|
||
import datetime
|
||
import logging
|
||
import struct
|
||
from pathlib import Path
|
||
from typing import Optional, Union
|
||
|
||
from .framing import S3Frame
|
||
from .models import Event, MonitorLogEntry, Timestamp
|
||
|
||
log = logging.getLogger(__name__)
|
||
|
||
# ── File header constants ─────────────────────────────────────────────────────
|
||
|
||
# Common 16-byte prefix shared by waveform files and MLG (confirmed from binary inspection).
|
||
_FILE_HEADER_PREFIX = bytes.fromhex("1000018000004973") + b"tantel\x00\x07\x2c"
|
||
# = 10 00 01 80 00 00 49 73 74 61 6e 74 65 6c 00 07 2c (17 bytes)
|
||
# Confirmed breakdown: 10 00 01 80 00 00 = fixed; "Instantel\x00" = 10B; 07 2c = fixed
|
||
|
||
# Simpler construction:
|
||
_FILE_HEADER_PREFIX = b"\x10\x00\x01\x80\x00\x00Instantel\x00\x07\x2c" # 17 bytes
|
||
|
||
# Waveform file type tag (4 bytes after common prefix) — shared by ALL waveform extensions
|
||
_WAVEFORM_TYPE_TAG = b"\x00\x12\x03\x00" # confirmed from M529LIY6 (example event) — same tag for .CE0W, .VM0H, etc.
|
||
|
||
# MLG type tag (4 bytes after common prefix)
|
||
_MLG_TYPE_TAG = b"\x22\x01\x0e\xa0" # confirmed from BE11529.MLG offset 0x11..0x14
|
||
|
||
# Total header sizes
|
||
_WAVEFORM_HEADER_SIZE = 22 # 17 + 4 = 21... wait. Let me recalculate.
|
||
# From binary: first 22 bytes = header, then STRT at byte 22.
|
||
# 17-byte common prefix + 4-byte type tag = 21 bytes. But observed header is 22B.
|
||
# Checking: 6 fixed + 10 "Instantel\x00" + 2 "07 2c" = 18B prefix, then 4B type tag = 22B.
|
||
# Re-count: b"\x10\x00\x01\x80\x00\x00" = 6B + b"Instantel\x00" = 10B + b"\x07\x2c" = 2B = 18B prefix.
|
||
_FILE_HEADER_PREFIX = b"\x10\x00\x01\x80\x00\x00Instantel\x00\x07\x2c" # 18 bytes
|
||
_WAVEFORM_HEADER_SIZE = 22 # 18 + 4 = 22 bytes ✅
|
||
_MLG_HEADER_SIZE = 308 # confirmed from BE11529.MLG
|
||
|
||
# MLG record marker (4 bytes after 2-byte CRC at start of each record)
|
||
_MLG_RECORD_MARKER = b"\x22\x01\x0e\x80"
|
||
_MLG_RECORD_SIZE = 292 # bytes per record (confirmed from BE11529.MLG)
|
||
|
||
# MLG record flags (4 bytes at record[22:26])
|
||
# Confirmed from BE11529.MLG binary inspection:
|
||
MLG_FLAGS_START_ONLY = b"\xff\xff\x00\x00" # monitoring start with no stop
|
||
MLG_FLAGS_TRIGGER = b"\x01\x00\x02\x00" # triggered event (has ts1 + ts2)
|
||
MLG_FLAGS_INTERVAL = b"\x02\x00\x00\x00" # monitoring interval (has ts1 + ts2)
|
||
|
||
|
||
# ── Timestamp helpers ─────────────────────────────────────────────────────────
|
||
|
||
def _encode_ts_be(ts: Optional[datetime.datetime]) -> bytes:
|
||
"""
|
||
Encode a datetime as an 8-byte big-endian Blastware timestamp.
|
||
|
||
Format (waveform file and MLG record timestamps):
|
||
[day][month][year_HI][year_LO][0x00][hour][min][sec]
|
||
|
||
Big-endian year confirmed from M529LIY6 (example event) footer:
|
||
footer bytes [2..9] = 01 04 07 ea 00 00 1c 08
|
||
→ day=1 month=4 year=0x07ea=2026 hour=0 min=28 sec=8 ✅
|
||
|
||
Returns 8 zero bytes if ts is None.
|
||
"""
|
||
if ts is None:
|
||
return bytes(8)
|
||
return bytes([
|
||
ts.day,
|
||
ts.month,
|
||
(ts.year >> 8) & 0xFF,
|
||
ts.year & 0xFF,
|
||
0x00,
|
||
ts.hour,
|
||
ts.minute,
|
||
ts.second,
|
||
])
|
||
|
||
|
||
def _decode_ts_be(raw: bytes) -> Optional[datetime.datetime]:
|
||
"""
|
||
Decode an 8-byte big-endian Blastware timestamp.
|
||
|
||
Returns None if the bytes are all zero or structurally invalid.
|
||
"""
|
||
if len(raw) < 8 or raw == bytes(8):
|
||
return None
|
||
day = raw[0]
|
||
month = raw[1]
|
||
year = (raw[2] << 8) | raw[3]
|
||
hour = raw[5]
|
||
minute = raw[6]
|
||
sec = raw[7]
|
||
try:
|
||
return datetime.datetime(year, month, day, hour, minute, sec)
|
||
except ValueError:
|
||
return None
|
||
|
||
|
||
def _ts_from_model(ts: Optional[Timestamp]) -> Optional[datetime.datetime]:
|
||
"""Convert a models.Timestamp to datetime.datetime, or None."""
|
||
if ts is None:
|
||
return None
|
||
try:
|
||
return datetime.datetime(ts.year, ts.month, ts.day, ts.hour, ts.minute, ts.second)
|
||
except (ValueError, TypeError):
|
||
return None
|
||
|
||
|
||
# ── DLE strip helper ──────────────────────────────────────────────────────────
|
||
|
||
def _strip_inner_frame_dles(data: bytes) -> bytes:
|
||
"""
|
||
Strip DLE (0x10) framing markers from A5 inner-frame content.
|
||
|
||
The A5 (bulk waveform stream) response body contains DLE-encoded sub-frame
|
||
structure. S3FrameParser preserves DLE+XX pairs as two literal bytes in
|
||
frame.data. Only the DLE marker byte needs to be removed; the following
|
||
byte is actual payload content.
|
||
|
||
Rule: when 0x10 is immediately followed by {0x02, 0x03, 0x04}, strip the
|
||
0x10 (DLE marker) and keep the following byte as payload.
|
||
|
||
Lone 0x10 bytes not followed by {0x02, 0x03, 0x04} are kept as-is.
|
||
|
||
Confirmed correct by verifying reconstructed waveform body against M529LIY6 (example event):
|
||
- 0x10 0x02 in terminator → 0x02 kept ✓
|
||
- 0x10 0x04 in terminator (month byte) → 0x04 kept ✓
|
||
"""
|
||
out = bytearray()
|
||
i = 0
|
||
while i < len(data):
|
||
b = data[i]
|
||
if b == 0x10 and i + 1 < len(data) and data[i + 1] in {0x02, 0x03, 0x04}:
|
||
# Strip the DLE marker; the next byte is payload and will be appended
|
||
# in the next loop iteration.
|
||
i += 1
|
||
continue
|
||
out.append(b)
|
||
i += 1
|
||
return bytes(out)
|
||
|
||
|
||
def _frame_body_bytes(frame: S3Frame, skip: int) -> bytes:
|
||
"""
|
||
Extract the waveform body contribution from one A5 S3Frame.
|
||
|
||
The contribution is frame.data[skip:] with inner-frame DLE pairs stripped
|
||
per _strip_inner_frame_dles(). The chk_byte is temporarily appended before
|
||
stripping to handle the split-pair edge case where a DLE at the end of
|
||
frame.data is paired with chk_byte.
|
||
|
||
Split-pair edge case (confirmed for A5[8] of M529LIY6 (example event), 2026-04-21):
|
||
|
||
S3FrameParser appends DLE+XX pairs as two literal bytes when XX ∉ {DLE, ETX}.
|
||
When the LAST occurrence of such a pair straddles the payload/checksum boundary
|
||
(i.e., DLE is the last byte of raw_payload and XX is the checksum), the parser
|
||
splits them:
|
||
- DLE ends up as the last byte of frame.data (frame.data[-1] == 0x10)
|
||
- XX is stored as frame.chk_byte
|
||
|
||
To strip the pair correctly, we reunite the bytes before calling the strip
|
||
function. Since chk_byte is the checksum (not payload data), it is excluded
|
||
from the final output regardless of whether it was part of a pair.
|
||
|
||
Post-strip chk_byte removal (ALL cases):
|
||
_strip_inner_frame_dles strips the 0x10 and KEEPS chk_byte in all cases.
|
||
Chk_byte is always the checksum (not payload), so always strip it off.
|
||
|
||
Args:
|
||
frame: S3Frame with frame.data and frame.chk_byte populated.
|
||
skip: Number of leading bytes in frame.data to exclude (frame header).
|
||
|
||
Returns:
|
||
bytes — the waveform body contribution for this frame.
|
||
"""
|
||
if skip >= len(frame.data):
|
||
return b""
|
||
|
||
relevant = frame.data[skip:]
|
||
|
||
# Detect split DLE+chk pair at the frame boundary.
|
||
has_split_pair = (
|
||
len(relevant) > 0
|
||
and relevant[-1] == 0x10
|
||
and frame.chk_byte in {0x02, 0x03, 0x04}
|
||
)
|
||
|
||
if has_split_pair:
|
||
# Reunite the split pair so the strip function sees both bytes together.
|
||
buf = relevant + bytes([frame.chk_byte])
|
||
stripped = _strip_inner_frame_dles(buf)
|
||
# _strip_inner_frame_dles strips the DLE (0x10) and KEEPS chk_byte.
|
||
# chk_byte is the received checksum — never payload — so remove it.
|
||
# This is correct for all values in {0x02, 0x03, 0x04}.
|
||
if stripped:
|
||
stripped = stripped[:-1]
|
||
return stripped
|
||
else:
|
||
return _strip_inner_frame_dles(relevant)
|
||
|
||
|
||
# ── Filename helper ───────────────────────────────────────────────────────────
|
||
|
||
_INSTANTEL_EPOCH = datetime.datetime(1985, 1, 1, 0, 0, 0)
|
||
"""
|
||
Instantel timestamp epoch — January 1, 1985, 00:00:00 local time.
|
||
Confirmed 2026-04-21: stem values for 6 independent events (April 1–9, 2026)
|
||
all converge to this epoch when decoded as floor(seconds_since_epoch / 1296).
|
||
1985 is the year Instantel was founded.
|
||
"""
|
||
|
||
_STEM_UNIT_SEC = 1296 # = 36^2 seconds ≈ 21.6 minutes per stem unit
|
||
|
||
_STEM_CHARS = "0123456789ABCDEFGHIJKLMNOPQRSTUVWXYZ"
|
||
|
||
# ── Waveform file extension encoding ─────────────────────────────────────────
|
||
#
|
||
# NEW FIRMWARE (V10.72+) — FULLY DECODED (confirmed 2026-04-21, 10-year archive):
|
||
#
|
||
# Extension format: AB0T (4 characters)
|
||
# AB = 2-char base-36 encoding of (seconds_since_epoch % 1296)
|
||
# i.e. the number of seconds into the current 21.6-minute stem window
|
||
# Range: 0 ("00") to 1295 ("ZZ")
|
||
# 0 = always literal '0'
|
||
# T = event type: 'W' = Full Waveform, 'H' = Full Histogram
|
||
#
|
||
# Combined with the 4-char stem (which encodes seconds_since_epoch // 1296),
|
||
# the FULL filename gives a second-resolution timestamp:
|
||
# total_seconds = stem_val * 1296 + ab_val
|
||
# timestamp = EPOCH + timedelta(seconds=total_seconds)
|
||
#
|
||
# Verified against three S353L4H0 events (all three match to the second):
|
||
# S353L4H0.3M0W Full Waveform 2025-06-23 13:57:22 AB=3M=130 ✓
|
||
# S353L4H0.8S0H Full Histogram 2025-06-23 14:00:28 AB=8S=316 ✓
|
||
# S353L4H0.9X0W Full Waveform 2025-06-23 14:01:09 AB=9X=357 ✓
|
||
#
|
||
# OLD FIRMWARE (S338, 3-char extensions ending in '0') — UNKNOWN:
|
||
# Observed (old firmware / manual downloads): .440, .470, .7M0, .9T0, .EI0, etc.
|
||
# The V10.72 formula does NOT apply to these.
|
||
# Extension is NOT recording mode (refuted 2026-04-21: continuous → .EI0, not .9T0).
|
||
# blastware_filename() computes the correct AB0 extension for V10.72 firmware.
|
||
#
|
||
# WRONG earlier assumption (do not re-introduce):
|
||
# Extension was believed to encode recording mode × sample rate.
|
||
# Refuted by continuous-mode event producing .EI0 instead of .9T0.
|
||
|
||
|
||
def _make_stem(ts_local: datetime.datetime) -> str:
|
||
"""
|
||
Encode a local timestamp as a 4-character uppercase base-36 stem.
|
||
|
||
Algorithm (confirmed 2026-04-21 from 6 known file/timestamp pairs):
|
||
stem_int = floor((ts_local - Jan_1_1985_midnight_local) / 1296_seconds)
|
||
stem = 4-char uppercase base-36 encoding of stem_int
|
||
|
||
Unit = 36² = 1296 seconds ≈ 21.6 minutes. Events within the same 1296-second
|
||
window receive the same stem; their extension distinguishes them.
|
||
"""
|
||
delta_sec = int((ts_local - _INSTANTEL_EPOCH).total_seconds())
|
||
n = delta_sec // _STEM_UNIT_SEC
|
||
s = ""
|
||
for _ in range(4):
|
||
s = _STEM_CHARS[n % 36] + s
|
||
n //= 36
|
||
return s
|
||
|
||
|
||
def blastware_filename(event: Event, serial: str, ach: bool = False) -> str:
|
||
"""
|
||
Return the correct Blastware filename for an event.
|
||
|
||
CONFIRMED 2026-04-22 — verified against 3,248 files from a 10-year archive.
|
||
|
||
Filename format: <prefix_letter><serial3><stem><AB>0[T]
|
||
where:
|
||
|
||
prefix_letter = chr(ord('B') + floor(serial_numeric / 1000))
|
||
— encodes the production generation (batch of 1000 units)
|
||
— e.g. BE6907→H, BE11529→M, BE14036→P, BE18003→T
|
||
|
||
serial3 = f"{serial_numeric % 1000:03d}"
|
||
— last 3 digits of numeric serial, zero-padded
|
||
|
||
stem = 4-char base-36 of floor(total_seconds / 1296)
|
||
— encodes which 21.6-minute window the event fell in
|
||
|
||
AB = 2-char base-36 of (total_seconds % 1296)
|
||
— encodes seconds within the window (0–1295)
|
||
|
||
0 = always literal digit zero
|
||
|
||
T = 'W' or 'H' — ONLY appended for call-home (ACH) downloads (ach=True).
|
||
Manual / direct downloads produce a 3-char extension (AB0) with no type char.
|
||
Call-home downloads produce a 4-char extension (AB0W or AB0H).
|
||
|
||
total_seconds = (event_local_time − 1985-01-01T00:00:00_local) in seconds
|
||
|
||
The 10-year production archive contains only call-home files (all end in W or H).
|
||
Manual Blastware downloads produce 3-char extensions — the same AB0 prefix but
|
||
without the trailing type character.
|
||
|
||
Micromate Series 4 uses a completely different naming scheme (literal datetime
|
||
in filename); this function does not apply to Micromate units.
|
||
|
||
Args:
|
||
event: Event object with timestamp set.
|
||
serial: Device serial number string (e.g. "BE11529").
|
||
ach: If True, append W/H type character (call-home style).
|
||
If False (default), omit type character (direct download style).
|
||
|
||
Returns:
|
||
Filename string, e.g. "M529LIY6.CE0" (direct) or "M529LIY6.CE0H" (ACH).
|
||
"""
|
||
# ── Serial prefix ──────────────────────────────────────────────────────────
|
||
serial_digits = "".join(c for c in serial if c.isdigit())
|
||
if len(serial_digits) >= 1:
|
||
serial_numeric = int(serial_digits)
|
||
generation = serial_numeric // 1000
|
||
prefix_letter = chr(ord('B') + generation)
|
||
serial3 = f"{serial_numeric % 1000:03d}"
|
||
else:
|
||
prefix_letter = "M" # fallback
|
||
serial3 = "000"
|
||
prefix = prefix_letter + serial3
|
||
|
||
# ── Stem + AB extension from timestamp ────────────────────────────────────
|
||
if event.timestamp is not None:
|
||
try:
|
||
ts_local = datetime.datetime(
|
||
event.timestamp.year, event.timestamp.month, event.timestamp.day,
|
||
event.timestamp.hour, event.timestamp.minute, event.timestamp.second,
|
||
)
|
||
delta_sec = int((ts_local - _INSTANTEL_EPOCH).total_seconds())
|
||
stem = _make_stem(ts_local)
|
||
ab_val = delta_sec % _STEM_UNIT_SEC
|
||
ab_str = _STEM_CHARS[ab_val // 36] + _STEM_CHARS[ab_val % 36]
|
||
except (ValueError, TypeError, AttributeError):
|
||
stem = "0000"
|
||
ab_str = "00"
|
||
else:
|
||
stem = "0000"
|
||
ab_str = "00"
|
||
|
||
# ── Type character (ACH only) ─────────────────────────────────────────────
|
||
if ach:
|
||
if getattr(event, 'recording_mode', None) in (3, 4): # Histogram / Hist+Cont
|
||
type_char = 'H'
|
||
else:
|
||
type_char = 'W'
|
||
ext = f".{ab_str}0{type_char}"
|
||
else:
|
||
ext = f".{ab_str}0"
|
||
|
||
return prefix + stem + ext
|
||
|
||
|
||
# ── A5 frame classifier ───────────────────────────────────────────────────────────
|
||
|
||
# ASCII markers that identify a compliance-config / metadata frame.
|
||
# These strings appear in the A5 bulk stream as part of the device's
|
||
# compliance setup payload. They should NEVER appear in raw ADC waveform
|
||
# frames (which are binary-heavy, < 20 % printable ASCII).
|
||
_METADATA_FRAME_MARKERS = (
|
||
b"Project:",
|
||
b"Client:",
|
||
b"Standard Recording Setup",
|
||
b"Extended Notes",
|
||
b"User Name:",
|
||
b"Seis Loc:",
|
||
)
|
||
|
||
|
||
def classify_frame(frame: S3Frame) -> str:
|
||
"""
|
||
Classify an A5 bulk waveform stream frame by its content.
|
||
|
||
Returns one of:
|
||
"terminator" — page_key == 0x0000
|
||
"probe_or_strt" — data contains b"STRT\xff\xfe" (the initial probe response)
|
||
"metadata" — data contains ASCII compliance-config markers
|
||
"waveform" — predominantly binary (< 20 % printable ASCII)
|
||
"unknown" — none of the above criteria matched
|
||
|
||
Used by write_blastware_file() to filter non-waveform frames out of
|
||
the reconstructed body so that metadata blocks (Project:, Client:, …)
|
||
and spurious STRT records do not corrupt the output file.
|
||
"""
|
||
if frame.page_key == 0x0000:
|
||
return "terminator"
|
||
data = bytes(frame.data)
|
||
if b"STRT\xff\xfe" in data:
|
||
return "probe_or_strt"
|
||
if any(m in data for m in _METADATA_FRAME_MARKERS):
|
||
return "metadata"
|
||
if len(data) > 0:
|
||
printable = sum(1 for b in data if 32 <= b < 127)
|
||
if printable / len(data) < 0.20:
|
||
return "waveform"
|
||
return "unknown"
|
||
|
||
|
||
# ── Waveform file writer ───────────────────────────────────────────────────────────
|
||
|
||
def write_blastware_file(
|
||
event: Event,
|
||
a5_frames: list[S3Frame],
|
||
path: Union[str, Path],
|
||
) -> None:
|
||
"""
|
||
Write a Blastware waveform file from a downloaded event.
|
||
|
||
Args:
|
||
event: Event object (populated by get_events() or download_waveform()).
|
||
Used for the STRT record (key, rectime) and footer timestamps.
|
||
a5_frames: Complete A5 frame list INCLUDING the terminator frame
|
||
(page_key=0x0000). Pass include_terminator=True to
|
||
read_bulk_waveform_stream() when collecting frames.
|
||
Must have at least 2 frames (probe + terminator).
|
||
path: Destination file path. Parent directory must exist.
|
||
Extension should be set via blastware_filename().
|
||
|
||
File layout:
|
||
[22B header] [21B STRT] [body bytes] [26B footer]
|
||
|
||
Raises:
|
||
ValueError: if a5_frames is empty or has no terminator (page_key=0).
|
||
OSError: if the file cannot be written.
|
||
|
||
Confirmed correct waveform body reconstruction against M529LIY6 (example event) (2026-04-21).
|
||
"""
|
||
if not a5_frames:
|
||
raise ValueError("a5_frames must not be empty")
|
||
|
||
path = Path(path)
|
||
|
||
# ── Extract STRT record from probe frame ────────────────────────────────
|
||
# The STRT record (21 bytes) lives verbatim inside A5[0].data[7:].
|
||
# It is stored as-is in the waveform file — do NOT reconstruct it from Event
|
||
# fields, as bytes [10:14] and [14:20] contain device-specific values
|
||
# (not simply key4 repeated or zero-padded). Confirmed 2026-04-21.
|
||
#
|
||
# STRT layout (21 bytes, observed in M529LIY6 files):
|
||
# [0:4] b'STRT'
|
||
# [4:6] 0xff 0xfe (fixed)
|
||
# [6:10] key4 (event key)
|
||
# [10:14] device-specific field (NOT a key4 repeat)
|
||
# [14:20] device-specific fields (NOT zeros)
|
||
# [20] rectime uint8 seconds
|
||
# Extract STRT from the DLE-stripped probe frame.
|
||
#
|
||
# frame.data[7:] is the raw wire representation; it may contain DLE+{02,03,04}
|
||
# inner-frame pairs that S3FrameParser preserves as two literal bytes. The
|
||
# Blastware file stores the stripped form, so we must strip before extracting.
|
||
#
|
||
# Example (M529LK0Y, 2026-04-21): STRT contains value 0x02 encoded as [10 02]
|
||
# on the wire. Without stripping, STRT is 22 raw bytes → write_blastware_file writes the
|
||
# DLE prefix into the file AND begins the body 1 byte too early (probe_skip off
|
||
# by 1). Stripping fixes both.
|
||
#
|
||
# probe_skip must be computed in the RAW frame.data domain (it is used as the
|
||
# `skip` argument to _frame_body_bytes which operates on raw frame.data).
|
||
# We walk the raw bytes counting stripped bytes until we have passed
|
||
# strt_pos + 21 stripped bytes, giving the raw offset of the first body byte.
|
||
w0_raw = bytes(a5_frames[0].data[7:])
|
||
w0_stripped = _strip_inner_frame_dles(w0_raw)
|
||
strt_pos_stripped = w0_stripped.find(b"STRT")
|
||
|
||
if strt_pos_stripped >= 0:
|
||
strt = bytes(w0_stripped[strt_pos_stripped : strt_pos_stripped + 21])
|
||
|
||
# Walk raw bytes to find the raw-domain end of the STRT (= body start).
|
||
target_stripped = strt_pos_stripped + 21
|
||
stripped_so_far = 0
|
||
raw_i = 0
|
||
while stripped_so_far < target_stripped and raw_i < len(w0_raw):
|
||
if (w0_raw[raw_i] == 0x10
|
||
and raw_i + 1 < len(w0_raw)
|
||
and w0_raw[raw_i + 1] in {0x02, 0x03, 0x04}):
|
||
raw_i += 2 # DLE pair → 1 stripped byte, 2 raw bytes
|
||
else:
|
||
raw_i += 1 # normal byte → 1 stripped byte, 1 raw byte
|
||
stripped_so_far += 1
|
||
probe_skip = 7 + raw_i # raw bytes to skip: 7 header + raw STRT length
|
||
else:
|
||
# Fallback: construct a minimal STRT if probe frame lacks it
|
||
key4 = event._waveform_key if hasattr(event, '_waveform_key') and event._waveform_key else bytes(4)
|
||
rectime = event.rectime_seconds if event.rectime_seconds is not None else 0
|
||
strt = b"STRT" + b"\xff\xfe" + key4 + bytes(14) + bytes([rectime & 0xFF])
|
||
probe_skip = 7 + 21
|
||
|
||
log.debug(
|
||
"write_blastware_file: strt_pos_stripped=%d probe_skip=%d "
|
||
"probe_data_len=%d strt_hex=%s",
|
||
strt_pos_stripped if strt_pos_stripped >= 0 else -1,
|
||
probe_skip,
|
||
len(a5_frames[0].data),
|
||
strt.hex() if len(strt) >= 4 else "(short)",
|
||
)
|
||
|
||
if len(strt) != 21:
|
||
raise ValueError(f"STRT record must be 21 bytes, got {len(strt)}")
|
||
|
||
# ── Build waveform file header ─────────────────────────────────────────────────────
|
||
header = _FILE_HEADER_PREFIX + _WAVEFORM_TYPE_TAG
|
||
assert len(header) == _WAVEFORM_HEADER_SIZE, f"Waveform header must be {_WAVEFORM_HEADER_SIZE} bytes"
|
||
|
||
# ── Build body from A5 frames ────────────────────────────────────────────
|
||
# The waveform body is reconstructed from ALL A5 frames (data + terminator).
|
||
# The terminator frame's contribution includes the 26-byte footer at its end.
|
||
#
|
||
# Reconstruction layout (confirmed from M529LIY6 captures, 2026-04-21):
|
||
# all_bytes = contributions from A5[0..N] + terminator_contribution
|
||
# body = all_bytes[:-26] (everything except the last 26 bytes)
|
||
# footer = all_bytes[-26:] (last 26 bytes = the waveform file footer)
|
||
#
|
||
# The footer bytes come directly from the terminator frame's inner content —
|
||
# using them verbatim ensures timestamps match the device's recorded values.
|
||
|
||
# Separate terminator from data frames.
|
||
# Search from the FRONT for the first terminator (page_key == 0x0000).
|
||
# Do NOT use a5_frames[-1] — if _a5_frames contains stray frames from a
|
||
# subsequent event (a known get_events side-effect), the last frame will
|
||
# not be the terminator and the footer will be mis-identified.
|
||
# TERM detection (v0.14.0): last frame if page_key != 0x0010 (sample marker)
|
||
term_idx: Optional[int] = None
|
||
if a5_frames and a5_frames[-1].page_key != 0x0010:
|
||
term_idx = len(a5_frames) - 1
|
||
|
||
if term_idx is not None:
|
||
body_frames = a5_frames[:term_idx]
|
||
term_frame = a5_frames[term_idx]
|
||
else:
|
||
body_frames = a5_frames
|
||
term_frame = None
|
||
|
||
# Frame contribution loop (v0.14.0 BW-exact walk).
|
||
# Skip values:
|
||
# probe (fi=0): probe_skip
|
||
# meta@0x1002 (fi=1): 13 (6-byte inner header)
|
||
# meta@0x1004 (fi=2): 13 (6-byte inner header)
|
||
# sample chunks (fi=3+): 12 (5-byte inner header)
|
||
last_fi = len(body_frames) - 1
|
||
|
||
log.debug(
|
||
"write_blastware_file: %d body_frames last_fi=%d",
|
||
len(body_frames), last_fi,
|
||
)
|
||
|
||
all_bytes = bytearray()
|
||
|
||
for fi, frame in enumerate(body_frames):
|
||
if fi == 0:
|
||
skip = probe_skip
|
||
elif fi in (1, 2):
|
||
skip = 13 # metadata pages
|
||
else:
|
||
skip = 12 # sample chunks
|
||
|
||
contribution = _frame_body_bytes(frame, skip)
|
||
log.debug("write_blastware_file: fi=%d skip=%d raw_data=%d contribution=%d",
|
||
fi, skip, len(frame.data), len(contribution))
|
||
all_bytes.extend(contribution)
|
||
|
||
# Terminator contributes its content, which ends with the 26-byte footer.
|
||
# skip=11 (not 12) because the terminator's inner frame header is 4 bytes,
|
||
# one shorter than chunk frames' 5-byte inner header. Confirmed 2026-04-21.
|
||
if term_frame is not None:
|
||
term_contribution = _frame_body_bytes(term_frame, 11)
|
||
log.debug(
|
||
"write_blastware_file: term_frame data_len=%d skip=11 "
|
||
"contribution_len=%d first8=%s",
|
||
len(term_frame.data),
|
||
len(term_contribution),
|
||
term_contribution[:8].hex() if len(term_contribution) >= 8 else term_contribution.hex(),
|
||
)
|
||
all_bytes.extend(term_contribution)
|
||
|
||
log.debug(
|
||
"write_blastware_file: all_bytes total=%d last28=%s",
|
||
len(all_bytes),
|
||
bytes(all_bytes[-28:]).hex() if len(all_bytes) >= 28 else bytes(all_bytes).hex(),
|
||
)
|
||
|
||
# NOTE: The "duplicate header+STRT strip" logic from v0.13.x has been
|
||
# REMOVED in v0.14.2. Under the v0.14.0 BW-exact 5A walk, body assembly
|
||
# is just contiguous concatenation of frame contributions in stream order
|
||
# (probe → meta@0x1002 → meta@0x1004 → samples → TERM), exactly as BW
|
||
# writes its files. The previous strip was matching the `00 12 03 00 STRT`
|
||
# byte sequence in legitimate waveform data — sample chunks at counter
|
||
# 0x1000 and beyond often contain those bytes coincidentally — and
|
||
# zeroing 25 bytes of valid samples per match. Compared to a known-good
|
||
# BW reference for the same 3-sec event 0, the strip introduced 26 bytes
|
||
# of zeros that BW did not have, then propagated alignment differences
|
||
# through the rest of the body. See decode_test/5-1-26/bw vs SFM diff
|
||
# at file[0x1012..0x102B] (2026-05-04 analysis).
|
||
|
||
# Find the first valid 0e 08 footer marker (v0.14.0).
|
||
footer_pos = -1
|
||
pos = 0
|
||
while True:
|
||
pos = bytes(all_bytes).find(b"\x0e\x08", pos)
|
||
if pos < 0 or pos + 26 > len(all_bytes):
|
||
break
|
||
yr = (all_bytes[pos + 4] << 8) | all_bytes[pos + 5]
|
||
if 2015 <= yr <= 2050:
|
||
footer_pos = pos
|
||
break
|
||
pos += 1
|
||
if footer_pos >= 0:
|
||
body = bytes(all_bytes[:footer_pos])
|
||
footer = bytes(all_bytes[footer_pos:footer_pos + 26])
|
||
log.debug(
|
||
"write_blastware_file: real 0e 08 footer at all_bytes[%d]; "
|
||
"truncating %d post-footer bytes",
|
||
footer_pos, len(all_bytes) - footer_pos - 26,
|
||
)
|
||
elif len(all_bytes) >= 26:
|
||
body = bytes(all_bytes[:-26])
|
||
footer = bytes(all_bytes[-26:])
|
||
else:
|
||
body = bytes(all_bytes)
|
||
start_dt = _ts_from_model(event.timestamp)
|
||
stop_dt: Optional[datetime.datetime] = None
|
||
if start_dt is not None and event.rectime_seconds:
|
||
stop_dt = start_dt + datetime.timedelta(seconds=event.rectime_seconds)
|
||
footer = (
|
||
b"\x0e\x08"
|
||
+ _encode_ts_be(start_dt)
|
||
+ _encode_ts_be(stop_dt)
|
||
+ b"\x00\x01\x00\x02\x00\x00"
|
||
+ b"\x00\x00"
|
||
)
|
||
|
||
# ── Write file ───────────────────────────────────────────────────────────
|
||
with open(path, "wb") as f:
|
||
f.write(header)
|
||
f.write(strt)
|
||
f.write(body)
|
||
f.write(footer)
|
||
|
||
|
||
def read_blastware_file(path: Union[str, Path]) -> Event:
|
||
"""
|
||
Parse a Blastware waveform file into an Event object.
|
||
|
||
NOT YET IMPLEMENTED.
|
||
|
||
Args:
|
||
path: Path to the waveform file.
|
||
|
||
Returns:
|
||
Event object with waveform data populated.
|
||
|
||
Raises:
|
||
NotImplementedError: always (pending implementation).
|
||
"""
|
||
raise NotImplementedError("read_blastware_file() is not yet implemented")
|
||
|
||
|
||
# ── MLG file writer ───────────────────────────────────────────────────────────
|
||
|
||
def _build_mlg_header(serial: str) -> bytes:
|
||
"""
|
||
Build the 308-byte MLG file header.
|
||
|
||
Header structure (confirmed from BE11529.MLG binary inspection):
|
||
Offset 0x00: 10 00 01 80 00 00 Instantel\x00 07 2c 22 01 0e a0 (22B)
|
||
Offset 0x16: ... (16B unknown — observed as zeros in BE11529.MLG)
|
||
Offset 0x2A: serial number (8 bytes, null-padded ASCII)
|
||
... rest zero-padded to 308 bytes
|
||
|
||
The serial string "BE11529" appears at offset 0x2A (42 decimal).
|
||
"""
|
||
buf = bytearray(_MLG_HEADER_SIZE)
|
||
|
||
# Common prefix + MLG type tag
|
||
prefix = _FILE_HEADER_PREFIX + _MLG_TYPE_TAG # 22 bytes
|
||
buf[0:len(prefix)] = prefix
|
||
|
||
# Serial number at offset 0x2A
|
||
serial_bytes = serial.encode("ascii", errors="replace")[:8]
|
||
serial_padded = serial_bytes.ljust(8, b"\x00")
|
||
buf[0x2A : 0x2A + 8] = serial_padded
|
||
|
||
return bytes(buf)
|
||
|
||
|
||
def _build_mlg_record(
|
||
entry: MonitorLogEntry,
|
||
serial: str,
|
||
) -> bytes:
|
||
"""
|
||
Build one 292-byte MLG record from a MonitorLogEntry.
|
||
|
||
Record layout (confirmed from BE11529.MLG binary inspection):
|
||
[0:2] CRC — 2-byte CRC (algorithm unknown; written as 0x0000)
|
||
[2:6] marker — 22 01 0e 80
|
||
[6:14] ts1 — 8B big-endian start timestamp
|
||
[14:22] ts2 — 8B big-endian stop timestamp
|
||
[22:26] flags — 4B record flags (see MLG_FLAGS_* constants)
|
||
[26:36] serial — 10B null-padded serial number
|
||
[36:] text — for triggered events: [0x08][8B ts1_copy]["Geo: X.XXX in/s"]
|
||
for monitoring intervals: b"" or minimal separator
|
||
[... zero-padded to 292 bytes]
|
||
|
||
Flags based on entry type:
|
||
- MonitorLogEntry with start_time only (no stop_time): MLG_FLAGS_START_ONLY
|
||
- MonitorLogEntry with both times and geo_threshold_ips set: MLG_FLAGS_TRIGGER
|
||
- MonitorLogEntry with both times (monitoring interval): MLG_FLAGS_INTERVAL
|
||
|
||
The triggered-event text block (flags = MLG_FLAGS_TRIGGER):
|
||
[0x08] [ts1: 8B] [ASCII "Geo: X.XXX in/s\x00"]
|
||
Confirmed from BE11529.MLG records at offset 0x0134 and 0x0258.
|
||
"""
|
||
buf = bytearray(_MLG_RECORD_SIZE)
|
||
|
||
start_dt = (
|
||
datetime.datetime(
|
||
entry.start_time.year, entry.start_time.month, entry.start_time.day,
|
||
entry.start_time.hour, entry.start_time.minute, entry.start_time.second,
|
||
)
|
||
if entry.start_time else None
|
||
)
|
||
stop_dt = (
|
||
datetime.datetime(
|
||
entry.stop_time.year, entry.stop_time.month, entry.stop_time.day,
|
||
entry.stop_time.hour, entry.stop_time.minute, entry.stop_time.second,
|
||
)
|
||
if entry.stop_time else None
|
||
)
|
||
|
||
# [0:2] CRC placeholder
|
||
buf[0:2] = b"\x00\x00"
|
||
|
||
# [2:6] Record marker
|
||
buf[2:6] = _MLG_RECORD_MARKER
|
||
|
||
# [6:14] ts1
|
||
buf[6:14] = _encode_ts_be(start_dt)
|
||
|
||
# [14:22] ts2
|
||
buf[14:22] = _encode_ts_be(stop_dt)
|
||
|
||
# [22:26] flags
|
||
if stop_dt is None:
|
||
flags = MLG_FLAGS_START_ONLY
|
||
elif entry.geo_threshold_ips is not None:
|
||
flags = MLG_FLAGS_TRIGGER
|
||
else:
|
||
flags = MLG_FLAGS_INTERVAL
|
||
buf[22:26] = flags
|
||
|
||
# [26:36] serial (10B null-padded)
|
||
serial_bytes = serial.encode("ascii", errors="replace")[:10]
|
||
buf[26 : 26 + len(serial_bytes)] = serial_bytes
|
||
|
||
# [36:] text content
|
||
pos = 36
|
||
if flags == MLG_FLAGS_TRIGGER:
|
||
# Extra ts1 copy: [0x08][ts1: 8B]
|
||
buf[pos] = 0x08
|
||
pos += 1
|
||
buf[pos : pos + 8] = _encode_ts_be(start_dt)
|
||
pos += 8
|
||
|
||
if entry.geo_threshold_ips is not None:
|
||
geo_text = f"Geo: {entry.geo_threshold_ips:.3f} in/s\x00".encode("ascii")
|
||
buf[pos : pos + len(geo_text)] = geo_text
|
||
pos += len(geo_text)
|
||
|
||
return bytes(buf)
|
||
|
||
|
||
def write_mlg(
|
||
entries: list[MonitorLogEntry],
|
||
serial: str,
|
||
path: Union[str, Path],
|
||
) -> None:
|
||
"""
|
||
Write a Blastware .MLG monitor log file.
|
||
|
||
Args:
|
||
entries: List of MonitorLogEntry objects (from get_monitor_log_entries()).
|
||
Each entry produces one 292-byte record in the file.
|
||
serial: Device serial number string (e.g. "BE11529").
|
||
Written to the file header and each record.
|
||
path: Destination file path. Extension is not enforced — use ".MLG".
|
||
|
||
File layout:
|
||
[308B header] [N × 292B records]
|
||
|
||
Note: The 2-byte CRC at the start of each record is written as 0x0000.
|
||
The CRC algorithm is unknown (see module docstring).
|
||
|
||
Raises:
|
||
OSError: if the file cannot be written.
|
||
"""
|
||
path = Path(path)
|
||
header = _build_mlg_header(serial)
|
||
|
||
with open(path, "wb") as f:
|
||
f.write(header)
|
||
for entry in entries:
|
||
record = _build_mlg_record(entry, serial)
|
||
f.write(record)
|
||
|
||
|
||
def read_mlg(path: Union[str, Path]) -> list[MonitorLogEntry]:
|
||
"""
|
||
Parse a Blastware .MLG file into a list of MonitorLogEntry objects.
|
||
|
||
NOT YET IMPLEMENTED.
|
||
|
||
Args:
|
||
path: Path to the .MLG file.
|
||
|
||
Returns:
|
||
List of MonitorLogEntry objects.
|
||
|
||
Raises:
|
||
NotImplementedError: always (pending implementation).
|
||
"""
|
||
raise NotImplementedError("read_mlg() is not yet implemented")
|