Files
seismo-relay/micromate/idf_file.py
T
serversdown bee118506b fix(idf): decode from in-memory bytes during ingest
Bug shipped in v0.21.0: save_imported_idf called read_idf_file()
with `source_path` (a bare filename like "UM12947_….IDFW") BEFORE
writing the binary to disk.  The codec did Path(path).read_bytes()
which resolved relative to /app and hit FileNotFoundError.  The
error was caught + logged as a warning, and ingest fell back to
.txt-only — events still landed in the DB but lost the bw_report
block + .h5 waveform that the codec was supposed to produce.

Observed during a full re-forward from thor-watcher on 2026-05-29:
every Thor event logged "binary codec failed for X: [Errno 2] No
such file or directory" and got binary_decoded=False.

Fix:
- read_idf_file() gains a `data: Optional[bytes]` kwarg.  When
  supplied, skips the disk read and decodes the provided bytes
  directly.  `path` stays required (used for filename in error
  messages + .IDFH vs .IDFW suffix detection); only the read is
  conditional.  Backward compatible — existing positional callers
  (CLI scripts, tests) continue to work unchanged.
- save_imported_idf passes `data=idf_bytes` since the bytes are
  already in memory from the multipart upload.  Filesystem write
  still happens at step 5 of the existing flow; codec just no
  longer depends on it.

Verified end-to-end against UM11719_20231219162723.IDFW from the
example-data corpus: ingest endpoint returns inserted=1, log line
shows binary_decoded=True + h5=...IDFW.h5, no warnings.

Re-forward existing Thor events from thor-watcher after deploy to
backfill the bw_report block — UPSERT preserves review state.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
2026-05-29 20:09:54 +00:00

