Files
2026-06-01 19:33:44 +00:00

928 lines
37 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
"""
minimateplus/event_file_io.py — modern event-file (.sfm.json sidecar) IO.
This module is the single home for event-file conversion code that doesn't
fit cleanly inside `blastware_file.py` (which is the BW binary codec):
- sidecar JSON read/write (the modern per-event metadata file)
- read_blastware_file() — reverse of write_blastware_file, used by
the BW-importer flow when SFM is ingesting files produced by
Blastware's own ACH (where the source A5 frames aren't available).
Sidecar schema v1 layout — see docs in the project plan or the schema
declared in `event_to_sidecar_dict()`.
"""
from __future__ import annotations
import datetime
import hashlib
import json
import logging
import os
import struct
from pathlib import Path
from typing import Optional, Union
from .models import Event, PeakValues, ProjectInfo, Timestamp
from . import blastware_file as _bw # avoid circular reference at module load
from .bw_ascii_report import BwAsciiReport
from .waveform_codec import decode_waveform_v2, decoded_to_adc_counts
from .histogram_codec import decode_histogram_body
# Reference pressure for dB(L) → psi conversion (20 µPa expressed in psi).
# Same constant as sfm/sfm_webapp.html so server-side and browser-side
# conversions agree.
_DBL_REF_PSI = 2.9e-9
log = logging.getLogger(__name__)
# Schema version for the sidecar JSON. Bump when fields change shape.
# Older readers must reject anything > SCHEMA_VERSION; newer fields added
# inside `extensions` are forward-compatible without a bump.
SCHEMA_VERSION = 1
SIDECAR_KIND = "sfm.event"
# Default tool_version stamp; callers can override. Hard-coded here
# rather than read via importlib.metadata because the latter reflects the
# *installed* dist-info, which doesn't update when pyproject.toml is
# bumped without a `pip install` re-run — leading to confusing stale
# version stamps in sidecars. Bump this constant and CHANGELOG.md
# together at release time.
TOOL_VERSION = "0.21.1"
try:
# Best-effort: prefer the installed metadata when it's NEWER than the
# baked-in constant (e.g. a downstream packager bumped the wheel
# without editing this file). Otherwise fall back to TOOL_VERSION.
from importlib.metadata import version as _pkg_version
_meta_v = _pkg_version("seismo-relay")
def _vtuple(s):
try:
return tuple(int(p) for p in s.split(".")[:3])
except Exception:
return (0, 0, 0)
_TOOL_VERSION_DEFAULT = (
_meta_v if _vtuple(_meta_v) > _vtuple(TOOL_VERSION) else TOOL_VERSION
)
except Exception:
_TOOL_VERSION_DEFAULT = TOOL_VERSION
# ── Sidecar dict construction ─────────────────────────────────────────────────
def _ts_iso(ts: Optional[Timestamp]) -> Optional[str]:
if ts is None:
return None
try:
return datetime.datetime(
ts.year, ts.month, ts.day,
ts.hour or 0, ts.minute or 0, ts.second or 0,
).isoformat()
except Exception:
return str(ts)
def _peak_values_to_dict(pv: Optional[PeakValues]) -> dict:
if pv is None:
return {
"transverse": None,
"vertical": None,
"longitudinal": None,
"vector_sum": None,
"mic_psi": None,
}
return {
"transverse": pv.tran,
"vertical": pv.vert,
"longitudinal": pv.long,
"vector_sum": pv.peak_vector_sum,
"mic_psi": pv.micl,
}
def _bw_report_to_dict(report: BwAsciiReport) -> dict:
"""Project a parsed BW ASCII report into the sidecar's `bw_report` block.
All fields are rendered as plain JSON-compatible types (no datetime
objects). Channels are uniformly lowercased for stable JSON keys.
"""
def _ch(ch_name: str) -> dict:
cs = report.channels.get(ch_name)
if cs is None:
return {}
out = {
"ppv_ips": cs.ppv_ips,
"zc_freq_hz": cs.zc_freq_hz,
"time_of_peak_s": cs.time_of_peak_s,
"peak_accel_g": cs.peak_accel_g,
"peak_disp_in": cs.peak_disp_in,
}
# Drop all-None entries — keeps the JSON tidy for partial reports.
out = {k: v for k, v in out.items() if v is not None}
# Saturation flag (only present when True) — signals that ppv_ips
# is the channel range max (a lower bound), not an exact reading.
if getattr(cs, "ppv_saturated", False):
out["ppv_saturated"] = True
# ZC Freq above device reporting ceiling (BW ">100 Hz") — value
# in zc_freq_hz is the threshold, not an exact measurement.
if getattr(cs, "zc_freq_above_range", False):
out["zc_freq_above_range"] = True
return out
def _sc(ch_name: str) -> dict:
sc = report.sensor_check.get(ch_name)
if sc is None:
return {}
out = {
"freq_hz": sc.test_freq_hz,
"ratio": sc.test_ratio,
"amplitude_mv": sc.test_amplitude_mv,
"result": sc.test_results,
}
return {k: v for k, v in out.items() if v is not None}
monitor_log = []
for entry in report.monitor_log:
e = {
"start": entry.start_time.isoformat() if entry.start_time else None,
"stop": entry.stop_time.isoformat() if entry.stop_time else None,
"description": entry.description,
}
monitor_log.append({k: v for k, v in e.items() if v is not None})
return {
"available": True,
"event_type": report.event_type,
"version": report.version,
"trigger": {
"channel": report.trigger_channel,
"geo_level_ips": report.geo_trigger_level_ips,
},
"recording": {
"sample_rate_sps": report.sample_rate_sps,
"record_time_s": report.record_time_s,
"pretrig_s": report.pretrig_s,
"stop_mode": report.record_stop_mode,
"geo_range_ips": report.geo_range_ips,
"units": report.units,
},
"device": {
"battery_volts": report.battery_volts,
"calibration_date": report.calibration_date.isoformat() if report.calibration_date else None,
"calibration_by": report.calibration_by,
},
"peaks": {
"tran": _ch("Tran"),
"vert": _ch("Vert"),
"long": _ch("Long"),
"vector_sum": {
"ips": report.peak_vector_sum_ips,
"time_s": report.peak_vector_sum_time_s,
# Histogram events have an absolute date+time for the PVS
# (the interval at which it occurred); waveform events
# only have the time_s offset.
"when": report.peak_vector_sum_when.isoformat() if report.peak_vector_sum_when else None,
# Set when BW reported the PVS as OORANGE — value is the
# conservative upper bound sqrt(3) * geo_range_ips, not
# an exact peak.
"saturated": bool(getattr(report, "peak_vector_sum_saturated", False)),
},
},
"mic": {
"weighting": report.mic.weighting,
"pspl_dbl": report.mic.pspl_dbl,
"pspl_saturated": bool(getattr(report.mic, "pspl_saturated", False)),
"zc_freq_hz": report.mic.zc_freq_hz,
"zc_freq_above_range": bool(getattr(report.mic, "zc_freq_above_range", False)),
"time_of_peak_s": report.mic.time_of_peak_s,
},
"sensor_check": {
"tran": _sc("Tran"),
"vert": _sc("Vert"),
"long": _sc("Long"),
"mic": _sc("MicL"),
},
# Histogram-specific fields (None on waveform-mode events).
# Per-channel absolute peak time/date for histograms — for
# waveforms see channels[ch]["time_of_peak_s"] instead.
"histogram": {
"start": report.histogram_start.isoformat() if report.histogram_start else None,
"stop": report.histogram_stop.isoformat() if report.histogram_stop else None,
"n_intervals": report.histogram_n_intervals,
"interval_size": report.histogram_interval_size_str,
"interval_size_s": report.histogram_interval_size_s,
"channel_peak_when": {ch: dt.isoformat() for ch, dt in report.channel_peak_when.items()},
},
"monitor_log": monitor_log,
"pc_sw_version": report.pc_sw_version,
}
def _dbl_to_psi(pspl_dbl: float) -> float:
"""Convert dB(L) sound pressure level back to psi. Uses the same
20 µPa reference (= 2.9e-9 psi) as the webapp so server-side and
browser-side conversions agree."""
return _DBL_REF_PSI * (10.0 ** (pspl_dbl / 20.0))
def apply_report_to_event(event: Event, report: BwAsciiReport) -> None:
"""Overlay device-authoritative fields from a parsed BW ASCII report
onto an in-memory Event, IN-PLACE.
Why this exists
───────────────
`read_blastware_file()` parses the BW binary and fills `Event.peak_values`
via `_peaks_from_samples()` — which runs the (still-undecoded) BW body
codec assuming raw int16 LE and produces ±32K-shaped noise on every
channel. Result: peak values land in the SeismoDb event row as
~10 in/s on every event regardless of the actual signal.
When a paired BW ASCII report is available, the report carries the
device's own authoritative peak / project / sample-rate / record-time
values. This helper folds those onto the Event before it flows to
`SeismoDb.insert_events()`, so the DB columns reflect the report
rather than the broken-codec output.
Fields overlaid (only when the report supplies a non-None value):
- peak_values.tran / .vert / .long (from report.channels)
- peak_values.peak_vector_sum (from report.peak_vector_sum_ips)
- peak_values.micl (psi) (from report.mic.pspl_dbl → psi)
- project_info.project / .client / .operator / .sensor_location
- sample_rate (from report.sample_rate_sps)
- rectime_seconds (from report.record_time_s)
Fields NOT touched (operator-edit / parser-output preserved):
- timestamp, raw_samples, record_type, total_samples,
pretrig_samples, _waveform_key, _a5_frames, _raw_record
- false_trigger and review state (those live on the sidecar, not on Event)
"""
if event.peak_values is None:
event.peak_values = PeakValues()
pv = event.peak_values
ch = report.channels
if (t := ch.get("Tran")) and t.ppv_ips is not None: pv.tran = t.ppv_ips
if (v := ch.get("Vert")) and v.ppv_ips is not None: pv.vert = v.ppv_ips
if (l := ch.get("Long")) and l.ppv_ips is not None: pv.long = l.ppv_ips
if report.peak_vector_sum_ips is not None:
pv.peak_vector_sum = report.peak_vector_sum_ips
if report.mic.pspl_dbl is not None and report.mic.pspl_dbl > 0:
pv.micl = _dbl_to_psi(report.mic.pspl_dbl)
if event.project_info is None:
event.project_info = ProjectInfo()
pi = event.project_info
if report.project: pi.project = report.project
if report.client: pi.client = report.client
if report.operator: pi.operator = report.operator
if report.sensor_location: pi.sensor_location = report.sensor_location
if report.sample_rate_sps:
event.sample_rate = report.sample_rate_sps
if report.record_time_s is not None:
event.rectime_seconds = report.record_time_s
def apply_bw_report_dict_to_event(event: Event, bw_report: dict) -> None:
"""Mirror of ``apply_report_to_event`` for the projected sidecar
dict shape (as produced by ``_bw_report_to_dict``).
Why this exists
───────────────
The ingest path holds a live ``BwAsciiReport`` parsed straight from
the ``_ASCII.TXT`` and uses ``apply_report_to_event`` to overlay
device-authoritative peaks onto the codec output before insert.
The backfill path doesn't have the original ``.TXT`` (it's not
retained in the waveform store), but it does have the preserved
``bw_report`` block from the sidecar — which contains the same
projected fields. Re-overlaying those during a backfill keeps the
DB peak columns aligned with what BW reports rather than letting
the codec output (which may be incomplete for unhandled formats or
walker edge cases) win by default.
No-ops cleanly when ``bw_report`` is ``None``, empty, or missing
any particular sub-field — only fields with a concrete value get
written. Mirrors ``apply_report_to_event``'s "report wins where
present" semantics.
"""
if not bw_report:
return
if event.peak_values is None:
event.peak_values = PeakValues()
pv = event.peak_values
peaks = bw_report.get("peaks") or {}
tran = (peaks.get("tran") or {}).get("ppv_ips")
vert = (peaks.get("vert") or {}).get("ppv_ips")
long = (peaks.get("long") or {}).get("ppv_ips")
if tran is not None: pv.tran = tran
if vert is not None: pv.vert = vert
if long is not None: pv.long = long
vs_ips = (peaks.get("vector_sum") or {}).get("ips")
if vs_ips is not None:
pv.peak_vector_sum = vs_ips
mic = bw_report.get("mic") or {}
pspl = mic.get("pspl_dbl")
if pspl is not None and pspl > 0:
pv.micl = _dbl_to_psi(pspl)
rec = bw_report.get("recording") or {}
sr = rec.get("sample_rate_sps")
if sr:
event.sample_rate = sr
rt = rec.get("record_time_s")
if rt is not None:
event.rectime_seconds = rt
def _project_info_to_dict(pi: Optional[ProjectInfo]) -> dict:
if pi is None:
return {
"project": None,
"client": None,
"operator": None,
"sensor_location": None,
}
return {
"project": pi.project,
"client": pi.client,
"operator": pi.operator,
"sensor_location": pi.sensor_location,
}
def event_to_sidecar_dict(
event: Event,
*,
serial: str,
blastware_filename: str,
blastware_filesize: int,
blastware_sha256: str,
source_kind: str = "sfm-live",
txt_filename: Optional[str] = None,
a5_pickle_filename: Optional[str] = None,
tool_version: str = _TOOL_VERSION_DEFAULT,
captured_at: Optional[datetime.datetime] = None,
review: Optional[dict] = None,
extensions: Optional[dict] = None,
bw_report: Optional[BwAsciiReport] = None,
) -> dict:
"""
Build a v1 sidecar dict from an Event + the surrounding metadata.
Pure helper — no file I/O. Callers stitch the result into a sidecar
via `write_sidecar()` (or POST it back via the PATCH endpoint).
When *bw_report* is supplied (e.g. by the ACH-forwarded import path
where Blastware writes a per-event ASCII report alongside the binary),
its decoded fields are folded into the sidecar:
- A new top-level ``bw_report`` block carries the rich derived
per-channel stats (Peak Acceleration, Peak Displacement, ZC Freq,
Time of Peak), the Peak Vector Sum + time, the per-channel sensor
self-check results, and monitor-log timestamps.
- ``peak_values`` is overlaid from the report (the report's PPV/PVS
values are computed by the device firmware and are authoritative;
anything ``read_blastware_file()`` derived from samples is
approximate at best until the body codec is decoded).
- ``project_info`` is overlaid from the report when the report
supplies a non-empty value (the report mirrors the device's
compliance config, which is what BW shows in its event report).
- ``event.timestamp`` is overlaid from the report's Event Date +
Event Time (BW's report timestamps are second-resolution and
match the binary's footer; we prefer the report value because
the BW-binary footer timestamp can drift on some firmware).
"""
if source_kind not in {"sfm-live", "sfm-ach", "bw-import", "idf-import"}:
raise ValueError(f"unknown source_kind: {source_kind!r}")
captured_at = captured_at or datetime.datetime.utcnow()
# ── Overlay event fields from the report when present ───────────────────
timestamp_iso = _ts_iso(event.timestamp)
if bw_report and bw_report.event_datetime:
timestamp_iso = bw_report.event_datetime.isoformat()
# Build peak_values, optionally overlaid from the report. The report
# stores Mic peak as PSPL (dB(L)); we convert to psi to match the
# existing peak_values.mic_psi field.
peak_dict = _peak_values_to_dict(event.peak_values)
if bw_report:
ch = bw_report.channels
if (t := ch.get("Tran")) and t.ppv_ips is not None: peak_dict["transverse"] = t.ppv_ips
if (v := ch.get("Vert")) and v.ppv_ips is not None: peak_dict["vertical"] = v.ppv_ips
if (l := ch.get("Long")) and l.ppv_ips is not None: peak_dict["longitudinal"] = l.ppv_ips
if bw_report.peak_vector_sum_ips is not None:
peak_dict["vector_sum"] = bw_report.peak_vector_sum_ips
if bw_report.mic.pspl_dbl is not None and bw_report.mic.pspl_dbl > 0:
peak_dict["mic_psi"] = _dbl_to_psi(bw_report.mic.pspl_dbl)
# Project info: overlay from report (the report mirrors the
# session-start compliance config that BW renders in event reports).
proj_dict = _project_info_to_dict(event.project_info)
if bw_report:
if bw_report.project: proj_dict["project"] = bw_report.project
if bw_report.client: proj_dict["client"] = bw_report.client
if bw_report.operator: proj_dict["operator"] = bw_report.operator
if bw_report.sensor_location: proj_dict["sensor_location"] = bw_report.sensor_location
# Event-block fields: overlay from report where available.
event_block = {
"serial": serial,
"timestamp": timestamp_iso,
"waveform_key": event._waveform_key.hex() if event._waveform_key else None,
"record_type": event.record_type,
"sample_rate": event.sample_rate,
"rectime_seconds": event.rectime_seconds,
"total_samples": event.total_samples,
"pretrig_samples": event.pretrig_samples,
}
if bw_report:
# Report values are authoritative — they're the user-configured
# values BW reads back, not STRT-derived guesses. In particular
# `event.rectime_seconds` from `read_blastware_file()` reads
# STRT[18] which is actually the `0x46` record-type marker (= 70)
# rather than the user's Record Time setting. Always overwrite.
if bw_report.sample_rate_sps:
event_block["sample_rate"] = bw_report.sample_rate_sps
if bw_report.record_time_s is not None:
event_block["rectime_seconds"] = bw_report.record_time_s
# Derive total_samples + pretrig_samples per channel from the
# report's sample_rate × times. These match the row count of
# the report's sample table (verified: event-c reports 1024 sps
# × (1.0 + 0.25) = 1280 rows).
if (sr := bw_report.sample_rate_sps) and bw_report.record_time_s is not None:
pretrig_s = abs(bw_report.pretrig_s) if bw_report.pretrig_s is not None else 0.0
event_block["total_samples"] = int(round(sr * (bw_report.record_time_s + pretrig_s)))
event_block["pretrig_samples"] = int(round(sr * pretrig_s))
out = {
"schema_version": SCHEMA_VERSION,
"kind": SIDECAR_KIND,
"event": event_block,
"peak_values": peak_dict,
"project_info": proj_dict,
"blastware": {
"filename": blastware_filename,
"filesize": blastware_filesize,
"sha256": blastware_sha256,
"available": True,
},
"source": {
"kind": source_kind,
"captured_at": captured_at.isoformat() + "Z" if captured_at.tzinfo is None else captured_at.isoformat(),
"tool_version": tool_version,
"a5_pickle_filename": a5_pickle_filename,
"txt_filename": txt_filename,
},
"review": review or {
"false_trigger": False,
"reviewer": None,
"reviewed_at": None,
"notes": "",
},
"extensions": extensions or {},
}
if bw_report:
out["bw_report"] = _bw_report_to_dict(bw_report)
return out
# ── Sidecar IO ────────────────────────────────────────────────────────────────
def write_sidecar(path: Union[str, Path], data: dict) -> None:
"""
Atomic write of a sidecar dict to <path>.
Validates schema_version is supported before writing so we don't
silently drop a future-format sidecar over the wire.
"""
path = Path(path)
sv = data.get("schema_version")
if not isinstance(sv, int) or sv < 1 or sv > SCHEMA_VERSION:
raise ValueError(
f"write_sidecar: unsupported schema_version={sv!r} "
f"(this build supports 1..{SCHEMA_VERSION})"
)
tmp = path.with_suffix(path.suffix + ".tmp")
with tmp.open("w", encoding="utf-8") as f:
json.dump(data, f, indent=2, sort_keys=False, default=str)
f.write("\n")
f.flush()
os.fsync(f.fileno())
os.replace(tmp, path)
def read_sidecar(path: Union[str, Path]) -> dict:
"""
Load a sidecar JSON file.
Raises FileNotFoundError if missing, ValueError on bad shape /
unsupported schema_version. Unknown keys at the top level are
preserved in the returned dict (forward-compat).
"""
path = Path(path)
with path.open("r", encoding="utf-8") as f:
data = json.load(f)
if not isinstance(data, dict):
raise ValueError(f"sidecar at {path}: top-level is not a JSON object")
sv = data.get("schema_version")
if not isinstance(sv, int) or sv < 1:
raise ValueError(f"sidecar at {path}: missing or invalid schema_version")
if sv > SCHEMA_VERSION:
raise ValueError(
f"sidecar at {path}: schema_version={sv} > supported {SCHEMA_VERSION}; "
"upgrade seismo-relay to read this file"
)
if data.get("kind") != SIDECAR_KIND:
raise ValueError(f"sidecar at {path}: unexpected kind={data.get('kind')!r}")
return data
def patch_sidecar(
path: Union[str, Path],
*,
review: Optional[dict] = None,
extensions: Optional[dict] = None,
reviewer_now: bool = True,
) -> dict:
"""
Atomically apply a JSON-merge-patch to a sidecar file's `review`
and/or `extensions` blocks. Other top-level keys are untouched.
`review_now`: when True (default) and `review` is non-empty, stamps
`review.reviewed_at` with the current UTC time so the review-time is
auditable without the caller having to pass it.
Returns the new full sidecar dict.
"""
path = Path(path)
data = read_sidecar(path)
if review:
merged = dict(data.get("review") or {})
merged.update({k: v for k, v in review.items() if v is not None or k in merged})
if reviewer_now:
merged["reviewed_at"] = datetime.datetime.utcnow().isoformat() + "Z"
data["review"] = merged
if extensions:
merged_ext = dict(data.get("extensions") or {})
merged_ext.update(extensions)
data["extensions"] = merged_ext
write_sidecar(path, data)
return data
def sidecar_path_for(blastware_path: Union[str, Path]) -> Path:
"""Convention: <bw_path>.sfm.json sits next to the BW binary."""
p = Path(blastware_path)
return p.with_name(p.name + ".sfm.json")
def file_sha256(path: Union[str, Path], chunk_size: int = 65536) -> str:
"""Compute SHA-256 of a file as a hex string."""
h = hashlib.sha256()
with open(path, "rb") as f:
while True:
chunk = f.read(chunk_size)
if not chunk:
break
h.update(chunk)
return h.hexdigest()
# ── Blastware-file reader ─────────────────────────────────────────────────────
#
# Reverse of `blastware_file.write_blastware_file`. Used by the BW-import
# flow to ingest files produced by Blastware's own ACH (where the source
# A5 frames are not available).
#
# File structure (recap):
# [22B header] [21B STRT record] [body bytes] [26B footer]
#
# The body holds:
# - 6B preamble (00 00 ff ff ff ff) immediately after the STRT
# - 4-channel interleaved int16 LE samples
# - Embedded ASCII metadata strings (Project: / Client: / User Name: /
# Seis Loc: / Extended Notes) from the device's session-start config
#
# The 0C waveform record (per-event peaks, project name) is NOT in the
# BW file — those are computed by the device firmware and only carried
# in the live SUB 0C response. read_blastware_file() therefore computes
# peaks from the raw samples assuming Normal-range (10 in/s full-scale)
# geophone sensitivity. Imported events surface that assumption via the
# sidecar's `peak_values.computed_from_samples` flag.
# Geophone scale factor, in/s per ADC unit, for Normal range (10 in/s FS).
# Confirmed from CLAUDE.md (geo_hardware_constant = 6.206053 in/s per V,
# ADC full-scale = 1.61133 V Normal range = 10.0 in/s peak; per-count
# resolution ≈ 10.0 / 32768).
_GEO_NORMAL_FS_INS = 10.0
_GEO_SENSITIVE_FS_INS = 1.250
_INT16_FS = 32768.0
# Microphone scale factor, psi per ADC count. Approximate — exact factor
# depends on the geophone-vs-mic ADC scaling and the firmware reference.
# We mark mic_psi as "computed approximate" in the sidecar.
_MIC_FS_PSI = 0.0125 / _INT16_FS # ~0.5 psi full-scale assumption
def _decode_strt(strt: bytes) -> dict:
"""
Decode the 21-byte STRT record from a BW file.
Returns dict with waveform_key (4B), total_samples, pretrig_samples,
rectime_seconds. Falls back to None on truncated/missing fields.
"""
if len(strt) < 21 or strt[0:4] != b"STRT":
return {}
return {
"waveform_key": strt[6:10].hex(),
"total_samples": struct.unpack_from(">H", strt, 8)[0],
"pretrig_samples": struct.unpack_from(">H", strt, 16)[0],
"rectime_seconds": strt[18],
}
def _find_first_string(buf: bytes, label: bytes, max_len: int = 256) -> Optional[str]:
"""
Search `buf` for `label` (e.g. b"Project:") and return the
null-terminated ASCII string that follows, stripped.
"""
pos = buf.find(label)
if pos < 0:
return None
start = pos + len(label)
end = buf.find(b"\x00", start, start + max_len)
if end < 0:
end = start + max_len
text = buf[start:end].decode("ascii", errors="replace").strip()
return text or None
def _decode_samples_4ch_int16_le(stream: bytes) -> dict[str, list[int]]:
"""
Decode a 4-channel interleaved int16 LE byte stream into per-channel
lists. Channels are [Tran, Vert, Long, Mic] = [ch0, ch1, ch2, ch3].
Truncates to a multiple of 8 bytes (one full sample-set).
"""
n_complete = (len(stream) // 8) * 8
if n_complete == 0:
return {"Tran": [], "Vert": [], "Long": [], "MicL": []}
fmt = "<" + "h" * (n_complete // 2)
flat = list(struct.unpack(fmt, stream[:n_complete]))
return {
"Tran": flat[0::4],
"Vert": flat[1::4],
"Long": flat[2::4],
"MicL": flat[3::4],
}
def _peaks_from_samples(samples: dict[str, list[int]]) -> PeakValues:
"""
Compute approximate peaks from raw int16 samples assuming Normal-range
geophone sensitivity. Used by the BW-importer when the 0C waveform
record (the device's authoritative peaks) is unavailable.
"""
def _peak_ins(ch: list[int]) -> float:
if not ch:
return 0.0
m = max(abs(int(v)) for v in ch)
return m / _INT16_FS * _GEO_NORMAL_FS_INS
tran = _peak_ins(samples.get("Tran", []))
vert = _peak_ins(samples.get("Vert", []))
long_ = _peak_ins(samples.get("Long", []))
# Mic in psi (approximate)
mic_ch = samples.get("MicL", []) or []
mic = max((abs(int(v)) for v in mic_ch), default=0) * _MIC_FS_PSI
# Peak vector sum: max over time of sqrt(T^2 + V^2 + L^2)
pvs = 0.0
n = min(len(samples.get("Tran", [])), len(samples.get("Vert", [])), len(samples.get("Long", [])))
if n:
scale = _GEO_NORMAL_FS_INS / _INT16_FS
T = samples["Tran"]; V = samples["Vert"]; L = samples["Long"]
for i in range(n):
t = T[i] * scale
v = V[i] * scale
l = L[i] * scale
mag = (t*t + v*v + l*l) ** 0.5
if mag > pvs:
pvs = mag
return PeakValues(
tran=tran, vert=vert, long=long_,
peak_vector_sum=pvs, micl=mic,
)
_RECORD_TYPE_BY_EXT_SUFFIX = {
'H': 'Histogram',
'W': 'Waveform',
'M': 'Manual',
'E': 'Event',
'C': 'Combo',
}
def derive_record_type_from_filename(filename, default: str = "Waveform") -> str:
"""Derive a BW Event's record_type from its filename's extension suffix.
V10.72+ MiniMate Plus firmware encodes the event type as the LAST
character of the extension (the `T` in BW's `AB0T` scheme):
``M529LKIQ.G10H`` → H → ``"Histogram"``
``T350L385.VY0W`` → W → ``"Waveform"``
``...M`` → M → ``"Manual"``
``...E`` → E → ``"Event"``
``...C`` → C → ``"Combo"``
Old S338 firmware uses 3-char extensions ending in ``0`` whose
encoding is not yet known — those fall through to ``default``.
Micromate Series 4 uses a different scheme entirely (observed:
``IDFH``, ``IDFW``) but the LAST-char convention (H / W) still holds
for the type code, so it works for both families.
Returns ``default`` if filename is empty, has no extension, or the
suffix char isn't a recognized type code.
"""
if not filename:
return default
try:
name = Path(filename).name
except (TypeError, ValueError):
return default
if '.' not in name:
return default
ext = name.rsplit('.', 1)[1]
if not ext:
return default
return _RECORD_TYPE_BY_EXT_SUFFIX.get(ext[-1].upper(), default)
def read_blastware_file(path: Union[str, Path]) -> Event:
"""
Parse a Blastware waveform file into an Event.
Recovers:
- waveform_key, rectime_seconds, total_samples, pretrig_samples
(from the STRT record)
- timestamp (from the footer's start-time field)
- project_info (from ASCII labels embedded in the body)
- raw_samples (Tran/Vert/Long/MicL int16 lists)
- peak_values (computed from raw_samples; approximate — see notes
on _peaks_from_samples about Normal-range assumption)
Does NOT recover the source A5 frames (they aren't in the BW file).
The returned Event has `_a5_frames = None`, signalling that
byte-for-byte regeneration of the BW file from this Event alone is
not possible — the on-disk BW file IS the byte-for-byte source.
"""
path = Path(path)
raw = path.read_bytes()
if len(raw) < _bw._WAVEFORM_HEADER_SIZE + 21 + 26:
raise ValueError(f"{path}: file too short ({len(raw)} bytes) to be a BW event")
# Header: validate magic prefix.
header = raw[:_bw._WAVEFORM_HEADER_SIZE]
if not header.startswith(_bw._FILE_HEADER_PREFIX):
raise ValueError(f"{path}: not a Blastware file (bad header prefix)")
# STRT record: 21 bytes immediately after the header.
strt_raw = raw[_bw._WAVEFORM_HEADER_SIZE : _bw._WAVEFORM_HEADER_SIZE + 21]
strt_fields = _decode_strt(strt_raw)
if not strt_fields:
raise ValueError(f"{path}: STRT record missing or malformed")
# Footer: locate the 0e 08 marker, validating the year is in a sane range.
body_start = _bw._WAVEFORM_HEADER_SIZE + 21
footer_pos = -1
pos = body_start
while True:
pos = raw.find(b"\x0e\x08", pos)
if pos < 0 or pos + 26 > len(raw):
break
yr = (raw[pos + 4] << 8) | raw[pos + 5]
if 2015 <= yr <= 2050:
footer_pos = pos
break
pos += 1
if footer_pos < 0 and len(raw) >= 26:
footer_pos = len(raw) - 26
if footer_pos < body_start:
raise ValueError(f"{path}: footer not found")
body = raw[body_start : footer_pos]
footer = raw[footer_pos : footer_pos + 26]
# Footer layout:
# [0:2] 0e 08 marker
# [2:10] ts1 (start) BE 8B
# [10:18] ts2 (stop) BE 8B
# [18:24] 00 01 00 02 00 00
# [24:26] crc
ts1 = _bw._decode_ts_be(footer[2:10])
ts2 = _bw._decode_ts_be(footer[10:18])
# Body: decode via the verified body codecs. Two formats coexist:
#
# 1. Waveform-mode (.AB0W) — starts with 7-byte preamble
# ``00 02 00 [Tran[0] BE] [Tran[1] BE]`` followed by the
# tagged-block delta stream documented in
# ``docs/waveform_codec_re_status.md`` and §7.6.1 of the
# protocol reference. Decoded by ``waveform_codec.decode_waveform_v2``.
#
# 2. Histogram-mode (.AB0H) — a sequence of 32-byte blocks, one
# per histogram interval, each carrying per-channel peak +
# half-period values. Decoded by
# ``histogram_codec.decode_histogram_body``. Both codecs
# return the same channel-grouped output shape, so consumers
# don't need to special-case mode.
#
# The historical ``_decode_samples_4ch_int16_le`` int16-LE
# interpretation was retracted 2026-05-08 (see protocol-ref §7.6.1
# retraction box) — it produced ±32K noise on every event.
#
# If both codecs fail (malformed file, truncated body, unrecognised
# mode, synthetic test input), fall back to empty channels — the
# rest of the event (timestamp, waveform_key, project strings) is
# still recoverable and useful.
decoded = decode_waveform_v2(body)
if decoded is None:
decoded = decode_histogram_body(body)
if decoded is None:
log.warning(
"%s: body codec failed to decode (body starts %s) — "
"raw_samples will be empty", path, body[:8].hex(" "),
)
samples = {"Tran": [], "Vert": [], "Long": [], "MicL": []}
else:
samples = decoded_to_adc_counts(decoded)
# Metadata strings (label-anchored search across the body).
project = _find_first_string(body, b"Project:")
client = _find_first_string(body, b"Client:")
user = _find_first_string(body, b"User Name:")
seisloc = _find_first_string(body, b"Seis Loc:")
# Build the Event.
ev = Event(index=-1)
if strt_fields.get("waveform_key"):
ev._waveform_key = bytes.fromhex(strt_fields["waveform_key"])
# Derive record_type from the filename's extension suffix (H/W/M/E/C).
# When called from save_imported_bw the path here is a tmp file with a
# ".bw" suffix, so the derivation falls back to "Waveform" and the
# caller overrides ev.record_type using the original filename — see
# waveform_store.save_imported_bw.
ev.record_type = derive_record_type_from_filename(path.name)
ev.rectime_seconds = strt_fields.get("rectime_seconds")
ev.total_samples = strt_fields.get("total_samples")
ev.pretrig_samples = strt_fields.get("pretrig_samples")
if ts1 is not None:
ev.timestamp = Timestamp(
raw=footer[2:10],
flag=0x10,
year=ts1.year, unknown_byte=0, month=ts1.month, day=ts1.day,
hour=ts1.hour, minute=ts1.minute, second=ts1.second,
)
ev.project_info = ProjectInfo(
project=project, client=client, operator=user, sensor_location=seisloc,
)
ev.raw_samples = samples
# Only compute peaks from samples when we actually have samples.
# For events the codec couldn't decode (histogram-mode bodies, until
# the §7.6.2 histogram codec is wired in), samples is an empty dict
# and ``_peaks_from_samples`` would return PeakValues(0, 0, 0, 0, 0).
# That would then OVERWRITE existing good DB peak values (e.g. from
# paired BW ASCII reports) during the backfill UPSERT path.
# Leaving peak_values=None signals "we don't know" to downstream
# consumers; the backfill script seeds from the DB row when it sees
# None, and ``apply_report_to_event`` overlays from a paired ASCII
# report when one is supplied.
has_samples = any(samples.get(ch) for ch in ("Tran", "Vert", "Long", "MicL"))
ev.peak_values = _peaks_from_samples(samples) if has_samples else None
ev._a5_frames = None # not recoverable from BW file
return ev