""" blastware_file.py — Blastware binary file codec for bidirectional interoperability. Reads and writes the proprietary Instantel/Blastware file formats: Waveform events (.CE0W, .VM0H, .440, .7M0, etc.) (extension encoding UNKNOWN — see below) .MLG — Monitor log (monitoring session history) All waveform formats share a common 22-byte file header prefix and identical internal binary structure (same type tag 00 12 03 00, same STRT record layout). Blastware identifies the file type by extension, not by a magic marker. EXTENSION ENCODING — V10.72 firmware FULLY CONFIRMED 2026-04-22: Direct / manual download: AB0 (3-char, no type character) Call-home (ACH) download: AB0W or AB0H (4-char, W=waveform H=histogram) AB = 2-char base-36 of (total_seconds % 1296), where total_seconds = (event_local_time − 1985-01-01T00:00:00_local). 0 = always literal digit zero. Verified against 3,248 call-home files from a 10-year production archive. The 10-year archive contains only ACH files (all end in W or H). Manual Blastware downloads produce 3-char AB0 extensions — same encoding but without the trailing type character. Old firmware (S338, 3-char extensions): encoding unknown / same as manual? Micromate Series 4 uses a different scheme (literal datetime in filename). ─── File structure overview ───────────────────────────────────────────────────── Waveform file structure (confirmed from example-events/4-3-26-multi/M529LIY6 (example event)): [22B header] [21B STRT record] [body bytes] [26B footer] Header (22 bytes): 10 00 01 80 00 00 — fixed prefix 49 6e 73 74 61 6e 74 65 6c 00 — b'Instantel\x00' 07 2c — fixed 00 12 03 00 — waveform file type tag (shared by all waveform extensions) STRT record (21 bytes, immediately follows header): 53 54 52 54 — b'STRT' ff fe — fixed (2 bytes) [key4] — 4-byte waveform event key [key4] — 4-byte waveform event key (repeated) [zeros] — 7 bytes padding [rectime] — uint8 record time in seconds Body (variable — reconstructed from A5 frame data): The body bytes are derived from the raw A5 frame wire content, specifically from the DLE-decoded representation of each frame's contribution. See the _frame_body_bytes() helper for the exact algorithm. Footer (26 bytes): 0e 08 [ts1: 8B big-endian timestamp] — start timestamp [ts2: 8B big-endian timestamp] — stop timestamp 00 01 00 02 00 00 [crc: 2B] — CRC (algorithm unconfirmed; written as 0x00 0x00 placeholder) Timestamp format (big-endian, 8 bytes): [day] [month] [year_HI] [year_LO] [0x00] [hour] [min] [sec] MLG (monitor log, confirmed from example-events/4-3-26-multi/BE11529.MLG): [308B header] [N × 292B records] Header (308 bytes): Offset 0x00: 10 00 01 80 00 00 Instantel\x00 07 2c 22 01 0e a0 — fixed (16B) Offset 0x10: ... (unknown structure, written as zeros + serial) Offset 0x2A: serial number (8 bytes, null-padded ASCII, e.g. "BE11529") ... zero-padded to 308 bytes total Record (292 bytes each): [2B CRC] — unknown algorithm; written as 0x00 0x00 22 01 0e 80 — record marker [ts1: 8B big-endian timestamp] — start time [ts2: 8B big-endian timestamp] — stop time (zeros if no stop) [4B flags] — see MLG_FLAGS_* constants below [10B serial] — null-padded serial number ASCII [text] — for trigger records: [0x08][8B ts1_copy] then ASCII "Geo: X.XXX in/s" for monitoring records: b'' (or minimal separator) [zero-padded to 292 bytes] ─── Critical implementation notes ────────────────────────────────────────────── Waveform body reconstruction algorithm (confirmed 2026-04-21 from verification against M529LIY6 (example event) using raw_s3_20260403_153508.bin capture): The waveform body bytes come from the A5 frame content, stripped of DLE-framing artifacts. Each A5 frame contributes a different slice of its data section, with DLE+{0x02,0x03,0x04} byte pairs stripped. Skip amounts per frame index (offsets into frame.data): A5[0] (probe): data[strt_pos + 21 + 7] (skip header + STRT record) strt_pos found by searching frame.data[7:] for b'STRT'; the contribution starts at strt_pos + 21 within data[7:] which equals strt_pos + 21 + 7 within frame.data. A5[1]: data[13] (skip 7-byte frame.data prefix + 6 header bytes) A5[2..N]: data[12] (skip 7-byte frame.data prefix + 5 header bytes) Terminator A5: data[11] (1 byte less than chunk frames; terminator inner header is 4 bytes instead of 5 — confirmed 2026-04-21) DLE strip rule (applied AFTER slicing): Strip any 0x10 byte that is immediately followed by 0x02, 0x03, or 0x04. This undoes the DLE-escape that S3FrameParser preserves as literal pairs. Applied to frame.data[skip:] + bytes([frame.chk_byte]) together, then conditionally exclude the trailing chk_byte from the output. chk_byte absorption: When frame.data[-1] == 0x10 AND frame.chk_byte ∈ {0x02, 0x03, 0x04}, the last byte of frame.data is the DLE prefix of a split DLE+chk pair. Including chk_byte in the strip buffer allows the pair to be stripped as a unit. After stripping, the trailing chk_byte is ALWAYS removed — because _strip_inner_frame_dles keeps the byte after the DLE (the chk_byte value), and that value is the checksum, never payload. This applies to all three cases (chk ∈ {0x02, 0x03, 0x04}) identically. MLG CRC: The algorithm that produces the 2-byte CRC at the start of each MLG record is unknown. All examined records use non-zero values that do not match CRC-16/CCITT, CRC-16/IBM, CRC-32 (truncated), word sums, XOR variants, or any of the 40+ polynomial/init combinations tested. The writer emits 0x0000. This produces files that Blastware may reject or display without the CRC check — the exact impact on BW import is unknown (TODO: test). ─── Public API ────────────────────────────────────────────────────────────────── blastware_filename(event, serial) Return the correct Blastware filename for an event (e.g. "M529LIY6.CE0W"). Full AB0T extension encoding confirmed 2026-04-22 against 3,248 archive files. Extension matches what Blastware itself would generate for the same event. write_blastware_file(event, a5_frames, path) Create a Blastware waveform file from an Event and the full A5 frame list. All waveform extensions share the same binary format — the extension is set by blastware_filename() based on the event timestamp and type. read_blastware_file(path) → Event Parse a Blastware waveform file into an Event object with waveform data populated. (Not yet implemented — placeholder raises NotImplementedError.) write_mlg(entries, serial, path) Create a .MLG file from a list of MonitorLogEntry objects. read_mlg(path) → list[MonitorLogEntry] Parse a .MLG file into MonitorLogEntry objects. (Not yet implemented — placeholder raises NotImplementedError.) """ from __future__ import annotations import datetime import struct from pathlib import Path from typing import Optional, Union from .framing import S3Frame from .models import Event, MonitorLogEntry, Timestamp # ── File header constants ───────────────────────────────────────────────────── # Common 16-byte prefix shared by waveform files and MLG (confirmed from binary inspection). _FILE_HEADER_PREFIX = bytes.fromhex("1000018000004973") + b"tantel\x00\x07\x2c" # = 10 00 01 80 00 00 49 73 74 61 6e 74 65 6c 00 07 2c (17 bytes) # Confirmed breakdown: 10 00 01 80 00 00 = fixed; "Instantel\x00" = 10B; 07 2c = fixed # Simpler construction: _FILE_HEADER_PREFIX = b"\x10\x00\x01\x80\x00\x00Instantel\x00\x07\x2c" # 17 bytes # Waveform file type tag (4 bytes after common prefix) — shared by ALL waveform extensions _WAVEFORM_TYPE_TAG = b"\x00\x12\x03\x00" # confirmed from M529LIY6 (example event) — same tag for .CE0W, .VM0H, etc. # MLG type tag (4 bytes after common prefix) _MLG_TYPE_TAG = b"\x22\x01\x0e\xa0" # confirmed from BE11529.MLG offset 0x11..0x14 # Total header sizes _WAVEFORM_HEADER_SIZE = 22 # 17 + 4 = 21... wait. Let me recalculate. # From binary: first 22 bytes = header, then STRT at byte 22. # 17-byte common prefix + 4-byte type tag = 21 bytes. But observed header is 22B. # Checking: 6 fixed + 10 "Instantel\x00" + 2 "07 2c" = 18B prefix, then 4B type tag = 22B. # Re-count: b"\x10\x00\x01\x80\x00\x00" = 6B + b"Instantel\x00" = 10B + b"\x07\x2c" = 2B = 18B prefix. _FILE_HEADER_PREFIX = b"\x10\x00\x01\x80\x00\x00Instantel\x00\x07\x2c" # 18 bytes _WAVEFORM_HEADER_SIZE = 22 # 18 + 4 = 22 bytes ✅ _MLG_HEADER_SIZE = 308 # confirmed from BE11529.MLG # MLG record marker (4 bytes after 2-byte CRC at start of each record) _MLG_RECORD_MARKER = b"\x22\x01\x0e\x80" _MLG_RECORD_SIZE = 292 # bytes per record (confirmed from BE11529.MLG) # MLG record flags (4 bytes at record[22:26]) # Confirmed from BE11529.MLG binary inspection: MLG_FLAGS_START_ONLY = b"\xff\xff\x00\x00" # monitoring start with no stop MLG_FLAGS_TRIGGER = b"\x01\x00\x02\x00" # triggered event (has ts1 + ts2) MLG_FLAGS_INTERVAL = b"\x02\x00\x00\x00" # monitoring interval (has ts1 + ts2) # ── Timestamp helpers ───────────────────────────────────────────────────────── def _encode_ts_be(ts: Optional[datetime.datetime]) -> bytes: """ Encode a datetime as an 8-byte big-endian Blastware timestamp. Format (waveform file and MLG record timestamps): [day][month][year_HI][year_LO][0x00][hour][min][sec] Big-endian year confirmed from M529LIY6 (example event) footer: footer bytes [2..9] = 01 04 07 ea 00 00 1c 08 → day=1 month=4 year=0x07ea=2026 hour=0 min=28 sec=8 ✅ Returns 8 zero bytes if ts is None. """ if ts is None: return bytes(8) return bytes([ ts.day, ts.month, (ts.year >> 8) & 0xFF, ts.year & 0xFF, 0x00, ts.hour, ts.minute, ts.second, ]) def _decode_ts_be(raw: bytes) -> Optional[datetime.datetime]: """ Decode an 8-byte big-endian Blastware timestamp. Returns None if the bytes are all zero or structurally invalid. """ if len(raw) < 8 or raw == bytes(8): return None day = raw[0] month = raw[1] year = (raw[2] << 8) | raw[3] hour = raw[5] minute = raw[6] sec = raw[7] try: return datetime.datetime(year, month, day, hour, minute, sec) except ValueError: return None def _ts_from_model(ts: Optional[Timestamp]) -> Optional[datetime.datetime]: """Convert a models.Timestamp to datetime.datetime, or None.""" if ts is None: return None try: return datetime.datetime(ts.year, ts.month, ts.day, ts.hour, ts.minute, ts.second) except (ValueError, TypeError): return None # ── DLE strip helper ────────────────────────────────────────────────────────── def _strip_inner_frame_dles(data: bytes) -> bytes: """ Strip DLE (0x10) framing markers from A5 inner-frame content. The A5 (bulk waveform stream) response body contains DLE-encoded sub-frame structure. S3FrameParser preserves DLE+XX pairs as two literal bytes in frame.data. Only the DLE marker byte needs to be removed; the following byte is actual payload content. Rule: when 0x10 is immediately followed by {0x02, 0x03, 0x04}, strip the 0x10 (DLE marker) and keep the following byte as payload. Lone 0x10 bytes not followed by {0x02, 0x03, 0x04} are kept as-is. Confirmed correct by verifying reconstructed waveform body against M529LIY6 (example event): - 0x10 0x02 in terminator → 0x02 kept ✓ - 0x10 0x04 in terminator (month byte) → 0x04 kept ✓ """ out = bytearray() i = 0 while i < len(data): b = data[i] if b == 0x10 and i + 1 < len(data) and data[i + 1] in {0x02, 0x03, 0x04}: # Strip the DLE marker; the next byte is payload and will be appended # in the next loop iteration. i += 1 continue out.append(b) i += 1 return bytes(out) def _frame_body_bytes(frame: S3Frame, skip: int) -> bytes: """ Extract the waveform body contribution from one A5 S3Frame. The contribution is frame.data[skip:] with inner-frame DLE pairs stripped per _strip_inner_frame_dles(). The chk_byte is temporarily appended before stripping to handle the split-pair edge case where a DLE at the end of frame.data is paired with chk_byte. Split-pair edge case (confirmed for A5[8] of M529LIY6 (example event), 2026-04-21): S3FrameParser appends DLE+XX pairs as two literal bytes when XX ∉ {DLE, ETX}. When the LAST occurrence of such a pair straddles the payload/checksum boundary (i.e., DLE is the last byte of raw_payload and XX is the checksum), the parser splits them: - DLE ends up as the last byte of frame.data (frame.data[-1] == 0x10) - XX is stored as frame.chk_byte To strip the pair correctly, we reunite the bytes before calling the strip function. Since chk_byte is the checksum (not payload data), it is excluded from the final output regardless of whether it was part of a pair. Post-strip chk_byte removal (ALL cases): _strip_inner_frame_dles strips the 0x10 and KEEPS chk_byte in all cases. Chk_byte is always the checksum (not payload), so always strip it off. Args: frame: S3Frame with frame.data and frame.chk_byte populated. skip: Number of leading bytes in frame.data to exclude (frame header). Returns: bytes — the waveform body contribution for this frame. """ if skip >= len(frame.data): return b"" relevant = frame.data[skip:] # Detect split DLE+chk pair at the frame boundary. has_split_pair = ( len(relevant) > 0 and relevant[-1] == 0x10 and frame.chk_byte in {0x02, 0x03, 0x04} ) if has_split_pair: # Reunite the split pair so the strip function sees both bytes together. buf = relevant + bytes([frame.chk_byte]) stripped = _strip_inner_frame_dles(buf) # _strip_inner_frame_dles strips the DLE (0x10) and KEEPS chk_byte. # chk_byte is the received checksum — never payload — so remove it. # This is correct for all values in {0x02, 0x03, 0x04}. if stripped: stripped = stripped[:-1] return stripped else: return _strip_inner_frame_dles(relevant) # ── Filename helper ─────────────────────────────────────────────────────────── _INSTANTEL_EPOCH = datetime.datetime(1985, 1, 1, 0, 0, 0) """ Instantel timestamp epoch — January 1, 1985, 00:00:00 local time. Confirmed 2026-04-21: stem values for 6 independent events (April 1–9, 2026) all converge to this epoch when decoded as floor(seconds_since_epoch / 1296). 1985 is the year Instantel was founded. """ _STEM_UNIT_SEC = 1296 # = 36^2 seconds ≈ 21.6 minutes per stem unit _STEM_CHARS = "0123456789ABCDEFGHIJKLMNOPQRSTUVWXYZ" # ── Waveform file extension encoding ───────────────────────────────────────── # # NEW FIRMWARE (V10.72+) — FULLY DECODED (confirmed 2026-04-21, 10-year archive): # # Extension format: AB0T (4 characters) # AB = 2-char base-36 encoding of (seconds_since_epoch % 1296) # i.e. the number of seconds into the current 21.6-minute stem window # Range: 0 ("00") to 1295 ("ZZ") # 0 = always literal '0' # T = event type: 'W' = Full Waveform, 'H' = Full Histogram # # Combined with the 4-char stem (which encodes seconds_since_epoch // 1296), # the FULL filename gives a second-resolution timestamp: # total_seconds = stem_val * 1296 + ab_val # timestamp = EPOCH + timedelta(seconds=total_seconds) # # Verified against three S353L4H0 events (all three match to the second): # S353L4H0.3M0W Full Waveform 2025-06-23 13:57:22 AB=3M=130 ✓ # S353L4H0.8S0H Full Histogram 2025-06-23 14:00:28 AB=8S=316 ✓ # S353L4H0.9X0W Full Waveform 2025-06-23 14:01:09 AB=9X=357 ✓ # # OLD FIRMWARE (S338, 3-char extensions ending in '0') — UNKNOWN: # Observed (old firmware / manual downloads): .440, .470, .7M0, .9T0, .EI0, etc. # The V10.72 formula does NOT apply to these. # Extension is NOT recording mode (refuted 2026-04-21: continuous → .EI0, not .9T0). # blastware_filename() computes the correct AB0 extension for V10.72 firmware. # # WRONG earlier assumption (do not re-introduce): # Extension was believed to encode recording mode × sample rate. # Refuted by continuous-mode event producing .EI0 instead of .9T0. def _make_stem(ts_local: datetime.datetime) -> str: """ Encode a local timestamp as a 4-character uppercase base-36 stem. Algorithm (confirmed 2026-04-21 from 6 known file/timestamp pairs): stem_int = floor((ts_local - Jan_1_1985_midnight_local) / 1296_seconds) stem = 4-char uppercase base-36 encoding of stem_int Unit = 36² = 1296 seconds ≈ 21.6 minutes. Events within the same 1296-second window receive the same stem; their extension distinguishes them. """ delta_sec = int((ts_local - _INSTANTEL_EPOCH).total_seconds()) n = delta_sec // _STEM_UNIT_SEC s = "" for _ in range(4): s = _STEM_CHARS[n % 36] + s n //= 36 return s def blastware_filename(event: Event, serial: str, ach: bool = False) -> str: """ Return the correct Blastware filename for an event. CONFIRMED 2026-04-22 — verified against 3,248 files from a 10-year archive. Filename format: 0[T] where: prefix_letter = chr(ord('B') + floor(serial_numeric / 1000)) — encodes the production generation (batch of 1000 units) — e.g. BE6907→H, BE11529→M, BE14036→P, BE18003→T serial3 = f"{serial_numeric % 1000:03d}" — last 3 digits of numeric serial, zero-padded stem = 4-char base-36 of floor(total_seconds / 1296) — encodes which 21.6-minute window the event fell in AB = 2-char base-36 of (total_seconds % 1296) — encodes seconds within the window (0–1295) 0 = always literal digit zero T = 'W' or 'H' — ONLY appended for call-home (ACH) downloads (ach=True). Manual / direct downloads produce a 3-char extension (AB0) with no type char. Call-home downloads produce a 4-char extension (AB0W or AB0H). total_seconds = (event_local_time − 1985-01-01T00:00:00_local) in seconds The 10-year production archive contains only call-home files (all end in W or H). Manual Blastware downloads produce 3-char extensions — the same AB0 prefix but without the trailing type character. Micromate Series 4 uses a completely different naming scheme (literal datetime in filename); this function does not apply to Micromate units. Args: event: Event object with timestamp set. serial: Device serial number string (e.g. "BE11529"). ach: If True, append W/H type character (call-home style). If False (default), omit type character (direct download style). Returns: Filename string, e.g. "M529LIY6.CE0" (direct) or "M529LIY6.CE0H" (ACH). """ # ── Serial prefix ────────────────────────────────────────────────────────── serial_digits = "".join(c for c in serial if c.isdigit()) if len(serial_digits) >= 1: serial_numeric = int(serial_digits) generation = serial_numeric // 1000 prefix_letter = chr(ord('B') + generation) serial3 = f"{serial_numeric % 1000:03d}" else: prefix_letter = "M" # fallback serial3 = "000" prefix = prefix_letter + serial3 # ── Stem + AB extension from timestamp ──────────────────────────────────── if event.timestamp is not None: try: ts_local = datetime.datetime( event.timestamp.year, event.timestamp.month, event.timestamp.day, event.timestamp.hour, event.timestamp.minute, event.timestamp.second, ) delta_sec = int((ts_local - _INSTANTEL_EPOCH).total_seconds()) stem = _make_stem(ts_local) ab_val = delta_sec % _STEM_UNIT_SEC ab_str = _STEM_CHARS[ab_val // 36] + _STEM_CHARS[ab_val % 36] except (ValueError, TypeError, AttributeError): stem = "0000" ab_str = "00" else: stem = "0000" ab_str = "00" # ── Type character (ACH only) ───────────────────────────────────────────── if ach: if getattr(event, 'recording_mode', None) in (3, 4): # Histogram / Hist+Cont type_char = 'H' else: type_char = 'W' ext = f".{ab_str}0{type_char}" else: ext = f".{ab_str}0" return prefix + stem + ext # ── Waveform file writer ─────────────────────────────────────────────────────────── def write_blastware_file( event: Event, a5_frames: list[S3Frame], path: Union[str, Path], ) -> None: """ Write a Blastware waveform file from a downloaded event. Args: event: Event object (populated by get_events() or download_waveform()). Used for the STRT record (key, rectime) and footer timestamps. a5_frames: Complete A5 frame list INCLUDING the terminator frame (page_key=0x0000). Pass include_terminator=True to read_bulk_waveform_stream() when collecting frames. Must have at least 2 frames (probe + terminator). path: Destination file path. Parent directory must exist. Extension should be set via blastware_filename(). File layout: [22B header] [21B STRT] [body bytes] [26B footer] Raises: ValueError: if a5_frames is empty or has no terminator (page_key=0). OSError: if the file cannot be written. Confirmed correct waveform body reconstruction against M529LIY6 (example event) (2026-04-21). """ if not a5_frames: raise ValueError("a5_frames must not be empty") path = Path(path) # ── Extract STRT record from probe frame ──────────────────────────────── # The STRT record (21 bytes) lives verbatim inside A5[0].data[7:]. # It is stored as-is in the waveform file — do NOT reconstruct it from Event # fields, as bytes [10:14] and [14:20] contain device-specific values # (not simply key4 repeated or zero-padded). Confirmed 2026-04-21. # # STRT layout (21 bytes, observed in M529LIY6 files): # [0:4] b'STRT' # [4:6] 0xff 0xfe (fixed) # [6:10] key4 (event key) # [10:14] device-specific field (NOT a key4 repeat) # [14:20] device-specific fields (NOT zeros) # [20] rectime uint8 seconds # Extract STRT from the DLE-stripped probe frame. # # frame.data[7:] is the raw wire representation; it may contain DLE+{02,03,04} # inner-frame pairs that S3FrameParser preserves as two literal bytes. The # Blastware file stores the stripped form, so we must strip before extracting. # # Example (M529LK0Y, 2026-04-21): STRT contains value 0x02 encoded as [10 02] # on the wire. Without stripping, STRT is 22 raw bytes → write_blastware_file writes the # DLE prefix into the file AND begins the body 1 byte too early (probe_skip off # by 1). Stripping fixes both. # # probe_skip must be computed in the RAW frame.data domain (it is used as the # `skip` argument to _frame_body_bytes which operates on raw frame.data). # We walk the raw bytes counting stripped bytes until we have passed # strt_pos + 21 stripped bytes, giving the raw offset of the first body byte. w0_raw = bytes(a5_frames[0].data[7:]) w0_stripped = _strip_inner_frame_dles(w0_raw) strt_pos_stripped = w0_stripped.find(b"STRT") if strt_pos_stripped >= 0: strt = bytes(w0_stripped[strt_pos_stripped : strt_pos_stripped + 21]) # Walk raw bytes to find the raw-domain end of the STRT (= body start). target_stripped = strt_pos_stripped + 21 stripped_so_far = 0 raw_i = 0 while stripped_so_far < target_stripped and raw_i < len(w0_raw): if (w0_raw[raw_i] == 0x10 and raw_i + 1 < len(w0_raw) and w0_raw[raw_i + 1] in {0x02, 0x03, 0x04}): raw_i += 2 # DLE pair → 1 stripped byte, 2 raw bytes else: raw_i += 1 # normal byte → 1 stripped byte, 1 raw byte stripped_so_far += 1 probe_skip = 7 + raw_i # raw bytes to skip: 7 header + raw STRT length else: # Fallback: construct a minimal STRT if probe frame lacks it key4 = event._waveform_key if hasattr(event, '_waveform_key') and event._waveform_key else bytes(4) rectime = event.rectime_seconds if event.rectime_seconds is not None else 0 strt = b"STRT" + b"\xff\xfe" + key4 + bytes(14) + bytes([rectime & 0xFF]) probe_skip = 7 + 21 if len(strt) != 21: raise ValueError(f"STRT record must be 21 bytes, got {len(strt)}") # ── Build waveform file header ───────────────────────────────────────────────────── header = _FILE_HEADER_PREFIX + _WAVEFORM_TYPE_TAG assert len(header) == _WAVEFORM_HEADER_SIZE, f"Waveform header must be {_WAVEFORM_HEADER_SIZE} bytes" # ── Build body from A5 frames ──────────────────────────────────────────── # The waveform body is reconstructed from ALL A5 frames (data + terminator). # The terminator frame's contribution includes the 26-byte footer at its end. # # Reconstruction layout (confirmed from M529LIY6 captures, 2026-04-21): # all_bytes = contributions from A5[0..N] + terminator_contribution # body = all_bytes[:-26] (everything except the last 26 bytes) # footer = all_bytes[-26:] (last 26 bytes = the waveform file footer) # # The footer bytes come directly from the terminator frame's inner content — # using them verbatim ensures timestamps match the device's recorded values. # Separate terminator from data frames. # Search from the FRONT for the first terminator (page_key == 0x0000). # Do NOT use a5_frames[-1] — if _a5_frames contains stray frames from a # subsequent event (a known get_events side-effect), the last frame will # not be the terminator and the footer will be mis-identified. term_idx: Optional[int] = None for _i, _f in enumerate(a5_frames): if _f.page_key == 0x0000: term_idx = _i break if term_idx is not None: body_frames = a5_frames[:term_idx] term_frame = a5_frames[term_idx] else: body_frames = a5_frames term_frame = None all_bytes = bytearray() for fi, frame in enumerate(body_frames): if fi == 0: skip = probe_skip elif fi == 1: skip = 13 # 7-byte frame.data prefix + 6-byte first-chunk header else: skip = 12 # 7-byte frame.data prefix + 5-byte chunk header all_bytes.extend(_frame_body_bytes(frame, skip)) # Terminator contributes its content, which ends with the 26-byte footer. # skip=11 (not 12) because the terminator's inner frame header is 4 bytes, # one shorter than chunk frames' 5-byte inner header. Confirmed 2026-04-21. if term_frame is not None: all_bytes.extend(_frame_body_bytes(term_frame, 11)) if len(all_bytes) >= 26: body = bytes(all_bytes[:-26]) footer = bytes(all_bytes[-26:]) else: # Fallback: no terminator or very short stream → build footer from event metadata body = bytes(all_bytes) start_dt = _ts_from_model(event.timestamp) stop_dt: Optional[datetime.datetime] = None if start_dt is not None and event.rectime_seconds: stop_dt = start_dt + datetime.timedelta(seconds=event.rectime_seconds) footer = ( b"\x0e\x08" + _encode_ts_be(start_dt) + _encode_ts_be(stop_dt) + b"\x00\x01\x00\x02\x00\x00" + b"\x00\x00" # CRC placeholder ) # ── Write file ─────────────────────────────────────────────────────────── with open(path, "wb") as f: f.write(header) f.write(strt) f.write(body) f.write(footer) def read_blastware_file(path: Union[str, Path]) -> Event: """ Parse a Blastware waveform file into an Event object. NOT YET IMPLEMENTED. Args: path: Path to the waveform file. Returns: Event object with waveform data populated. Raises: NotImplementedError: always (pending implementation). """ raise NotImplementedError("read_blastware_file() is not yet implemented") # ── MLG file writer ─────────────────────────────────────────────────────────── def _build_mlg_header(serial: str) -> bytes: """ Build the 308-byte MLG file header. Header structure (confirmed from BE11529.MLG binary inspection): Offset 0x00: 10 00 01 80 00 00 Instantel\x00 07 2c 22 01 0e a0 (22B) Offset 0x16: ... (16B unknown — observed as zeros in BE11529.MLG) Offset 0x2A: serial number (8 bytes, null-padded ASCII) ... rest zero-padded to 308 bytes The serial string "BE11529" appears at offset 0x2A (42 decimal). """ buf = bytearray(_MLG_HEADER_SIZE) # Common prefix + MLG type tag prefix = _FILE_HEADER_PREFIX + _MLG_TYPE_TAG # 22 bytes buf[0:len(prefix)] = prefix # Serial number at offset 0x2A serial_bytes = serial.encode("ascii", errors="replace")[:8] serial_padded = serial_bytes.ljust(8, b"\x00") buf[0x2A : 0x2A + 8] = serial_padded return bytes(buf) def _build_mlg_record( entry: MonitorLogEntry, serial: str, ) -> bytes: """ Build one 292-byte MLG record from a MonitorLogEntry. Record layout (confirmed from BE11529.MLG binary inspection): [0:2] CRC — 2-byte CRC (algorithm unknown; written as 0x0000) [2:6] marker — 22 01 0e 80 [6:14] ts1 — 8B big-endian start timestamp [14:22] ts2 — 8B big-endian stop timestamp [22:26] flags — 4B record flags (see MLG_FLAGS_* constants) [26:36] serial — 10B null-padded serial number [36:] text — for triggered events: [0x08][8B ts1_copy]["Geo: X.XXX in/s"] for monitoring intervals: b"" or minimal separator [... zero-padded to 292 bytes] Flags based on entry type: - MonitorLogEntry with start_time only (no stop_time): MLG_FLAGS_START_ONLY - MonitorLogEntry with both times and geo_threshold_ips set: MLG_FLAGS_TRIGGER - MonitorLogEntry with both times (monitoring interval): MLG_FLAGS_INTERVAL The triggered-event text block (flags = MLG_FLAGS_TRIGGER): [0x08] [ts1: 8B] [ASCII "Geo: X.XXX in/s\x00"] Confirmed from BE11529.MLG records at offset 0x0134 and 0x0258. """ buf = bytearray(_MLG_RECORD_SIZE) start_dt = ( datetime.datetime( entry.start_time.year, entry.start_time.month, entry.start_time.day, entry.start_time.hour, entry.start_time.minute, entry.start_time.second, ) if entry.start_time else None ) stop_dt = ( datetime.datetime( entry.stop_time.year, entry.stop_time.month, entry.stop_time.day, entry.stop_time.hour, entry.stop_time.minute, entry.stop_time.second, ) if entry.stop_time else None ) # [0:2] CRC placeholder buf[0:2] = b"\x00\x00" # [2:6] Record marker buf[2:6] = _MLG_RECORD_MARKER # [6:14] ts1 buf[6:14] = _encode_ts_be(start_dt) # [14:22] ts2 buf[14:22] = _encode_ts_be(stop_dt) # [22:26] flags if stop_dt is None: flags = MLG_FLAGS_START_ONLY elif entry.geo_threshold_ips is not None: flags = MLG_FLAGS_TRIGGER else: flags = MLG_FLAGS_INTERVAL buf[22:26] = flags # [26:36] serial (10B null-padded) serial_bytes = serial.encode("ascii", errors="replace")[:10] buf[26 : 26 + len(serial_bytes)] = serial_bytes # [36:] text content pos = 36 if flags == MLG_FLAGS_TRIGGER: # Extra ts1 copy: [0x08][ts1: 8B] buf[pos] = 0x08 pos += 1 buf[pos : pos + 8] = _encode_ts_be(start_dt) pos += 8 if entry.geo_threshold_ips is not None: geo_text = f"Geo: {entry.geo_threshold_ips:.3f} in/s\x00".encode("ascii") buf[pos : pos + len(geo_text)] = geo_text pos += len(geo_text) return bytes(buf) def write_mlg( entries: list[MonitorLogEntry], serial: str, path: Union[str, Path], ) -> None: """ Write a Blastware .MLG monitor log file. Args: entries: List of MonitorLogEntry objects (from get_monitor_log_entries()). Each entry produces one 292-byte record in the file. serial: Device serial number string (e.g. "BE11529"). Written to the file header and each record. path: Destination file path. Extension is not enforced — use ".MLG". File layout: [308B header] [N × 292B records] Note: The 2-byte CRC at the start of each record is written as 0x0000. The CRC algorithm is unknown (see module docstring). Raises: OSError: if the file cannot be written. """ path = Path(path) header = _build_mlg_header(serial) with open(path, "wb") as f: f.write(header) for entry in entries: record = _build_mlg_record(entry, serial) f.write(record) def read_mlg(path: Union[str, Path]) -> list[MonitorLogEntry]: """ Parse a Blastware .MLG file into a list of MonitorLogEntry objects. NOT YET IMPLEMENTED. Args: path: Path to the .MLG file. Returns: List of MonitorLogEntry objects. Raises: NotImplementedError: always (pending implementation). """ raise NotImplementedError("read_mlg() is not yet implemented")