"""
blastware_file.py — Blastware binary file codec for bidirectional interoperability.

Reads and writes the proprietary Instantel/Blastware file formats:

  .N00 — Single-shot triggered waveform event
  .9T0 — Continuous-mode triggered waveform event
  .MLG — Monitor log (monitoring session history)

All formats share a common 18-byte header prefix followed by a 4-byte type tag
(22 bytes in total). Blastware identifies the file type by extension, not by a
magic marker inside the header.

IMPORTANT — .N00 vs .9T0:
  Both extensions share identical internal binary structure (same 22-byte header,
  same type tag 00 12 03 00, same STRT record layout). Blastware uses the
  extension to identify the recording mode:
    .N00 → single-shot (0C waveform sub_code = 0x10)
    .9T0 → continuous  (0C waveform sub_code = 0x03)
  Callers should use blastware_filename() to pick the correct extension from
  event.record_type. Histogram-mode file extension is unknown (TODO).

─── File structure overview ─────────────────────────────────────────────────────

N00 (single-shot waveform, confirmed from example-events/4-3-26-multi/M529LIY6.N00):

  [22B header] [21B STRT record] [body bytes] [26B footer]

  Header (22 bytes):
    10 00 01 80 00 00                — fixed prefix
    49 6e 73 74 61 6e 74 65 6c 00    — b'Instantel\x00'
    07 2c                            — fixed
    00 12 03 00                      — N00 type marker

  STRT record (21 bytes, immediately follows header):
    53 54 52 54    — b'STRT'
    ff fe          — fixed (2 bytes)
    [key4]         — 4-byte waveform event key
    [4B]           — device-specific field (NOT a key4 repeat; see write_n00)
    [6B]           — device-specific fields (NOT zero padding; see write_n00)
    [rectime]      — uint8 record time in seconds

  Body (variable — reconstructed from A5 frame data):
    The body bytes are derived from the raw A5 frame wire content, specifically
    from the DLE-decoded representation of each frame's contribution.
    See the _frame_body_bytes() helper for the exact algorithm.

  Footer (26 bytes):
    0e 08
    [ts1: 8B big-endian timestamp]   — start timestamp
    [ts2: 8B big-endian timestamp]   — stop timestamp
    00 01 00 02 00 00
    [crc: 2B]                        — CRC (algorithm unconfirmed; written as 0x00 0x00 placeholder)

  Timestamp format (big-endian, 8 bytes):
    [day] [month] [year_HI] [year_LO] [0x00] [hour] [min] [sec]

MLG (monitor log, confirmed from example-events/4-3-26-multi/BE11529.MLG):

  [308B header] [N × 292B records]

  Header (308 bytes):
    Offset 0x00: 10 00 01 80 00 00 Instantel\x00 07 2c 22 01 0e a0 — fixed (22B)
    Offset 0x16: ... (20 bytes, unknown structure, written as zeros)
    Offset 0x2A: serial number (8 bytes, null-padded ASCII, e.g. "BE11529")
    ... zero-padded to 308 bytes total

  Record (292 bytes each):
    [2B CRC]                          — unknown algorithm; written as 0x00 0x00
    22 01 0e 80                       — record marker
    [ts1: 8B big-endian timestamp]    — start time
    [ts2: 8B big-endian timestamp]    — stop time (zeros if no stop)
    [4B flags]                        — see MLG_FLAGS_* constants below
    [10B serial]                      — null-padded serial number ASCII
    [text]                            — for trigger records: [0x08][8B ts1_copy]
                                        then ASCII "Geo: X.XXX in/s"
                                        for monitoring records: b'' (or minimal separator)
    [zero-padded to 292 bytes]

─── Critical implementation notes ──────────────────────────────────────────────

N00 body reconstruction algorithm (confirmed 2026-04-21 from verification against
M529LIY6.N00 using raw_s3_20260403_153508.bin capture):

  The N00 body bytes come from the A5 frame content, stripped of DLE-framing
  artifacts. Each A5 frame contributes a different slice of its data section,
  with DLE+{0x02,0x03,0x04} byte pairs stripped.

  Skip amounts per frame index (offsets into frame.data):
    A5[0] (probe):  data[strt_pos + 21 + 7]  (skip header + STRT record)
                    strt_pos found by searching frame.data[7:] for b'STRT';
                    the contribution starts at strt_pos + 21 within data[7:],
                    which equals strt_pos + 21 + 7 within frame.data.
    A5[1]:          data[13]  (skip 7-byte frame.data prefix + 6 header bytes)
    A5[2..N]:       data[12]  (skip 7-byte frame.data prefix + 5 header bytes)
    Terminator A5:  data[11]  (1 byte less than chunk frames; terminator inner
                    header is 4 bytes instead of 5 — confirmed 2026-04-21)

  DLE strip rule (applied AFTER slicing):
    Strip any 0x10 byte that is immediately followed by 0x02, 0x03, or 0x04.
    This undoes the DLE-escape that S3FrameParser preserves as literal pairs.
    The rule is applied to frame.data[skip:]. When a split DLE+chk pair is
    detected (see below), frame.chk_byte is appended to the buffer before
    stripping and removed from the output afterwards.

  chk_byte absorption:
    When frame.data[-1] == 0x10 AND frame.chk_byte ∈ {0x02, 0x03, 0x04}, the
    last byte of frame.data is the DLE prefix of a split DLE+chk pair.
    Including chk_byte in the strip buffer allows the pair to be stripped as a
    unit. After stripping, the trailing chk_byte is ALWAYS removed — because
    _strip_inner_frame_dles keeps the byte after the DLE (the chk_byte value),
    and that value is the checksum, never payload. This applies to all three
    cases (chk ∈ {0x02, 0x03, 0x04}) identically.

MLG CRC:
  The algorithm that produces the 2-byte CRC at the start of each MLG record is
  unknown. All examined records use non-zero values that do not match
  CRC-16/CCITT, CRC-16/IBM, CRC-32 (truncated), word sums, XOR variants, or any
  of the 40+ polynomial/init combinations tested. The writer emits 0x0000.
  Blastware may reject such files or display them without the CRC check — the
  exact impact on BW import is unknown (TODO: test).

─── Public API ──────────────────────────────────────────────────────────────────

  blastware_filename(event, serial)
    Return the correct Blastware filename (e.g. "M529LIY6.N00") for an event.
    Uses event.record_type to pick .N00 (single-shot) vs .9T0 (continuous).

  write_n00(event, a5_frames, path)
    Create a .N00 or .9T0 waveform file from an Event and the full A5 frame list
    (include_terminator=True required when calling read_bulk_waveform_stream).
    Identical binary format for both extensions — caller picks the path/ext.

  read_n00(path) → Event
    Parse a .N00 file into an Event object with waveform data populated.
    (Not yet implemented — placeholder raises NotImplementedError.)

  write_mlg(entries, serial, path)
    Create a .MLG file from a list of MonitorLogEntry objects.

  read_mlg(path) → list[MonitorLogEntry]
    Parse a .MLG file into MonitorLogEntry objects.
    (Not yet implemented — placeholder raises NotImplementedError.)
"""

from __future__ import annotations

import datetime
import struct
from pathlib import Path
from typing import Optional, Union

from .framing import S3Frame
from .models import Event, MonitorLogEntry, Timestamp


# ── File header constants ─────────────────────────────────────────────────────

# N00 type tag (4 bytes after common prefix)
_N00_TYPE_TAG = b"\x00\x12\x03\x00"  # confirmed from M529LIY6.N00 offset 0x12..0x15

# MLG type tag (4 bytes after common prefix)
_MLG_TYPE_TAG = b"\x22\x01\x0e\xa0"  # confirmed from BE11529.MLG offset 0x12..0x15

# Common 18-byte prefix shared by N00 and MLG (confirmed from binary inspection):
#   b"\x10\x00\x01\x80\x00\x00" = 6B fixed
#   b"Instantel\x00"            = 10B
#   b"\x07\x2c"                 = 2B fixed
# The prefix plus the 4-byte type tag gives the 22-byte header; in N00 files the
# STRT record starts at byte 22.
_FILE_HEADER_PREFIX = b"\x10\x00\x01\x80\x00\x00Instantel\x00\x07\x2c"  # 18 bytes
_N00_HEADER_SIZE = 22   # 18 + 4 = 22 bytes ✅
_MLG_HEADER_SIZE = 308  # confirmed from BE11529.MLG

# MLG record marker (4 bytes after 2-byte CRC at start of each record)
_MLG_RECORD_MARKER = b"\x22\x01\x0e\x80"
_MLG_RECORD_SIZE = 292  # bytes per record (confirmed from BE11529.MLG)

# MLG record flags (4 bytes at record[22:26])
# Confirmed from BE11529.MLG binary inspection:
MLG_FLAGS_START_ONLY = b"\xff\xff\x00\x00"  # monitoring start with no stop
MLG_FLAGS_TRIGGER = b"\x01\x00\x02\x00"     # triggered event (has ts1 + ts2)
MLG_FLAGS_INTERVAL = b"\x02\x00\x00\x00"    # monitoring interval (has ts1 + ts2)


# ── Timestamp helpers ─────────────────────────────────────────────────────────

def _encode_ts_be(ts: Optional[datetime.datetime]) -> bytes:
    """
    Encode a datetime as an 8-byte big-endian Blastware timestamp.

    Format (N00 and MLG record timestamps):
      [day][month][year_HI][year_LO][0x00][hour][min][sec]

    Big-endian year confirmed from M529LIY6.N00 footer:
      footer bytes [2..9] = 01 04 07 ea 00 00 1c 08
      → day=1 month=4 year=0x07ea=2026 hour=0 min=28 sec=8 ✅

    Returns 8 zero bytes if ts is None.
    """
    if ts is None:
        return bytes(8)
    return bytes([
        ts.day,
        ts.month,
        (ts.year >> 8) & 0xFF,
        ts.year & 0xFF,
        0x00,
        ts.hour,
        ts.minute,
        ts.second,
    ])


def _decode_ts_be(raw: bytes) -> Optional[datetime.datetime]:
    """
    Decode an 8-byte big-endian Blastware timestamp.

    Returns None if the bytes are all zero or structurally invalid.
    """
    if len(raw) < 8 or raw == bytes(8):
        return None
    day = raw[0]
    month = raw[1]
    year = (raw[2] << 8) | raw[3]
    hour = raw[5]
    minute = raw[6]
    sec = raw[7]
    try:
        return datetime.datetime(year, month, day, hour, minute, sec)
    except ValueError:
        return None


def _ts_from_model(ts: Optional[Timestamp]) -> Optional[datetime.datetime]:
    """Convert a models.Timestamp to datetime.datetime, or None."""
    if ts is None:
        return None
    try:
        return datetime.datetime(ts.year, ts.month, ts.day, ts.hour, ts.minute, ts.second)
    except (ValueError, TypeError):
        return None


# ── DLE strip helper ──────────────────────────────────────────────────────────

def _strip_inner_frame_dles(data: bytes) -> bytes:
    """
    Strip DLE (0x10) framing markers from A5 inner-frame content.

    The A5 (bulk waveform stream) response body contains DLE-encoded sub-frame
    structure. S3FrameParser preserves DLE+XX pairs as two literal bytes in
    frame.data. Only the DLE marker byte needs to be removed; the following
    byte is actual payload content.

    Rule: when 0x10 is immediately followed by {0x02, 0x03, 0x04}, strip the
    0x10 (DLE marker) and keep the following byte as payload.

    Lone 0x10 bytes not followed by {0x02, 0x03, 0x04} are kept as-is.

    Confirmed correct by verifying reconstructed N00 body against M529LIY6.N00:
      - 0x10 0x02 in terminator → 0x02 kept ✓
      - 0x10 0x04 in terminator (month byte) → 0x04 kept ✓
    """
    out = bytearray()
    i = 0
    while i < len(data):
        b = data[i]
        if b == 0x10 and i + 1 < len(data) and data[i + 1] in {0x02, 0x03, 0x04}:
            # Strip the DLE marker; the next byte is payload and will be appended
            # in the next loop iteration.
            i += 1
            continue
        out.append(b)
        i += 1
    return bytes(out)


def _frame_body_bytes(frame: S3Frame, skip: int) -> bytes:
    """
    Extract the N00 body contribution from one A5 S3Frame.

    The contribution is frame.data[skip:] with inner-frame DLE pairs stripped
    per _strip_inner_frame_dles().
    The chk_byte is temporarily appended before stripping to handle the
    split-pair edge case where a DLE at the end of frame.data is paired with
    chk_byte.

    Split-pair edge case (confirmed for A5[8] of M529LIY6.N00, 2026-04-21):
      S3FrameParser appends DLE+XX pairs as two literal bytes when
      XX ∉ {DLE, ETX}. When the LAST occurrence of such a pair straddles the
      payload/checksum boundary (i.e., DLE is the last byte of raw_payload and
      XX is the checksum), the parser splits them:
        - DLE ends up as the last byte of frame.data (frame.data[-1] == 0x10)
        - XX is stored as frame.chk_byte
      To strip the pair correctly, we reunite the bytes before calling the
      strip function. Since chk_byte is the checksum (not payload data), it is
      excluded from the final output regardless of whether it was part of a pair.

    Post-strip chk_byte removal (ALL cases):
      _strip_inner_frame_dles strips the 0x10 and KEEPS chk_byte in all cases.
      chk_byte is always the checksum (not payload), so always strip it off.

    Args:
        frame: S3Frame with frame.data and frame.chk_byte populated.
        skip:  Number of leading bytes in frame.data to exclude (frame header).

    Returns:
        bytes — the N00 body contribution for this frame.
    """
    if skip >= len(frame.data):
        return b""

    relevant = frame.data[skip:]

    # Detect split DLE+chk pair at the frame boundary.
    has_split_pair = (
        len(relevant) > 0
        and relevant[-1] == 0x10
        and frame.chk_byte in {0x02, 0x03, 0x04}
    )

    if has_split_pair:
        # Reunite the split pair so the strip function sees both bytes together.
        buf = relevant + bytes([frame.chk_byte])
        stripped = _strip_inner_frame_dles(buf)
        # _strip_inner_frame_dles strips the DLE (0x10) and KEEPS chk_byte.
        # chk_byte is the received checksum — never payload — so remove it.
        # This is correct for all values in {0x02, 0x03, 0x04}.
        if stripped:
            stripped = stripped[:-1]
        return stripped
    else:
        return _strip_inner_frame_dles(relevant)


# ── Filename helper ───────────────────────────────────────────────────────────

_INSTANTEL_EPOCH = datetime.datetime(1985, 1, 1, 0, 0, 0)
"""
Instantel timestamp epoch — January 1, 1985, 00:00:00 local time.

Confirmed 2026-04-21: stem values for 6 independent events (April 1–9, 2026)
all converge to this epoch when decoded as floor(seconds_since_epoch / 1296).
1985 is the year Instantel was founded.
"""

_STEM_UNIT_SEC = 1296  # = 36^2 seconds ≈ 21.6 minutes per stem unit
_STEM_CHARS = "0123456789ABCDEFGHIJKLMNOPQRSTUVWXYZ"

# Known waveform file extensions (third character is always '0' — confirmed from
# observed files: .N00, .9T0, .490, .5K0, .980, .ML0).
#
# Confirmed mappings:
#   .N00 → single-shot (recording_mode=0 in compliance anchor at file[anc-7])
#   .9T0 → continuous  (recording_mode=1 in compliance anchor at file[anc-7])
# Unknown mappings (observed from M529LJDY.* and M529LJ8V.*):
#   .490 → ? (April 6, 13 sec record)
#   .5K0 → ? (April 9, 10 sec record)
#   .980 → ? (April 9, 7 sec record)
#   .ML0 → ? (April 9, 167 sec record — possibly Histogram or Histogram+Continuous)
#
# IMPORTANT — extension encodes capture-time config, NOT session-start config:
#   Binary analysis (2026-04-21) shows that the compliance anchor region in the
#   file body encodes the SESSION-START config (A5 frame 7), not the per-event
#   config. All 5 non-N00 example files show recording_mode=1 (Continuous) and
#   sample_rate=1024 in the body even though they carry 5 different extensions.
#   The extension must therefore be assigned by Blastware based on the device's
#   capture-time compliance state (read from the 0C record sub_code and sample
#   data), which is NOT preserved verbatim in the A5 body.
#
# How to READ recording_mode from a .N00/.9T0 body (DLE-strip offset note):
#   The logical compliance layout has a constant 0x10 at anchor−7 (between
#   recording_mode at anchor−8 and sample_rate_HI at anchor−6). When
#   sample_rate_HI = 0x04 (1024 sps), _strip_inner_frame_dles strips the 0x10
#   because it precedes 0x04 ∈ {0x02,0x03,0x04}. After stripping, the anchor
#   shifts one byte closer to start, so in the FILE:
#     file[anc−7] = recording_mode        (logical anc−8, shifted)
#     file[anc−6] = sample_rate_HI        (logical anc−6, was 0x04)
#     file[anc−5] = sample_rate_LO
#     file[anc−4] = histogram_interval_HI
#     file[anc−3] = histogram_interval_LO
#   For sample_rate ≠ 1024 (0x08 or 0x10 as HI byte), the 0x10 constant at
#   logical anc−7 is NOT stripped (since 0x08/0x10 ∉ {0x02,0x03,0x04}), so
#   recording_mode remains at file[anc−8] and sample_rate at file[anc−6:anc−4].
#
# Multiple events within the same ~21.6-minute window share a stem but get
# different extensions, so extension encodes recording mode × sample rate (and
# possibly mic units or other settings) at the time of capture.


def _make_stem(ts_local: datetime.datetime) -> str:
    """
    Encode a local timestamp as a 4-character uppercase base-36 stem.

    Algorithm (confirmed 2026-04-21 from 6 known file/timestamp pairs):
      stem_int = floor((ts_local - Jan_1_1985_midnight_local) / 1296_seconds)
      stem     = 4-char uppercase base-36 encoding of stem_int

    Unit = 36² = 1296 seconds ≈ 21.6 minutes. Events within the same
    1296-second window receive the same stem; their extension distinguishes them.
    """
    delta_sec = int((ts_local - _INSTANTEL_EPOCH).total_seconds())
    n = delta_sec // _STEM_UNIT_SEC
    s = ""
    for _ in range(4):
        s = _STEM_CHARS[n % 36] + s
        n //= 36
    return s


def blastware_filename(event: Event, serial: str) -> str:
    """
    Return the correct Blastware waveform filename for an event.

    Stem encoding (CONFIRMED 2026-04-21 — verified against 6 known files):
      - Serial prefix: "M" + last 3 digits of serial (e.g.
        "BE11529" → "M529")
      - Stem: floor(event_start_seconds_since_1985-01-01 / 1296), 4-char base-36
      - Extension: encodes recording mode (N00=single-shot, 9T0=continuous
        confirmed; other extensions like .490, .5K0, .980, .ML0 observed but
        not decoded)

    Note: the extension space is larger than N00/9T0. Multiple events within
    the same ~21.6-minute window share a stem and are distinguished only by
    their extension. This function returns .N00 or .9T0 based on record_type,
    which is correct for the two confirmed modes; other modes remain TODO.

    Args:
        event:  Event object with record_type and timestamp set.
        serial: Device serial number string (e.g. "BE11529").

    Returns:
        Filename string (e.g. "M529LIY6.N00").
    """
    # Determine extension from record_type
    if event.record_type == "continuous":
        ext = ".9T0"
    else:
        # Default to .N00 for single-shot and unknown modes
        ext = ".N00"

    # Serial prefix: "M" + last 3 digits (e.g. BE11529 → M529)
    serial_digits = "".join(c for c in serial if c.isdigit())
    prefix = "M" + serial_digits[-3:] if len(serial_digits) >= 3 else "M000"

    # Stem from event start timestamp
    if event.timestamp is not None:
        try:
            ts_local = datetime.datetime(
                event.timestamp.year,
                event.timestamp.month,
                event.timestamp.day,
                event.timestamp.hour,
                event.timestamp.minute,
                event.timestamp.second,
            )
            stem = _make_stem(ts_local)
        except (ValueError, TypeError, AttributeError):
            stem = "0000"
    else:
        stem = "0000"

    return prefix + stem + ext


# ── N00 file writer ───────────────────────────────────────────────────────────

def write_n00(
    event: Event,
    a5_frames: list[S3Frame],
    path: Union[str, Path],
) -> None:
    """
    Write a Blastware .N00 waveform file from a downloaded event.

    Args:
        event:     Event object (populated by get_events() or download_waveform()).
                   Used for the STRT record (key, rectime) and footer timestamps.
        a5_frames: Complete A5 frame list INCLUDING the terminator frame
                   (page_key=0x0000). Pass include_terminator=True to
                   read_bulk_waveform_stream() when collecting frames.
                   Should have at least 2 frames (probe + terminator).
        path:      Destination file path. Parent directory must exist.
                   Extension is not enforced — caller should use ".N00".

    File layout:
        [22B header] [21B STRT] [body bytes] [26B footer]

    Raises:
        ValueError: if a5_frames is empty or the STRT record is not 21 bytes.
        OSError:    if the file cannot be written.

    If the terminator frame (page_key=0) is missing, the footer is built from
    the Event metadata instead of being taken from the stream.

    Confirmed correct N00 body reconstruction against M529LIY6.N00 (2026-04-21).
    """
    if not a5_frames:
        raise ValueError("a5_frames must not be empty")

    path = Path(path)

    # ── Extract STRT record from probe frame ────────────────────────────────
    # The STRT record (21 bytes) lives verbatim inside A5[0].data[7:].
    # It is stored as-is in the N00 file — do NOT reconstruct it from Event
    # fields, as bytes [10:14] and [14:20] contain device-specific values
    # (not simply key4 repeated or zero-padded). Confirmed 2026-04-21.
    #
    # STRT layout (21 bytes, observed in M529LIY6.N00):
    #   [0:4]   b'STRT'
    #   [4:6]   0xff 0xfe  (fixed)
    #   [6:10]  key4 (event key)
    #   [10:14] device-specific field (NOT a key4 repeat)
    #   [14:20] device-specific fields (NOT zeros)
    #   [20]    rectime uint8 seconds
    w0 = a5_frames[0].data[7:]
    strt_pos_w0 = w0.find(b"STRT")
    if strt_pos_w0 >= 0:
        strt = bytes(w0[strt_pos_w0 : strt_pos_w0 + 21])
    else:
        # Fallback: construct a minimal STRT if the probe frame lacks it.
        # Bytes [10:20] (10 bytes) are zero-filled because the device-specific
        # values are not available from the Event.
        key4 = getattr(event, "_waveform_key", None) or bytes(4)
        rectime = event.rectime_seconds if event.rectime_seconds is not None else 0
        strt = b"STRT" + b"\xff\xfe" + key4 + bytes(10) + bytes([rectime & 0xFF])

    if len(strt) != 21:
        raise ValueError(f"STRT record must be 21 bytes, got {len(strt)}")

    strt_pos_in_w0 = strt_pos_w0 if strt_pos_w0 >= 0 else 0

    # ── Build N00 header ─────────────────────────────────────────────────────
    header = _FILE_HEADER_PREFIX + _N00_TYPE_TAG
    assert len(header) == _N00_HEADER_SIZE, f"N00 header must be {_N00_HEADER_SIZE} bytes"

    # ── Build body from A5 frames ────────────────────────────────────────────
    # The N00 body is
    # reconstructed from ALL A5 frames (data + terminator).
    # The terminator frame's contribution includes the 26-byte footer at its end.
    #
    # Reconstruction layout (confirmed from M529LIY6.N00, 2026-04-21):
    #   all_bytes = contributions from A5[0..N] + terminator_contribution
    #   body      = all_bytes[:-26]   (everything except the last 26 bytes)
    #   footer    = all_bytes[-26:]   (last 26 bytes = the N00 footer)
    #
    # The footer bytes come directly from the terminator frame's inner content —
    # using them verbatim ensures timestamps match the device's recorded values.

    # Separate terminator from data frames
    body_frames = a5_frames
    term_frame: Optional[S3Frame] = None
    if a5_frames and a5_frames[-1].page_key == 0x0000:
        body_frames = a5_frames[:-1]
        term_frame = a5_frames[-1]

    # Skip for A5[0]: 7-byte frame.data prefix + strt_pos_in_w0 + 21 STRT bytes.
    # strt_pos_in_w0 was already found in the STRT extraction block above.
    probe_skip = 7 + strt_pos_in_w0 + 21

    all_bytes = bytearray()
    for fi, frame in enumerate(body_frames):
        if fi == 0:
            skip = probe_skip
        elif fi == 1:
            skip = 13  # 7-byte frame.data prefix + 6-byte first-chunk header
        else:
            skip = 12  # 7-byte frame.data prefix + 5-byte chunk header
        all_bytes.extend(_frame_body_bytes(frame, skip))

    # Terminator contributes its content, which ends with the 26-byte footer.
    # skip=11 (not 12) because the terminator's inner frame header is 4 bytes,
    # one shorter than chunk frames' 5-byte inner header. Confirmed 2026-04-21.
    if term_frame is not None:
        all_bytes.extend(_frame_body_bytes(term_frame, 11))

    # Only the terminator carries a device-written footer, so the last 26 bytes
    # are treated as the footer only when a terminator frame was present.
    if term_frame is not None and len(all_bytes) >= 26:
        body = bytes(all_bytes[:-26])
        footer = bytes(all_bytes[-26:])
    else:
        # Fallback: no terminator or very short stream → build footer from event metadata
        body = bytes(all_bytes)
        start_dt = _ts_from_model(event.timestamp)
        stop_dt: Optional[datetime.datetime] = None
        if start_dt is not None and event.rectime_seconds:
            stop_dt = start_dt + datetime.timedelta(seconds=event.rectime_seconds)
        footer = (
            b"\x0e\x08"
            + _encode_ts_be(start_dt)
            + _encode_ts_be(stop_dt)
            + b"\x00\x01\x00\x02\x00\x00"
            + b"\x00\x00"  # CRC placeholder
        )

    # ── Write file ───────────────────────────────────────────────────────────
    with open(path, "wb") as f:
        f.write(header)
        f.write(strt)
        f.write(body)
        f.write(footer)


def read_n00(path: Union[str, Path]) -> Event:
    """
    Parse a Blastware .N00 file into an Event object.

    NOT YET IMPLEMENTED.

    Args:
        path: Path to the .N00 file.

    Returns:
        Event object with waveform data populated.

    Raises:
        NotImplementedError: always (pending implementation).
    """
    raise NotImplementedError("read_n00() is not yet implemented")


# ── MLG file writer ───────────────────────────────────────────────────────────

def _build_mlg_header(serial: str) -> bytes:
    """
    Build the 308-byte MLG file header.

    Header structure (confirmed from BE11529.MLG binary inspection):
      Offset 0x00: 10 00 01 80 00 00 Instantel\x00 07 2c 22 01 0e a0  (22B)
      Offset 0x16: ... (20B unknown — observed as zeros in BE11529.MLG)
      Offset 0x2A: serial number (8 bytes, null-padded ASCII)
      ... rest zero-padded to 308 bytes

    The serial string "BE11529" appears at offset 0x2A (42 decimal).
    """
    buf = bytearray(_MLG_HEADER_SIZE)

    # Common prefix + MLG type tag
    prefix = _FILE_HEADER_PREFIX + _MLG_TYPE_TAG  # 22 bytes
    buf[0:len(prefix)] = prefix

    # Serial number at offset 0x2A
    serial_bytes = serial.encode("ascii", errors="replace")[:8]
    serial_padded = serial_bytes.ljust(8, b"\x00")
    buf[0x2A : 0x2A + 8] = serial_padded

    return bytes(buf)


def _build_mlg_record(
    entry: MonitorLogEntry,
    serial: str,
) -> bytes:
    """
    Build one 292-byte MLG record from a MonitorLogEntry.

    Record layout (confirmed from BE11529.MLG binary inspection):
      [0:2]   CRC    — 2-byte CRC (algorithm unknown; written as 0x0000)
      [2:6]   marker — 22 01 0e 80
      [6:14]  ts1    — 8B big-endian start timestamp
      [14:22] ts2    — 8B big-endian stop timestamp
      [22:26] flags  — 4B record flags (see MLG_FLAGS_* constants)
      [26:36] serial — 10B null-padded serial number
      [36:]   text   — for triggered events: [0x08][8B ts1_copy]["Geo: X.XXX in/s"]
                       for monitoring intervals: b"" or minimal separator
      [... zero-padded to 292 bytes]

    Flags based on entry type:
      - MonitorLogEntry with start_time only (no stop_time): MLG_FLAGS_START_ONLY
      - MonitorLogEntry with both times and geo_threshold_ips set: MLG_FLAGS_TRIGGER
      - MonitorLogEntry with both times (monitoring interval): MLG_FLAGS_INTERVAL

    The triggered-event text block (flags = MLG_FLAGS_TRIGGER):
      [0x08] [ts1: 8B] [ASCII "Geo: X.XXX in/s\x00"]
    Confirmed from BE11529.MLG records at offset 0x0134 and 0x0258.
    """
    buf = bytearray(_MLG_RECORD_SIZE)

    start_dt = (
        datetime.datetime(
            entry.start_time.year,
            entry.start_time.month,
            entry.start_time.day,
            entry.start_time.hour,
            entry.start_time.minute,
            entry.start_time.second,
        )
        if entry.start_time
        else None
    )
    stop_dt = (
        datetime.datetime(
            entry.stop_time.year,
            entry.stop_time.month,
            entry.stop_time.day,
            entry.stop_time.hour,
            entry.stop_time.minute,
            entry.stop_time.second,
        )
        if entry.stop_time
        else None
    )

    # [0:2] CRC placeholder
    buf[0:2] = b"\x00\x00"

    # [2:6] Record marker
    buf[2:6] = _MLG_RECORD_MARKER

    # [6:14] ts1
    buf[6:14] = _encode_ts_be(start_dt)

    # [14:22] ts2
    buf[14:22] = _encode_ts_be(stop_dt)

    # [22:26] flags
    if stop_dt is None:
        flags = MLG_FLAGS_START_ONLY
    elif entry.geo_threshold_ips is not None:
        flags = MLG_FLAGS_TRIGGER
    else:
        flags = MLG_FLAGS_INTERVAL
    buf[22:26] = flags

    # [26:36] serial (10B null-padded)
    serial_bytes = serial.encode("ascii", errors="replace")[:10]
    buf[26 : 26 + len(serial_bytes)] = serial_bytes

    # [36:] text content
    pos = 36
    if flags == MLG_FLAGS_TRIGGER:
        # Extra ts1 copy: [0x08][ts1: 8B]
        buf[pos] = 0x08
        pos += 1
        buf[pos : pos + 8] = _encode_ts_be(start_dt)
        pos += 8
        if entry.geo_threshold_ips is not None:
            geo_text = f"Geo: {entry.geo_threshold_ips:.3f} in/s\x00".encode("ascii")
            buf[pos : pos + len(geo_text)] = geo_text
            pos += len(geo_text)

    return bytes(buf)


def write_mlg(
    entries: list[MonitorLogEntry],
    serial: str,
    path: Union[str, Path],
) -> None:
    """
    Write a Blastware .MLG monitor log file.

    Args:
        entries: List of MonitorLogEntry objects (from get_monitor_log_entries()).
                 Each entry produces one 292-byte record in the file.
        serial:  Device serial number string (e.g. "BE11529").
                 Written to the file header and each record.
        path:    Destination file path. Extension is not enforced — use ".MLG".

    File layout:
        [308B header] [N × 292B records]

    Note:
        The 2-byte CRC at the start of each record is written as 0x0000.
        The CRC algorithm is unknown (see module docstring).

    Raises:
        OSError: if the file cannot be written.
    """
    path = Path(path)
    header = _build_mlg_header(serial)

    with open(path, "wb") as f:
        f.write(header)
        for entry in entries:
            record = _build_mlg_record(entry, serial)
            f.write(record)


def read_mlg(path: Union[str, Path]) -> list[MonitorLogEntry]:
    """
    Parse a Blastware .MLG file into a list of MonitorLogEntry objects.

    NOT YET IMPLEMENTED.

    Args:
        path: Path to the .MLG file.

    Returns:
        List of MonitorLogEntry objects.

    Raises:
        NotImplementedError: always (pending implementation).
    """
    raise NotImplementedError("read_mlg() is not yet implemented")
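

# ── Illustrative sketch: decoding a filename stem ─────────────────────────────
# The stem encoding documented in _make_stem() can be inverted to recover the
# 1296-second window an event falls in, which may help when sanity-checking a
# filename against a footer timestamp. This is a minimal sketch under stated
# assumptions, not part of the confirmed format work: decode_stem_window() and
# its parameter names are hypothetical, and the epoch and unit defaults simply
# repeat the values documented for _INSTANTEL_EPOCH and _STEM_UNIT_SEC so that
# the snippet stays self-contained.

import datetime as _dt


def decode_stem_window(stem, epoch=_dt.datetime(1985, 1, 1), unit_sec=1296):
    """Return (window_start, window_end) as local datetimes for a 4-char stem."""
    # int() with base 36 accepts the digits 0-9 and the letters A-Z.
    n = int(stem, 36)
    start = epoch + _dt.timedelta(seconds=n * unit_sec)
    return start, start + _dt.timedelta(seconds=unit_sec)


# Usage: the stem "LIY6" from M529LIY6.N00 should bracket the footer start
# timestamp quoted in the _encode_ts_be() docstring (2026-04-01 00:28:08):
#   start, end = decode_stem_window("LIY6")
#   start <= _dt.datetime(2026, 4, 1, 0, 28, 8) < end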