460 lines
18 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
"""
micromate/idf_file.py — Thor IDF binary codec.
Decodes the Instantel Micromate Series IV ``.IDFW`` (waveform) and
``.IDFH`` (histogram) binary on-disk format. Sister module to
``minimateplus/event_file_io.py``.
Status (2026-05-28):
- **Genuine Series IV / Thor binaries** are all signed
``00 12 01 00 00 00 Instantel\\0`` (sig-A in earlier notes). Two
Series III (Blastware) binaries appear in the example corpus
(``BE9439_*``) — they share the ``.IDFW``/``.IDFH`` extension by
filing convention but carry a BW STRT header (``10 00 01 80 00 00
Instantel STRT...``) and are NOT Thor data. The reader detects
them by signature and raises NotImplementedError pointing callers
at ``minimateplus.event_file_io.read_blastware_file()``.
- **IDFW waveform body** reuses the BW segment-rotated block codec
verbatim. Body always starts at file offset ``0x0f1f``. Samples
decoded via ``minimateplus.waveform_codec.decode_waveform_v2``
with 8799% byte-exact match against ``.IDFW.txt`` sidecar (quiet
events). Loud events hit the BW codec's known walker-stops-early
limit. Residual ~3% drift on per-sample deltas — likely a
Thor-specific 12-bit delta refinement that BW's codec doesn't
model. Geo LSB = 0.0003 in/s; mic factor ~2.14e-6 psi/count.
- **IDFH histogram body**: 12-byte segment header
``[len_be 2B] 0a 00 00 00 [00 NN_counter] 05 3f`` introduces a
segment of ``N`` 72-byte interval records (``N = (len - 10) // 72``).
Each record holds 4 × 16-byte per-channel min/max/halfp + 8-byte
tail. Geo peaks via ``max(|min|, |max|) / 32768 × 10`` in/s
(matches sidecar within ~1.8%), freq via ``512 / halfp`` Hz.
**All 859 Thor IDFH files in the corpus decode (181,071 intervals).**
- Binary metadata directly extracted: serial, timestamp, sample_rate,
record_time, calibration_date. Other fields fall back to the paired
``.IDFW.txt`` / ``.IDFH.txt`` sidecar (consumed by
``WaveformStore.save_imported_idf``).
The full reverse-engineering writeup lives in
``docs/idf_protocol_reference.md``.
"""
from __future__ import annotations
import datetime
import struct
from dataclasses import dataclass
from pathlib import Path
from typing import Optional, Union
from minimateplus.waveform_codec import decode_waveform_v2
from .models import IdfEvent, IdfPeaks, IdfReport
# Genuine Series IV / Thor IDF binary signature: 6 bytes, then ASCII "Instantel".
_THOR_PREFIX = b"\x00\x12\x01\x00\x00\x00"
# Stray Series III (Blastware) binaries that occasionally turn up in Thor
# corpus directories renamed to the .IDFW/.IDFH convention. Their header
# (`10 00 01 80 00 00 Instantel STRT ...`) is byte-for-byte a BW SUB 5A
# STRT record, not a Thor binary. Detected so we can refuse-and-route
# rather than mis-parse.
_BW_STRAY_PREFIX = b"\x10\x00\x01\x80\x00\x00"
_INSTANTEL_TAG = b"Instantel"
# Constant body offset for sig-A IDFW files (verified across 151/154 corpus
# files in tests/fixtures/THORDATA_example). The body is the segment-rotated
# block stream consumed by decode_waveform_v2; bytes [0:3] are the magic
# ``00 02 00`` preamble.
_BODY_START_SIG_A = 0x0F1F
# Geophone count → in/s, derived from sidecar ground truth: the smallest
# non-zero sample in 1,014-file corpus is 0.0003 in/s.
_GEO_LSB_IPS = 0.0003
# Microphone count → psi, derived from sidecar regression on 50 sample
# pairs from UM11719_20231219162723.IDFW (mic-heavy event).
_MIC_LSB_PSI = 2.14e-6
# IDFH histogram constants.
_IDFH_INTERVAL_SIZE = 72 # bytes per per-interval record
_IDFH_SEGMENT_HEADER = 10 # bytes: [len_be 2B][0a 00 00 00 4B][00 NN 2B][05 3f 2B]
_IDFH_SEGMENT_TAIL = 2 # bytes after the interval data block, before next marker
_IDFH_HALFP_FREQ_NUM = 512.0 # freq_hz = NUM / halfp; halfp ≤ 5 means ">100 Hz" sentinel
_IDFH_GEO_FULL_SCALE = 10.0 # in/s — Normal range
_IDFH_INT16_FS = 32768.0
_IDFH_CHANNELS = ("Tran", "Vert", "Long", "MicL")
# ─── Binary metadata extraction ─────────────────────────────────────────────
@dataclass
class IdfBinaryMetadata:
"""Fields recoverable from the sig-A binary header (no .txt needed)."""
serial: Optional[str] = None
event_datetime: Optional[datetime.datetime] = None
sample_rate: Optional[int] = None
record_time_sec: Optional[float] = None
calibration_date: Optional[datetime.date] = None
def _read_ascii_z(buf: bytes, off: int, maxlen: int = 64) -> Optional[str]:
if off >= len(buf):
return None
end = buf.find(b"\x00", off, off + maxlen)
if end < 0:
end = min(off + maxlen, len(buf))
s = buf[off:end].decode("ascii", errors="replace").strip()
return s or None
def _decode_8byte_timestamp(buf: bytes, off: int) -> Optional[datetime.datetime]:
"""Layout: ``[day][month][year_hi][year_lo][unknown][hour][min][sec]``."""
if off + 8 > len(buf):
return None
day, mon, yh, yl, _unk, hr, mn, sc = buf[off : off + 8]
year = (yh << 8) | yl
if not (2015 <= year <= 2050 and 1 <= mon <= 12 and 1 <= day <= 31
and 0 <= hr < 24 and 0 <= mn < 60 and 0 <= sc < 60):
return None
try:
return datetime.datetime(year, mon, day, hr, mn, sc)
except ValueError:
return None
def extract_binary_metadata(buf: bytes) -> IdfBinaryMetadata:
"""Pull serial/timestamp/sample_rate/record_time/calibration from the
sig-A binary header.
Field positions confirmed against UM11719_20231219162723.IDFW; stable
across the 151-file sig-A corpus.
"""
md = IdfBinaryMetadata()
# Serial: null-terminated ASCII at 0x14E.
md.serial = _read_ascii_z(buf, 0x14E, maxlen=16)
# Sample rate + record time live in a BW-compatible compliance block.
# Locate the 6-byte anchor `be 80 00 00 00 00` and read offsets relative
# to it: anchor-6 = sample_rate uint16 BE; anchor+6 = record_time float32 BE.
anchor = buf.find(b"\xbe\x80\x00\x00\x00\x00", 0x800, 0xA00)
if anchor > 0:
sr_bytes = buf[anchor - 6 : anchor - 4]
if len(sr_bytes) == 2:
sr = int.from_bytes(sr_bytes, "big")
if sr in (256, 512, 1024, 2048, 4096):
md.sample_rate = sr
rt_bytes = buf[anchor + 6 : anchor + 10]
if len(rt_bytes) == 4:
try:
rt = struct.unpack(">f", rt_bytes)[0]
if 0.1 <= rt <= 600.0:
md.record_time_sec = float(rt)
except struct.error:
pass
# Event timestamp: 8 bytes. Position differs between IDFW (0x97A) and
# IDFH (0x9F8); scan a small range and accept the first valid decode.
for off in (0x97A, 0x9F8):
ts = _decode_8byte_timestamp(buf, off)
if ts is not None:
md.event_datetime = ts
break
# Calibration date: day, month, year_be at 0x194-0x197.
if len(buf) > 0x197:
day, mon = buf[0x194], buf[0x195]
year = int.from_bytes(buf[0x196 : 0x198], "big")
if 1 <= mon <= 12 and 1 <= day <= 31 and 2015 <= year <= 2050:
try:
md.calibration_date = datetime.date(year, mon, day)
except ValueError:
pass
return md
# ─── Sample decoder + unit conversion ───────────────────────────────────────
def _decode_waveform_samples(buf: bytes) -> Optional[dict]:
"""Decode samples from the sig-A body starting at file offset 0x0f1f.
Returns the raw decoder counts dict — geo LSB = 0.0003 in/s, mic in
its own count unit (see :func:`mic_count_to_psi`). Returns None if
decoding fails.
"""
if len(buf) < _BODY_START_SIG_A + 8:
return None
body = buf[_BODY_START_SIG_A:]
return decode_waveform_v2(body)
def geo_count_to_ips(count: int) -> float:
"""Convert a Thor geo decoder count to in/s. LSB = 0.0003 in/s."""
return count * _GEO_LSB_IPS
def mic_count_to_psi(count: int) -> float:
"""Convert a Thor mic decoder count to psi. Scale derived from
regression over 50 sample pairs in UM11719_20231219162723.IDFW;
consistent to ~5%. Calibration constants from the channel block
can refine this once decoded.
"""
return count * _MIC_LSB_PSI
# ─── IDFH histogram decoder ─────────────────────────────────────────────────
@dataclass
class IdfhInterval:
"""One decoded histogram interval (typically one minute of monitoring)."""
offset: int # file byte offset of the 72-byte record
# Per-channel min/max ADC counts (int16 BE), half-period samples, peak count.
# Peak = max(|min|, |max|). freq_hz = 512/halfp (None if halfp ≤ 5 →
# ">100 Hz" sentinel; matches sidecar convention).
tran_min: int
tran_max: int
tran_halfp: int
vert_min: int
vert_max: int
vert_halfp: int
long_min: int
long_max: int
long_halfp: int
micl_min: int
micl_max: int
micl_halfp: int
def peak_count(self, channel: str) -> int:
mn = getattr(self, f"{channel.lower()}_min")
mx = getattr(self, f"{channel.lower()}_max")
return max(abs(mn), abs(mx))
def peak_ips(self, channel: str) -> float:
"""Convert peak count to in/s (geo channels only)."""
return self.peak_count(channel) / _IDFH_INT16_FS * _IDFH_GEO_FULL_SCALE
def freq_hz(self, channel: str) -> Optional[float]:
halfp = getattr(self, f"{channel.lower()}_halfp")
if halfp <= 5:
return None
return _IDFH_HALFP_FREQ_NUM / halfp
def _decode_idfh_interval(buf72: bytes, offset: int) -> IdfhInterval:
"""Decode one 72-byte interval record into per-channel min/max/halfp."""
import struct
fields = []
for i in range(4):
block = buf72[i * 16 : (i + 1) * 16]
mn = struct.unpack_from(">h", block, 0)[0]
mx = struct.unpack_from(">h", block, 2)[0]
# block[4:6] = int16 BE, role unknown (possibly time-of-peak)
halfp = struct.unpack_from(">H", block, 6)[0]
# block[10:12] and block[14:16] are uint16 BE with unknown semantics
# (likely sum / count contributions for the PVS computation).
fields.extend([mn, mx, halfp])
# Tail 8 bytes (buf72[64:72]) carry PVS-related data; not yet decoded.
return IdfhInterval(
offset=offset,
tran_min=fields[0], tran_max=fields[1], tran_halfp=fields[2],
vert_min=fields[3], vert_max=fields[4], vert_halfp=fields[5],
long_min=fields[6], long_max=fields[7], long_halfp=fields[8],
micl_min=fields[9], micl_max=fields[10], micl_halfp=fields[11],
)
def decode_idfh_body(buf: bytes) -> list:
"""Walk an IDFH file and decode every interval record.
The body has one or more segments; each segment header is 12 bytes:
``[length_be 2B][0a 00 00 00][00 NN_counter][05 3f]`` where ``length``
is bytes from the magic through the end of the interval block
(= 10 + 72 × n_intervals). Segments are separated by a 2-byte tail
+ next-segment 2-byte prefix (the bytes before the next length field).
Confirmed against the 859-file corpus (181,071 intervals decoded; 1
failure is the sig-B BE9439 file).
"""
intervals: list = []
i = 0
while True:
j = buf.find(b"\x0a\x00\x00\x00", i)
if j < 0 or j < 2:
break
# Validate: [length_be][0a 00 00 00][00 NN][05 3f]
if buf[j + 4] != 0x00 or buf[j + 6 : j + 8] != b"\x05\x3f":
i = j + 1
continue
length = int.from_bytes(buf[j - 2 : j], "big")
n = (length - _IDFH_SEGMENT_HEADER) // _IDFH_INTERVAL_SIZE
if n <= 0:
i = j + 1
continue
header_start = j - 2
interval_start = header_start + _IDFH_SEGMENT_HEADER
for k in range(n):
off = interval_start + k * _IDFH_INTERVAL_SIZE
if off + _IDFH_INTERVAL_SIZE > len(buf):
break
chunk = buf[off : off + _IDFH_INTERVAL_SIZE]
intervals.append(_decode_idfh_interval(chunk, off))
# Advance past this segment + the 2-byte tail.
i = header_start + length + _IDFH_SEGMENT_TAIL
return intervals
# ─── Top-level reader ───────────────────────────────────────────────────────
@dataclass
class IdfReadResult:
"""Return type for :func:`read_idf_file`.
For waveforms (``.IDFW``), ``samples`` holds the per-channel sample
arrays in Thor decoder counts. For histograms (``.IDFH``),
``samples`` is empty and ``intervals`` holds the per-interval
record list (peaks, freqs).
"""
event: IdfEvent
samples: dict # {"Tran": [...], ...} for IDFW; empty for IDFH
binary_metadata: IdfBinaryMetadata
signature: str # always "thor" for now (sig-A genuine Thor)
intervals: Optional[list] = None # list[IdfhInterval] for IDFH; None for IDFW
def read_idf_file(
path: Union[str, Path],
*,
data: Optional[bytes] = None,
) -> IdfReadResult:
"""Parse a Thor ``.IDFW`` binary into an ``IdfEvent`` + decoded samples.
Currently implements signature-A waveforms only. Signature-B
(old-firmware) and ``.IDFH`` histograms raise NotImplementedError;
use the paired ``.IDFW.txt`` / ``.IDFH.txt`` sidecar for those via
``parse_idf_report()``.
Returns an :class:`IdfReadResult`. The caller converts int sample
counts to physical units via :func:`geo_count_to_ips` /
:func:`mic_count_to_psi`.
``path`` is used for filename in error messages and ``.IDFH`` vs
``.IDFW`` suffix detection. When ``data`` is supplied the disk
read is skipped — useful for ingest paths that already have the
bytes in memory and where the file may not exist on disk yet.
"""
p = Path(path)
buf = data if data is not None else p.read_bytes()
if len(buf) < 16 or buf[6:16] != _INSTANTEL_TAG + b"\x00":
raise ValueError(f"{p.name}: not an IDF file (missing Instantel magic)")
sig_prefix = buf[:6]
if sig_prefix == _THOR_PREFIX:
signature = "thor"
elif sig_prefix == _BW_STRAY_PREFIX:
raise NotImplementedError(
f"{p.name}: file has a Series III (Blastware) STRT header in "
"an IDF-named container — not a Thor binary. Route through "
"minimateplus.event_file_io.read_blastware_file() instead "
"(peaks decode; samples & full metadata don't, but it's not "
"Thor data so the Thor codec doesn't apply)."
)
else:
raise ValueError(f"{p.name}: unknown IDF signature {sig_prefix.hex()}")
is_histogram = p.suffix.upper() == ".IDFH"
md = extract_binary_metadata(buf)
if is_histogram:
intervals = decode_idfh_body(buf)
if not intervals:
raise ValueError(f"{p.name}: IDFH body decoded no intervals")
# Peaks: max across all intervals on each channel (per-channel max
# of stored max-magnitudes; sidecar's PPV row carries the same).
peak_tran = max((iv.peak_ips("Tran") for iv in intervals), default=0.0)
peak_vert = max((iv.peak_ips("Vert") for iv in intervals), default=0.0)
peak_long = max((iv.peak_ips("Long") for iv in intervals), default=0.0)
rep = IdfReport(
serial_number=md.serial,
event_type="Full Histogram",
event_datetime=md.event_datetime,
filename=p.name,
sample_rate=md.sample_rate,
record_time_sec=md.record_time_sec,
)
peaks = IdfPeaks(
transverse_ips=peak_tran,
vertical_ips=peak_vert,
longitudinal_ips=peak_long,
peak_vector_sum_ips=None,
mic_pspl_dbl=None,
)
event = IdfEvent(
serial=md.serial or "UNKNOWN",
timestamp=md.event_datetime or datetime.datetime(1970, 1, 1),
kind="Histogram",
filename=p.name,
sample_rate=md.sample_rate,
record_time_sec=md.record_time_sec,
peaks=peaks,
report=rep,
)
return IdfReadResult(
event=event,
samples={},
binary_metadata=md,
signature=signature,
intervals=intervals,
)
# Waveform path.
decoded = _decode_waveform_samples(buf)
if decoded is None:
raise ValueError(f"{p.name}: waveform body codec failed")
rep = IdfReport(
serial_number=md.serial,
event_type="Full Waveform",
event_datetime=md.event_datetime,
filename=p.name,
sample_rate=md.sample_rate,
record_time_sec=md.record_time_sec,
)
def _peak_ips(ch: str) -> float:
arr = decoded.get(ch, [])
return geo_count_to_ips(max((abs(v) for v in arr), default=0))
peaks = IdfPeaks(
transverse_ips=_peak_ips("Tran"),
vertical_ips=_peak_ips("Vert"),
longitudinal_ips=_peak_ips("Long"),
# PVS requires aligned per-sample √(T²+V²+L²); leave None — the
# sidecar carries it and the bridge picks it up if present.
peak_vector_sum_ips=None,
mic_pspl_dbl=None,
)
event = IdfEvent(
serial=md.serial or "UNKNOWN",
timestamp=md.event_datetime or datetime.datetime(1970, 1, 1),
kind="Waveform",
filename=p.name,
sample_rate=md.sample_rate,
record_time_sec=md.record_time_sec,
peaks=peaks,
report=rep,
)
return IdfReadResult(
event=event,
samples=decoded,
binary_metadata=md,
signature=signature,
)