838 lines
35 KiB
Python
838 lines
35 KiB
Python
"""
|
||
blastware_file.py — Blastware binary file codec for bidirectional interoperability.
|
||
|
||
Reads and writes the proprietary Instantel/Blastware file formats:
|
||
.N00 / .9T0 / .EI0 / etc. — Waveform event (extension encoding UNKNOWN — see below)
|
||
.MLG — Monitor log (monitoring session history)
|
||
|
||
All waveform formats share a common 22-byte file header prefix and identical
|
||
internal binary structure (same type tag 00 12 03 00, same STRT record layout).
|
||
Blastware identifies the file type by extension, not by a magic marker.
|
||
|
||
EXTENSION ENCODING — V10.72 firmware FULLY CONFIRMED 2026-04-22:
|
||
Format: AB0T where AB = 2-char base-36 of (total_seconds % 1296),
|
||
0 = literal zero, T = W (Full Waveform) or H (Full Histogram).
|
||
total_seconds = (event_local_time − 1985-01-01T00:00:00_local).
|
||
Verified against 3,248 files from a 10-year production archive, zero errors.
|
||
|
||
Old firmware (S338, 3-char extensions ending in '0'): encoding unknown.
|
||
The extension is NOT recording mode — confirmed false 2026-04-21.
|
||
Micromate Series 4 uses a different scheme (literal datetime in filename).
|
||
|
||
─── File structure overview ─────────────────────────────────────────────────────
|
||
|
||
N00 (single-shot waveform, confirmed from example-events/4-3-26-multi/M529LIY6.N00):
|
||
|
||
[22B header] [21B STRT record] [body bytes] [26B footer]
|
||
|
||
Header (22 bytes):
|
||
10 00 01 80 00 00 — fixed prefix
|
||
49 6e 73 74 61 6e 74 65 6c 00 — b'Instantel\x00'
|
||
07 2c — fixed
|
||
00 12 03 00 — N00 type marker
|
||
|
||
STRT record (21 bytes, immediately follows header):
|
||
53 54 52 54 — b'STRT'
|
||
ff fe — fixed (2 bytes)
|
||
[key4] — 4-byte waveform event key
|
||
[key4] — 4-byte waveform event key (repeated)
|
||
[zeros] — 7 bytes padding
|
||
[rectime] — uint8 record time in seconds
|
||
|
||
Body (variable — reconstructed from A5 frame data):
|
||
The body bytes are derived from the raw A5 frame wire content, specifically
|
||
from the DLE-decoded representation of each frame's contribution. See the
|
||
_frame_body_bytes() helper for the exact algorithm.
|
||
|
||
Footer (26 bytes):
|
||
0e 08
|
||
[ts1: 8B big-endian timestamp] — start timestamp
|
||
[ts2: 8B big-endian timestamp] — stop timestamp
|
||
00 01 00 02 00 00
|
||
[crc: 2B] — CRC (algorithm unconfirmed; written as 0x00 0x00 placeholder)
|
||
|
||
Timestamp format (big-endian, 8 bytes):
|
||
[day] [month] [year_HI] [year_LO] [0x00] [hour] [min] [sec]
|
||
|
||
MLG (monitor log, confirmed from example-events/4-3-26-multi/BE11529.MLG):
|
||
|
||
[308B header] [N × 292B records]
|
||
|
||
Header (308 bytes):
|
||
Offset 0x00: 10 00 01 80 00 00 Instantel\x00 07 2c 22 01 0e a0 — fixed (16B)
|
||
Offset 0x10: ... (unknown structure, written as zeros + serial)
|
||
Offset 0x2A: serial number (8 bytes, null-padded ASCII, e.g. "BE11529")
|
||
... zero-padded to 308 bytes total
|
||
|
||
Record (292 bytes each):
|
||
[2B CRC] — unknown algorithm; written as 0x00 0x00
|
||
22 01 0e 80 — record marker
|
||
[ts1: 8B big-endian timestamp] — start time
|
||
[ts2: 8B big-endian timestamp] — stop time (zeros if no stop)
|
||
[4B flags] — see MLG_FLAGS_* constants below
|
||
[10B serial] — null-padded serial number ASCII
|
||
[text] — for trigger records: [0x08][8B ts1_copy] then ASCII "Geo: X.XXX in/s"
|
||
for monitoring records: b'' (or minimal separator)
|
||
[zero-padded to 292 bytes]
|
||
|
||
─── Critical implementation notes ──────────────────────────────────────────────
|
||
|
||
N00 body reconstruction algorithm (confirmed 2026-04-21 from verification against
|
||
M529LIY6.N00 using raw_s3_20260403_153508.bin capture):
|
||
|
||
The N00 body bytes come from the A5 frame content, stripped of DLE-framing
|
||
artifacts. Each A5 frame contributes a different slice of its data section,
|
||
with DLE+{0x02,0x03,0x04} byte pairs stripped.
|
||
|
||
Skip amounts per frame index (offsets into frame.data):
|
||
A5[0] (probe): data[strt_pos + 21 + 7] (skip header + STRT record)
|
||
strt_pos found by searching frame.data[7:] for b'STRT';
|
||
the contribution starts at strt_pos + 21 within data[7:]
|
||
which equals strt_pos + 21 + 7 within frame.data.
|
||
A5[1]: data[13] (skip 7-byte frame.data prefix + 6 header bytes)
|
||
A5[2..N]: data[12] (skip 7-byte frame.data prefix + 5 header bytes)
|
||
Terminator A5: data[11] (1 byte less than chunk frames; terminator inner header
|
||
is 4 bytes instead of 5 — confirmed 2026-04-21)
|
||
|
||
DLE strip rule (applied AFTER slicing):
|
||
Strip any 0x10 byte that is immediately followed by 0x02, 0x03, or 0x04.
|
||
This undoes the DLE-escape that S3FrameParser preserves as literal pairs.
|
||
Applied to frame.data[skip:] + bytes([frame.chk_byte]) together, then
|
||
conditionally exclude the trailing chk_byte from the output.
|
||
|
||
chk_byte absorption:
|
||
When frame.data[-1] == 0x10 AND frame.chk_byte ∈ {0x02, 0x03, 0x04},
|
||
the last byte of frame.data is the DLE prefix of a split DLE+chk pair.
|
||
Including chk_byte in the strip buffer allows the pair to be stripped as
|
||
a unit. After stripping, the trailing chk_byte is ALWAYS removed — because
|
||
_strip_inner_frame_dles keeps the byte after the DLE (the chk_byte value),
|
||
and that value is the checksum, never payload. This applies to all three
|
||
cases (chk ∈ {0x02, 0x03, 0x04}) identically.
|
||
|
||
MLG CRC:
|
||
The algorithm that produces the 2-byte CRC at the start of each MLG record
|
||
is unknown. All examined records use non-zero values that do not match
|
||
CRC-16/CCITT, CRC-16/IBM, CRC-32 (truncated), word sums, XOR variants, or
|
||
any of the 40+ polynomial/init combinations tested. The writer emits 0x0000.
|
||
This produces files that Blastware may reject or display without the CRC check —
|
||
the exact impact on BW import is unknown (TODO: test).
|
||
|
||
─── Public API ──────────────────────────────────────────────────────────────────
|
||
|
||
blastware_filename(event, serial)
|
||
Return a Blastware-style filename for an event (e.g. "M529LIY6.N00").
|
||
Extension encoding is UNKNOWN — always returns .N00 as a placeholder.
|
||
Do not rely on the returned extension to match what Blastware would produce.
|
||
|
||
write_n00(event, a5_frames, path)
|
||
Create a .N00 or .9T0 waveform file from an Event and the full A5 frame
|
||
list (include_terminator=True required when calling read_bulk_waveform_stream).
|
||
Identical binary format for both extensions — caller picks the path/ext.
|
||
|
||
read_n00(path) → Event
|
||
Parse a .N00 file into an Event object with waveform data populated.
|
||
(Not yet implemented — placeholder raises NotImplementedError.)
|
||
|
||
write_mlg(entries, serial, path)
|
||
Create a .MLG file from a list of MonitorLogEntry objects.
|
||
|
||
read_mlg(path) → list[MonitorLogEntry]
|
||
Parse a .MLG file into MonitorLogEntry objects.
|
||
(Not yet implemented — placeholder raises NotImplementedError.)
|
||
"""
|
||
|
||
from __future__ import annotations
|
||
|
||
import datetime
|
||
import struct
|
||
from pathlib import Path
|
||
from typing import Optional, Union
|
||
|
||
from .framing import S3Frame
|
||
from .models import Event, MonitorLogEntry, Timestamp
|
||
|
||
# ── File header constants ─────────────────────────────────────────────────────
|
||
|
||
# Common 16-byte prefix shared by N00 and MLG (confirmed from binary inspection).
|
||
_FILE_HEADER_PREFIX = bytes.fromhex("1000018000004973") + b"tantel\x00\x07\x2c"
|
||
# = 10 00 01 80 00 00 49 73 74 61 6e 74 65 6c 00 07 2c (17 bytes)
|
||
# Confirmed breakdown: 10 00 01 80 00 00 = fixed; "Instantel\x00" = 10B; 07 2c = fixed
|
||
|
||
# Simpler construction:
|
||
_FILE_HEADER_PREFIX = b"\x10\x00\x01\x80\x00\x00Instantel\x00\x07\x2c" # 17 bytes
|
||
|
||
# N00 type tag (4 bytes after common prefix)
|
||
_N00_TYPE_TAG = b"\x00\x12\x03\x00" # confirmed from M529LIY6.N00 offset 0x11..0x14
|
||
|
||
# MLG type tag (4 bytes after common prefix)
|
||
_MLG_TYPE_TAG = b"\x22\x01\x0e\xa0" # confirmed from BE11529.MLG offset 0x11..0x14
|
||
|
||
# Total header sizes
|
||
_N00_HEADER_SIZE = 22 # 17 + 4 = 21... wait. Let me recalculate.
|
||
# From binary: first 22 bytes = header, then STRT at byte 22.
|
||
# 17-byte common prefix + 4-byte type tag = 21 bytes. But observed header is 22B.
|
||
# Checking: 6 fixed + 10 "Instantel\x00" + 2 "07 2c" = 18B prefix, then 4B type tag = 22B.
|
||
# Re-count: b"\x10\x00\x01\x80\x00\x00" = 6B + b"Instantel\x00" = 10B + b"\x07\x2c" = 2B = 18B prefix.
|
||
_FILE_HEADER_PREFIX = b"\x10\x00\x01\x80\x00\x00Instantel\x00\x07\x2c" # 18 bytes
|
||
_N00_HEADER_SIZE = 22 # 18 + 4 = 22 bytes ✅
|
||
_MLG_HEADER_SIZE = 308 # confirmed from BE11529.MLG
|
||
|
||
# MLG record marker (4 bytes after 2-byte CRC at start of each record)
|
||
_MLG_RECORD_MARKER = b"\x22\x01\x0e\x80"
|
||
_MLG_RECORD_SIZE = 292 # bytes per record (confirmed from BE11529.MLG)
|
||
|
||
# MLG record flags (4 bytes at record[22:26])
|
||
# Confirmed from BE11529.MLG binary inspection:
|
||
MLG_FLAGS_START_ONLY = b"\xff\xff\x00\x00" # monitoring start with no stop
|
||
MLG_FLAGS_TRIGGER = b"\x01\x00\x02\x00" # triggered event (has ts1 + ts2)
|
||
MLG_FLAGS_INTERVAL = b"\x02\x00\x00\x00" # monitoring interval (has ts1 + ts2)
|
||
|
||
|
||
# ── Timestamp helpers ─────────────────────────────────────────────────────────
|
||
|
||
def _encode_ts_be(ts: Optional[datetime.datetime]) -> bytes:
|
||
"""
|
||
Encode a datetime as an 8-byte big-endian Blastware timestamp.
|
||
|
||
Format (N00 and MLG record timestamps):
|
||
[day][month][year_HI][year_LO][0x00][hour][min][sec]
|
||
|
||
Big-endian year confirmed from M529LIY6.N00 footer:
|
||
footer bytes [2..9] = 01 04 07 ea 00 00 1c 08
|
||
→ day=1 month=4 year=0x07ea=2026 hour=0 min=28 sec=8 ✅
|
||
|
||
Returns 8 zero bytes if ts is None.
|
||
"""
|
||
if ts is None:
|
||
return bytes(8)
|
||
return bytes([
|
||
ts.day,
|
||
ts.month,
|
||
(ts.year >> 8) & 0xFF,
|
||
ts.year & 0xFF,
|
||
0x00,
|
||
ts.hour,
|
||
ts.minute,
|
||
ts.second,
|
||
])
|
||
|
||
|
||
def _decode_ts_be(raw: bytes) -> Optional[datetime.datetime]:
|
||
"""
|
||
Decode an 8-byte big-endian Blastware timestamp.
|
||
|
||
Returns None if the bytes are all zero or structurally invalid.
|
||
"""
|
||
if len(raw) < 8 or raw == bytes(8):
|
||
return None
|
||
day = raw[0]
|
||
month = raw[1]
|
||
year = (raw[2] << 8) | raw[3]
|
||
hour = raw[5]
|
||
minute = raw[6]
|
||
sec = raw[7]
|
||
try:
|
||
return datetime.datetime(year, month, day, hour, minute, sec)
|
||
except ValueError:
|
||
return None
|
||
|
||
|
||
def _ts_from_model(ts: Optional[Timestamp]) -> Optional[datetime.datetime]:
|
||
"""Convert a models.Timestamp to datetime.datetime, or None."""
|
||
if ts is None:
|
||
return None
|
||
try:
|
||
return datetime.datetime(ts.year, ts.month, ts.day, ts.hour, ts.minute, ts.second)
|
||
except (ValueError, TypeError):
|
||
return None
|
||
|
||
|
||
# ── DLE strip helper ──────────────────────────────────────────────────────────
|
||
|
||
def _strip_inner_frame_dles(data: bytes) -> bytes:
|
||
"""
|
||
Strip DLE (0x10) framing markers from A5 inner-frame content.
|
||
|
||
The A5 (bulk waveform stream) response body contains DLE-encoded sub-frame
|
||
structure. S3FrameParser preserves DLE+XX pairs as two literal bytes in
|
||
frame.data. Only the DLE marker byte needs to be removed; the following
|
||
byte is actual payload content.
|
||
|
||
Rule: when 0x10 is immediately followed by {0x02, 0x03, 0x04}, strip the
|
||
0x10 (DLE marker) and keep the following byte as payload.
|
||
|
||
Lone 0x10 bytes not followed by {0x02, 0x03, 0x04} are kept as-is.
|
||
|
||
Confirmed correct by verifying reconstructed N00 body against M529LIY6.N00:
|
||
- 0x10 0x02 in terminator → 0x02 kept ✓
|
||
- 0x10 0x04 in terminator (month byte) → 0x04 kept ✓
|
||
"""
|
||
out = bytearray()
|
||
i = 0
|
||
while i < len(data):
|
||
b = data[i]
|
||
if b == 0x10 and i + 1 < len(data) and data[i + 1] in {0x02, 0x03, 0x04}:
|
||
# Strip the DLE marker; the next byte is payload and will be appended
|
||
# in the next loop iteration.
|
||
i += 1
|
||
continue
|
||
out.append(b)
|
||
i += 1
|
||
return bytes(out)
|
||
|
||
|
||
def _frame_body_bytes(frame: S3Frame, skip: int) -> bytes:
|
||
"""
|
||
Extract the N00 body contribution from one A5 S3Frame.
|
||
|
||
The contribution is frame.data[skip:] with inner-frame DLE pairs stripped
|
||
per _strip_inner_frame_dles(). The chk_byte is temporarily appended before
|
||
stripping to handle the split-pair edge case where a DLE at the end of
|
||
frame.data is paired with chk_byte.
|
||
|
||
Split-pair edge case (confirmed for A5[8] of M529LIY6.N00, 2026-04-21):
|
||
|
||
S3FrameParser appends DLE+XX pairs as two literal bytes when XX ∉ {DLE, ETX}.
|
||
When the LAST occurrence of such a pair straddles the payload/checksum boundary
|
||
(i.e., DLE is the last byte of raw_payload and XX is the checksum), the parser
|
||
splits them:
|
||
- DLE ends up as the last byte of frame.data (frame.data[-1] == 0x10)
|
||
- XX is stored as frame.chk_byte
|
||
|
||
To strip the pair correctly, we reunite the bytes before calling the strip
|
||
function. Since chk_byte is the checksum (not payload data), it is excluded
|
||
from the final output regardless of whether it was part of a pair.
|
||
|
||
Post-strip chk_byte removal (ALL cases):
|
||
_strip_inner_frame_dles strips the 0x10 and KEEPS chk_byte in all cases.
|
||
Chk_byte is always the checksum (not payload), so always strip it off.
|
||
|
||
Args:
|
||
frame: S3Frame with frame.data and frame.chk_byte populated.
|
||
skip: Number of leading bytes in frame.data to exclude (frame header).
|
||
|
||
Returns:
|
||
bytes — the N00 body contribution for this frame.
|
||
"""
|
||
if skip >= len(frame.data):
|
||
return b""
|
||
|
||
relevant = frame.data[skip:]
|
||
|
||
# Detect split DLE+chk pair at the frame boundary.
|
||
has_split_pair = (
|
||
len(relevant) > 0
|
||
and relevant[-1] == 0x10
|
||
and frame.chk_byte in {0x02, 0x03, 0x04}
|
||
)
|
||
|
||
if has_split_pair:
|
||
# Reunite the split pair so the strip function sees both bytes together.
|
||
buf = relevant + bytes([frame.chk_byte])
|
||
stripped = _strip_inner_frame_dles(buf)
|
||
# _strip_inner_frame_dles strips the DLE (0x10) and KEEPS chk_byte.
|
||
# chk_byte is the received checksum — never payload — so remove it.
|
||
# This is correct for all values in {0x02, 0x03, 0x04}.
|
||
if stripped:
|
||
stripped = stripped[:-1]
|
||
return stripped
|
||
else:
|
||
return _strip_inner_frame_dles(relevant)
|
||
|
||
|
||
# ── Filename helper ───────────────────────────────────────────────────────────
|
||
|
||
_INSTANTEL_EPOCH = datetime.datetime(1985, 1, 1, 0, 0, 0)
|
||
"""
|
||
Instantel timestamp epoch — January 1, 1985, 00:00:00 local time.
|
||
Confirmed 2026-04-21: stem values for 6 independent events (April 1–9, 2026)
|
||
all converge to this epoch when decoded as floor(seconds_since_epoch / 1296).
|
||
1985 is the year Instantel was founded.
|
||
"""
|
||
|
||
_STEM_UNIT_SEC = 1296 # = 36^2 seconds ≈ 21.6 minutes per stem unit
|
||
|
||
_STEM_CHARS = "0123456789ABCDEFGHIJKLMNOPQRSTUVWXYZ"
|
||
|
||
# ── Waveform file extension encoding ─────────────────────────────────────────
|
||
#
|
||
# NEW FIRMWARE (V10.72+) — FULLY DECODED (confirmed 2026-04-21, 10-year archive):
|
||
#
|
||
# Extension format: AB0T (4 characters)
|
||
# AB = 2-char base-36 encoding of (seconds_since_epoch % 1296)
|
||
# i.e. the number of seconds into the current 21.6-minute stem window
|
||
# Range: 0 ("00") to 1295 ("ZZ")
|
||
# 0 = always literal '0'
|
||
# T = event type: 'W' = Full Waveform, 'H' = Full Histogram
|
||
#
|
||
# Combined with the 4-char stem (which encodes seconds_since_epoch // 1296),
|
||
# the FULL filename gives a second-resolution timestamp:
|
||
# total_seconds = stem_val * 1296 + ab_val
|
||
# timestamp = EPOCH + timedelta(seconds=total_seconds)
|
||
#
|
||
# Verified against three S353L4H0 events (all three match to the second):
|
||
# S353L4H0.3M0W Full Waveform 2025-06-23 13:57:22 AB=3M=130 ✓
|
||
# S353L4H0.8S0H Full Histogram 2025-06-23 14:00:28 AB=8S=316 ✓
|
||
# S353L4H0.9X0W Full Waveform 2025-06-23 14:01:09 AB=9X=357 ✓
|
||
#
|
||
# OLD FIRMWARE (S338, 3-char extensions ending in '0') — UNKNOWN:
|
||
# Observed: .N00, .9T0, .EI0, .490, .5K0, .980, .ML0
|
||
# The V10.72 formula does NOT apply to these.
|
||
# Extension is NOT recording mode (refuted 2026-04-21: continuous → .EI0, not .9T0).
|
||
# blastware_filename() returns .N00 as a placeholder for old-firmware units.
|
||
#
|
||
# WRONG earlier assumption (do not re-introduce):
|
||
# Extension was believed to encode recording mode × sample rate.
|
||
# Refuted by continuous-mode event producing .EI0 instead of .9T0.
|
||
|
||
|
||
def _make_stem(ts_local: datetime.datetime) -> str:
|
||
"""
|
||
Encode a local timestamp as a 4-character uppercase base-36 stem.
|
||
|
||
Algorithm (confirmed 2026-04-21 from 6 known file/timestamp pairs):
|
||
stem_int = floor((ts_local - Jan_1_1985_midnight_local) / 1296_seconds)
|
||
stem = 4-char uppercase base-36 encoding of stem_int
|
||
|
||
Unit = 36² = 1296 seconds ≈ 21.6 minutes. Events within the same 1296-second
|
||
window receive the same stem; their extension distinguishes them.
|
||
"""
|
||
delta_sec = int((ts_local - _INSTANTEL_EPOCH).total_seconds())
|
||
n = delta_sec // _STEM_UNIT_SEC
|
||
s = ""
|
||
for _ in range(4):
|
||
s = _STEM_CHARS[n % 36] + s
|
||
n //= 36
|
||
return s
|
||
|
||
|
||
def blastware_filename(event: Event, serial: str) -> str:
|
||
"""
|
||
Return a Blastware-style waveform filename for an event.
|
||
|
||
FULLY CONFIRMED 2026-04-22 — verified against 3,248 files from a 10-year
|
||
production archive (zero errors on MiniMate Plus / V10.72 firmware files).
|
||
|
||
Filename format: <prefix_letter><serial3><stem><AB>0<T>
|
||
where:
|
||
|
||
prefix_letter = chr(ord('B') + floor(serial_numeric / 1000))
|
||
— encodes the production generation (batch of 1000 units)
|
||
— e.g. BE6907→H, BE11529→M, BE14036→P, BE18003→T
|
||
|
||
serial3 = f"{serial_numeric % 1000:03d}"
|
||
— last 3 digits of numeric serial, zero-padded
|
||
|
||
stem = 4-char base-36 of floor(total_seconds / 1296)
|
||
— encodes which 21.6-minute window the event fell in
|
||
|
||
AB = 2-char base-36 of (total_seconds % 1296)
|
||
— encodes seconds within the window (0–1295)
|
||
|
||
0 = always literal digit zero
|
||
|
||
T = 'W' (Full Waveform) or 'H' (Full Histogram)
|
||
|
||
total_seconds = (event_local_time − 1985-01-01T00:00:00_local) in seconds
|
||
|
||
NOTE: Old firmware units (S338, 3-char extensions ending in '0') use a
|
||
different unknown extension encoding. This function returns the correct
|
||
extension only for V10.72 / new-firmware MiniMate Plus units. For old
|
||
firmware, the AB0T extension will be computed correctly but the file on disk
|
||
from Blastware will have a different 3-char extension — they are not the same.
|
||
|
||
Micromate Series 4 uses a completely different naming scheme (literal datetime
|
||
in filename); this function does not apply to Micromate units.
|
||
|
||
Args:
|
||
event: Event object with timestamp set.
|
||
serial: Device serial number string (e.g. "BE11529").
|
||
|
||
Returns:
|
||
Filename string (e.g. "M529LIY6.CE0H").
|
||
"""
|
||
# ── Serial prefix ──────────────────────────────────────────────────────────
|
||
serial_digits = "".join(c for c in serial if c.isdigit())
|
||
if len(serial_digits) >= 1:
|
||
serial_numeric = int(serial_digits)
|
||
generation = serial_numeric // 1000
|
||
prefix_letter = chr(ord('B') + generation)
|
||
serial3 = f"{serial_numeric % 1000:03d}"
|
||
else:
|
||
prefix_letter = "M" # fallback
|
||
serial3 = "000"
|
||
prefix = prefix_letter + serial3
|
||
|
||
# ── Stem + AB extension from timestamp ────────────────────────────────────
|
||
if event.timestamp is not None:
|
||
try:
|
||
ts_local = datetime.datetime(
|
||
event.timestamp.year, event.timestamp.month, event.timestamp.day,
|
||
event.timestamp.hour, event.timestamp.minute, event.timestamp.second,
|
||
)
|
||
delta_sec = int((ts_local - _INSTANTEL_EPOCH).total_seconds())
|
||
stem = _make_stem(ts_local)
|
||
ab_val = delta_sec % _STEM_UNIT_SEC # 0–1295
|
||
ab_str = _STEM_CHARS[ab_val // 36] + _STEM_CHARS[ab_val % 36]
|
||
except (ValueError, TypeError, AttributeError):
|
||
stem = "0000"
|
||
ab_str = "00"
|
||
else:
|
||
stem = "0000"
|
||
ab_str = "00"
|
||
|
||
# ── Event type character ──────────────────────────────────────────────────
|
||
# H = Full Histogram, W = Full Waveform
|
||
# record_type is set from the 0A header byte: 0x46=triggered, 0x2C=monitor log
|
||
# Histogram vs waveform distinction comes from the compliance recording_mode.
|
||
# Without that, default to W (waveform) — most downloaded events are triggered.
|
||
if getattr(event, 'recording_mode', None) in (3, 4): # Histogram / Hist+Cont
|
||
type_char = 'H'
|
||
else:
|
||
type_char = 'W'
|
||
|
||
ext = f".{ab_str}0{type_char}"
|
||
return prefix + stem + ext
|
||
|
||
|
||
# ── N00 file writer ───────────────────────────────────────────────────────────
|
||
|
||
def write_n00(
|
||
event: Event,
|
||
a5_frames: list[S3Frame],
|
||
path: Union[str, Path],
|
||
) -> None:
|
||
"""
|
||
Write a Blastware .N00 waveform file from a downloaded event.
|
||
|
||
Args:
|
||
event: Event object (populated by get_events() or download_waveform()).
|
||
Used for the STRT record (key, rectime) and footer timestamps.
|
||
a5_frames: Complete A5 frame list INCLUDING the terminator frame
|
||
(page_key=0x0000). Pass include_terminator=True to
|
||
read_bulk_waveform_stream() when collecting frames.
|
||
Must have at least 2 frames (probe + terminator).
|
||
path: Destination file path. Parent directory must exist.
|
||
Extension is not enforced — caller should use ".N00".
|
||
|
||
File layout:
|
||
[22B header] [21B STRT] [body bytes] [26B footer]
|
||
|
||
Raises:
|
||
ValueError: if a5_frames is empty or has no terminator (page_key=0).
|
||
OSError: if the file cannot be written.
|
||
|
||
Confirmed correct N00 body reconstruction against M529LIY6.N00 (2026-04-21).
|
||
"""
|
||
if not a5_frames:
|
||
raise ValueError("a5_frames must not be empty")
|
||
|
||
path = Path(path)
|
||
|
||
# ── Extract STRT record from probe frame ────────────────────────────────
|
||
# The STRT record (21 bytes) lives verbatim inside A5[0].data[7:].
|
||
# It is stored as-is in the N00 file — do NOT reconstruct it from Event
|
||
# fields, as bytes [10:14] and [14:20] contain device-specific values
|
||
# (not simply key4 repeated or zero-padded). Confirmed 2026-04-21.
|
||
#
|
||
# STRT layout (21 bytes, observed in M529LIY6.N00):
|
||
# [0:4] b'STRT'
|
||
# [4:6] 0xff 0xfe (fixed)
|
||
# [6:10] key4 (event key)
|
||
# [10:14] device-specific field (NOT a key4 repeat)
|
||
# [14:20] device-specific fields (NOT zeros)
|
||
# [20] rectime uint8 seconds
|
||
# Extract STRT from the DLE-stripped probe frame.
|
||
#
|
||
# frame.data[7:] is the raw wire representation; it may contain DLE+{02,03,04}
|
||
# inner-frame pairs that S3FrameParser preserves as two literal bytes. The
|
||
# Blastware file stores the stripped form, so we must strip before extracting.
|
||
#
|
||
# Example (M529LK0Y, 2026-04-21): STRT contains value 0x02 encoded as [10 02]
|
||
# on the wire. Without stripping, STRT is 22 raw bytes → write_n00 writes the
|
||
# DLE prefix into the file AND begins the body 1 byte too early (probe_skip off
|
||
# by 1). Stripping fixes both.
|
||
#
|
||
# probe_skip must be computed in the RAW frame.data domain (it is used as the
|
||
# `skip` argument to _frame_body_bytes which operates on raw frame.data).
|
||
# We walk the raw bytes counting stripped bytes until we have passed
|
||
# strt_pos + 21 stripped bytes, giving the raw offset of the first body byte.
|
||
w0_raw = bytes(a5_frames[0].data[7:])
|
||
w0_stripped = _strip_inner_frame_dles(w0_raw)
|
||
strt_pos_stripped = w0_stripped.find(b"STRT")
|
||
|
||
if strt_pos_stripped >= 0:
|
||
strt = bytes(w0_stripped[strt_pos_stripped : strt_pos_stripped + 21])
|
||
|
||
# Walk raw bytes to find the raw-domain end of the STRT (= body start).
|
||
target_stripped = strt_pos_stripped + 21
|
||
stripped_so_far = 0
|
||
raw_i = 0
|
||
while stripped_so_far < target_stripped and raw_i < len(w0_raw):
|
||
if (w0_raw[raw_i] == 0x10
|
||
and raw_i + 1 < len(w0_raw)
|
||
and w0_raw[raw_i + 1] in {0x02, 0x03, 0x04}):
|
||
raw_i += 2 # DLE pair → 1 stripped byte, 2 raw bytes
|
||
else:
|
||
raw_i += 1 # normal byte → 1 stripped byte, 1 raw byte
|
||
stripped_so_far += 1
|
||
probe_skip = 7 + raw_i # raw bytes to skip: 7 header + raw STRT length
|
||
else:
|
||
# Fallback: construct a minimal STRT if probe frame lacks it
|
||
key4 = event._waveform_key if hasattr(event, '_waveform_key') and event._waveform_key else bytes(4)
|
||
rectime = event.rectime_seconds if event.rectime_seconds is not None else 0
|
||
strt = b"STRT" + b"\xff\xfe" + key4 + bytes(14) + bytes([rectime & 0xFF])
|
||
probe_skip = 7 + 21
|
||
|
||
if len(strt) != 21:
|
||
raise ValueError(f"STRT record must be 21 bytes, got {len(strt)}")
|
||
|
||
# ── Build N00 header ─────────────────────────────────────────────────────
|
||
header = _FILE_HEADER_PREFIX + _N00_TYPE_TAG
|
||
assert len(header) == _N00_HEADER_SIZE, f"N00 header must be {_N00_HEADER_SIZE} bytes"
|
||
|
||
# ── Build body from A5 frames ────────────────────────────────────────────
|
||
# The N00 body is reconstructed from ALL A5 frames (data + terminator).
|
||
# The terminator frame's contribution includes the 26-byte footer at its end.
|
||
#
|
||
# Reconstruction layout (confirmed from M529LIY6.N00, 2026-04-21):
|
||
# all_bytes = contributions from A5[0..N] + terminator_contribution
|
||
# body = all_bytes[:-26] (everything except the last 26 bytes)
|
||
# footer = all_bytes[-26:] (last 26 bytes = the N00 footer)
|
||
#
|
||
# The footer bytes come directly from the terminator frame's inner content —
|
||
# using them verbatim ensures timestamps match the device's recorded values.
|
||
|
||
# Separate terminator from data frames
|
||
body_frames = a5_frames
|
||
term_frame: Optional[S3Frame] = None
|
||
if a5_frames and a5_frames[-1].page_key == 0x0000:
|
||
body_frames = a5_frames[:-1]
|
||
term_frame = a5_frames[-1]
|
||
|
||
all_bytes = bytearray()
|
||
|
||
for fi, frame in enumerate(body_frames):
|
||
if fi == 0:
|
||
skip = probe_skip
|
||
elif fi == 1:
|
||
skip = 13 # 7-byte frame.data prefix + 6-byte first-chunk header
|
||
else:
|
||
skip = 12 # 7-byte frame.data prefix + 5-byte chunk header
|
||
all_bytes.extend(_frame_body_bytes(frame, skip))
|
||
|
||
# Terminator contributes its content, which ends with the 26-byte footer.
|
||
# skip=11 (not 12) because the terminator's inner frame header is 4 bytes,
|
||
# one shorter than chunk frames' 5-byte inner header. Confirmed 2026-04-21.
|
||
if term_frame is not None:
|
||
all_bytes.extend(_frame_body_bytes(term_frame, 11))
|
||
|
||
if len(all_bytes) >= 26:
|
||
body = bytes(all_bytes[:-26])
|
||
footer = bytes(all_bytes[-26:])
|
||
else:
|
||
# Fallback: no terminator or very short stream → build footer from event metadata
|
||
body = bytes(all_bytes)
|
||
start_dt = _ts_from_model(event.timestamp)
|
||
stop_dt: Optional[datetime.datetime] = None
|
||
if start_dt is not None and event.rectime_seconds:
|
||
stop_dt = start_dt + datetime.timedelta(seconds=event.rectime_seconds)
|
||
footer = (
|
||
b"\x0e\x08"
|
||
+ _encode_ts_be(start_dt)
|
||
+ _encode_ts_be(stop_dt)
|
||
+ b"\x00\x01\x00\x02\x00\x00"
|
||
+ b"\x00\x00" # CRC placeholder
|
||
)
|
||
|
||
# ── Write file ───────────────────────────────────────────────────────────
|
||
with open(path, "wb") as f:
|
||
f.write(header)
|
||
f.write(strt)
|
||
f.write(body)
|
||
f.write(footer)
|
||
|
||
|
||
def read_n00(path: Union[str, Path]) -> Event:
|
||
"""
|
||
Parse a Blastware .N00 file into an Event object.
|
||
|
||
NOT YET IMPLEMENTED.
|
||
|
||
Args:
|
||
path: Path to the .N00 file.
|
||
|
||
Returns:
|
||
Event object with waveform data populated.
|
||
|
||
Raises:
|
||
NotImplementedError: always (pending implementation).
|
||
"""
|
||
raise NotImplementedError("read_n00() is not yet implemented")
|
||
|
||
|
||
# ── MLG file writer ───────────────────────────────────────────────────────────
|
||
|
||
def _build_mlg_header(serial: str) -> bytes:
|
||
"""
|
||
Build the 308-byte MLG file header.
|
||
|
||
Header structure (confirmed from BE11529.MLG binary inspection):
|
||
Offset 0x00: 10 00 01 80 00 00 Instantel\x00 07 2c 22 01 0e a0 (22B)
|
||
Offset 0x16: ... (16B unknown — observed as zeros in BE11529.MLG)
|
||
Offset 0x2A: serial number (8 bytes, null-padded ASCII)
|
||
... rest zero-padded to 308 bytes
|
||
|
||
The serial string "BE11529" appears at offset 0x2A (42 decimal).
|
||
"""
|
||
buf = bytearray(_MLG_HEADER_SIZE)
|
||
|
||
# Common prefix + MLG type tag
|
||
prefix = _FILE_HEADER_PREFIX + _MLG_TYPE_TAG # 22 bytes
|
||
buf[0:len(prefix)] = prefix
|
||
|
||
# Serial number at offset 0x2A
|
||
serial_bytes = serial.encode("ascii", errors="replace")[:8]
|
||
serial_padded = serial_bytes.ljust(8, b"\x00")
|
||
buf[0x2A : 0x2A + 8] = serial_padded
|
||
|
||
return bytes(buf)
|
||
|
||
|
||
def _build_mlg_record(
|
||
entry: MonitorLogEntry,
|
||
serial: str,
|
||
) -> bytes:
|
||
"""
|
||
Build one 292-byte MLG record from a MonitorLogEntry.
|
||
|
||
Record layout (confirmed from BE11529.MLG binary inspection):
|
||
[0:2] CRC — 2-byte CRC (algorithm unknown; written as 0x0000)
|
||
[2:6] marker — 22 01 0e 80
|
||
[6:14] ts1 — 8B big-endian start timestamp
|
||
[14:22] ts2 — 8B big-endian stop timestamp
|
||
[22:26] flags — 4B record flags (see MLG_FLAGS_* constants)
|
||
[26:36] serial — 10B null-padded serial number
|
||
[36:] text — for triggered events: [0x08][8B ts1_copy]["Geo: X.XXX in/s"]
|
||
for monitoring intervals: b"" or minimal separator
|
||
[... zero-padded to 292 bytes]
|
||
|
||
Flags based on entry type:
|
||
- MonitorLogEntry with start_time only (no stop_time): MLG_FLAGS_START_ONLY
|
||
- MonitorLogEntry with both times and geo_threshold_ips set: MLG_FLAGS_TRIGGER
|
||
- MonitorLogEntry with both times (monitoring interval): MLG_FLAGS_INTERVAL
|
||
|
||
The triggered-event text block (flags = MLG_FLAGS_TRIGGER):
|
||
[0x08] [ts1: 8B] [ASCII "Geo: X.XXX in/s\x00"]
|
||
Confirmed from BE11529.MLG records at offset 0x0134 and 0x0258.
|
||
"""
|
||
buf = bytearray(_MLG_RECORD_SIZE)
|
||
|
||
start_dt = (
|
||
datetime.datetime(
|
||
entry.start_time.year, entry.start_time.month, entry.start_time.day,
|
||
entry.start_time.hour, entry.start_time.minute, entry.start_time.second,
|
||
)
|
||
if entry.start_time else None
|
||
)
|
||
stop_dt = (
|
||
datetime.datetime(
|
||
entry.stop_time.year, entry.stop_time.month, entry.stop_time.day,
|
||
entry.stop_time.hour, entry.stop_time.minute, entry.stop_time.second,
|
||
)
|
||
if entry.stop_time else None
|
||
)
|
||
|
||
# [0:2] CRC placeholder
|
||
buf[0:2] = b"\x00\x00"
|
||
|
||
# [2:6] Record marker
|
||
buf[2:6] = _MLG_RECORD_MARKER
|
||
|
||
# [6:14] ts1
|
||
buf[6:14] = _encode_ts_be(start_dt)
|
||
|
||
# [14:22] ts2
|
||
buf[14:22] = _encode_ts_be(stop_dt)
|
||
|
||
# [22:26] flags
|
||
if stop_dt is None:
|
||
flags = MLG_FLAGS_START_ONLY
|
||
elif entry.geo_threshold_ips is not None:
|
||
flags = MLG_FLAGS_TRIGGER
|
||
else:
|
||
flags = MLG_FLAGS_INTERVAL
|
||
buf[22:26] = flags
|
||
|
||
# [26:36] serial (10B null-padded)
|
||
serial_bytes = serial.encode("ascii", errors="replace")[:10]
|
||
buf[26 : 26 + len(serial_bytes)] = serial_bytes
|
||
|
||
# [36:] text content
|
||
pos = 36
|
||
if flags == MLG_FLAGS_TRIGGER:
|
||
# Extra ts1 copy: [0x08][ts1: 8B]
|
||
buf[pos] = 0x08
|
||
pos += 1
|
||
buf[pos : pos + 8] = _encode_ts_be(start_dt)
|
||
pos += 8
|
||
|
||
if entry.geo_threshold_ips is not None:
|
||
geo_text = f"Geo: {entry.geo_threshold_ips:.3f} in/s\x00".encode("ascii")
|
||
buf[pos : pos + len(geo_text)] = geo_text
|
||
pos += len(geo_text)
|
||
|
||
return bytes(buf)
|
||
|
||
|
||
def write_mlg(
|
||
entries: list[MonitorLogEntry],
|
||
serial: str,
|
||
path: Union[str, Path],
|
||
) -> None:
|
||
"""
|
||
Write a Blastware .MLG monitor log file.
|
||
|
||
Args:
|
||
entries: List of MonitorLogEntry objects (from get_monitor_log_entries()).
|
||
Each entry produces one 292-byte record in the file.
|
||
serial: Device serial number string (e.g. "BE11529").
|
||
Written to the file header and each record.
|
||
path: Destination file path. Extension is not enforced — use ".MLG".
|
||
|
||
File layout:
|
||
[308B header] [N × 292B records]
|
||
|
||
Note: The 2-byte CRC at the start of each record is written as 0x0000.
|
||
The CRC algorithm is unknown (see module docstring).
|
||
|
||
Raises:
|
||
OSError: if the file cannot be written.
|
||
"""
|
||
path = Path(path)
|
||
header = _build_mlg_header(serial)
|
||
|
||
with open(path, "wb") as f:
|
||
f.write(header)
|
||
for entry in entries:
|
||
record = _build_mlg_record(entry, serial)
|
||
f.write(record)
|
||
|
||
|
||
def read_mlg(path: Union[str, Path]) -> list[MonitorLogEntry]:
|
||
"""
|
||
Parse a Blastware .MLG file into a list of MonitorLogEntry objects.
|
||
|
||
NOT YET IMPLEMENTED.
|
||
|
||
Args:
|
||
path: Path to the .MLG file.
|
||
|
||
Returns:
|
||
List of MonitorLogEntry objects.
|
||
|
||
Raises:
|
||
NotImplementedError: always (pending implementation).
|
||
"""
|
||
raise NotImplementedError("read_mlg() is not yet implemented")
|