Files
seismo-relay/minimateplus/bw_ascii_report.py
T
serversdown 6a7e8c6e86 feat(bw-report): normalise operator-field label variants
Blastware writes the operator-supplied fields with different label
spellings across firmware versions and recording modes — most
notably "Seis. Location" on histogram exports vs "Seis Loc:" on
waveform exports.  Previous parser only matched the latter, so
every histogram event silently lost its sensor_location field.

Replace the four hardcoded `key.rstrip(":") == "X"` branches with
a single `_OPERATOR_LABEL_MAP` dispatch table keyed by normalised
label (lowercase, trailing colon/period stripped, internal
whitespace collapsed).  Adds these variants on day 1:

  project:         "Project:" / "Project"
  client:          "Client:"  / "Client"
  operator:        "User Name:" / "User Name"
  sensor_location: "Seis Loc:" / "Seis. Location" / "Seis Location"
                 / "Sensor Location" / "Seis Loc"

To absorb future BW label drift, add a one-line dict entry — no
new elif branch.

14 new tests cover:
  - Each label variant routes to the correct field (parametrised)
  - Case-insensitive matching ("seis loc" / "SEIS LOC" / "SeIs LoC")
  - Whitespace-collapse ("Seis  Loc" with double-space)
  - End-to-end parse of a real histogram fixture from
    example-events/histogram/ — sensor_location ('Loc #1 - 2652 Hepner...')
    populates correctly even though the file uses "Seis. Location"

Total bw_ascii_report tests: 19 → 33.  Full SFM suite still green
(69 passed, 44 skipped — pre-existing skips for h5py-dep tests).

Pairs with series3-watcher v1.5.4 (which fixes the filename pairing
so histograms actually reach this parser in the first place).
2026-05-10 20:13:44 +00:00

526 lines
23 KiB
Python

"""
minimateplus/bw_ascii_report.py — parser for Blastware's per-event ASCII
report (the .TXT file BW writes alongside each saved event binary).
The ASCII export is the authoritative source for every "rich" per-event
field that BW computes from the waveform but never persists in the BW
binary itself:
- Per-channel PPV (Tran / Vert / Long / MicL)
- Peak Vector Sum + Peak Vector Sum Time
- Per-channel ZC Freq, Time of Peak, Peak Acceleration, Peak Displacement
- MicL PSPL, MicL Time of Peak, MicL ZC Freq
- Per-channel Sensor Self-Check (Test Freq / Test Ratio / Test Results)
- MicL Test Amplitude (mV)
- Battery, calibration date, monitor-log timestamps
Persisting these values into the SFM database lets the monthly-summary
review workflow ("show me events at Location X with PVS > 0.5") work
without depending on the (still-undecoded) waveform body codec.
Format (verified against decode-re/5-8-26 4-event bundle):
- One field per line, wrapped in double quotes: `"Field Name : Value"`
- Field/value separator: literal ` : ` (space-colon-space).
- Some field names contain an internal `:` already (e.g. `"Project:"`),
so we split on the FIRST ` : ` only.
- Some fields have unit suffixes: `"0.500 in/s"` / `"7.5 Hz"` / `"533 mv"`.
- A `"Monitor Log(s)"` marker line is followed by tab-separated rows
of `start_time<TAB>stop_time<TAB>description`.
- Final `"PC SW Version : ..."` line ends the metadata block.
- A blank line separates metadata from the sample table.
- Sample table starts with ` Tran <TAB> Vert <TAB>...`, then
one row per sample (tab-separated, right-padded numeric values).
- Geo channel values are in in/s; MicL in dB(L) (or 0.000 below threshold).
Because some metadata keys have whitespace quirks (e.g. "MicL Time of
Peak" is written with a doubled space), we collapse whitespace in the key
before lookup. Labels that carry their own trailing colon (e.g.
"Project:") are matched separately through the operator-label normaliser.
"""
from __future__ import annotations
import datetime
import re
from dataclasses import dataclass, field
from pathlib import Path
from typing import Dict, List, Optional, Tuple, Union
# ─────────────────────────────────────────────────────────────────────────────
# Output dataclasses
# ─────────────────────────────────────────────────────────────────────────────
@dataclass
class ChannelStats:
    """Derived statistics for one channel of an event report.

    Every field defaults to ``None``, meaning "not present in the report".
    PPV, peak acceleration and peak displacement are reported for the
    geophone channels only.
    """

    # PPV in in/s (geo channels only).
    ppv_ips: Optional[float] = None
    # ZC frequency in Hz.
    zc_freq_hz: Optional[float] = None
    # Time of peak in seconds, relative to the trigger; can be negative.
    time_of_peak_s: Optional[float] = None
    # Peak acceleration in g (geo channels only).
    peak_accel_g: Optional[float] = None
    # Peak displacement in inches (geo channels only).
    peak_disp_in: Optional[float] = None
@dataclass
class MicStats:
    """Statistics that apply to the MicL (microphone) channel only.

    Every field defaults to ``None``, meaning "not present in the report".
    """

    # Weighting description, e.g. "Linear Weighting".
    weighting: Optional[str] = None
    # PSPL in dB(L).
    pspl_dbl: Optional[float] = None
    # ZC frequency of the microphone channel.
    zc_freq_hz: Optional[float] = None
    # Time of peak of the microphone channel.
    time_of_peak_s: Optional[float] = None
@dataclass
class SensorCheck:
    """Result of the sensor self-check for one channel.

    Geo channels report a test frequency plus a ratio, while MicL reports a
    test frequency plus an amplitude in mV.  Every channel additionally
    carries a pass/fail string.  Fields that do not apply stay ``None``.
    """

    # Self-check test frequency.
    test_freq_hz: Optional[float] = None
    # Self-check ratio (geo channels only).
    test_ratio: Optional[float] = None
    # Self-check amplitude in mV (MicL only).
    test_amplitude_mv: Optional[float] = None
    # Verdict text: "Passed" / "Failed".
    test_results: Optional[str] = None
@dataclass
class MonitorLogEntry:
    """A single row from the report's trailing "Monitor Log(s)" block.

    Any of the three fields may be ``None`` when the corresponding column
    was missing or could not be parsed.
    """

    # Timestamp from the first column of the row.
    start_time: Optional[datetime.datetime] = None
    # Timestamp from the second column of the row.
    stop_time: Optional[datetime.datetime] = None
    # Free-text description from the third column of the row.
    description: Optional[str] = None
@dataclass
class BwAsciiReport:
    """Everything extracted from one BW per-event ASCII export.

    Scalar fields default to ``None`` ("not present in the report");
    container fields default to fresh empty containers per instance.
    """

    # ── Identity ─────────────────────────────────────────────────────────────
    # Event type text, e.g. "Full Waveform".
    event_type: Optional[str] = None
    # Unit serial number, e.g. "BE11529".
    serial: Optional[str] = None
    # Firmware version line.
    version: Optional[str] = None
    # Event file name, e.g. "M529LK44.AB0".
    file_name: Optional[str] = None
    # Combination of the report's Event Date and Event Time.
    event_datetime: Optional[datetime.datetime] = None

    # ── Trigger / recording config ──────────────────────────────────────────
    # Trigger source, e.g. "Vert" or "From Unit".
    trigger_channel: Optional[str] = None
    geo_trigger_level_ips: Optional[float] = None
    # Pre-trigger length; negative seconds.
    pretrig_s: Optional[float] = None
    record_time_s: Optional[float] = None
    record_stop_mode: Optional[str] = None
    sample_rate_sps: Optional[int] = None
    battery_volts: Optional[float] = None
    calibration_date: Optional[datetime.date] = None
    # Who performed the calibration, e.g. "Instantel".
    calibration_by: Optional[str] = None
    # Units line, e.g. "in/s and dB(L)".
    units: Optional[str] = None

    # ── Operator-supplied metadata ──────────────────────────────────────────
    project: Optional[str] = None
    client: Optional[str] = None
    # Taken from the "User Name" label.
    operator: Optional[str] = None
    # Taken from the "Seis Loc" label (and its spelling variants).
    sensor_location: Optional[str] = None

    # ── Geo channel scaling ─────────────────────────────────────────────────
    # Geo range, e.g. 10.000 or 1.250.
    geo_range_ips: Optional[float] = None

    # ── Per-channel derived stats (geo + mic) ───────────────────────────────
    # Keyed by channel name.
    channels: Dict[str, ChannelStats] = field(default_factory=dict)
    mic: MicStats = field(default_factory=MicStats)

    # ── Vector sum ──────────────────────────────────────────────────────────
    peak_vector_sum_ips: Optional[float] = None
    peak_vector_sum_time_s: Optional[float] = None

    # ── Sensor self-check (per channel) ─────────────────────────────────────
    # Keyed by channel name.
    sensor_check: Dict[str, SensorCheck] = field(default_factory=dict)

    # ── Monitor log + tooling version ───────────────────────────────────────
    monitor_log: List[MonitorLogEntry] = field(default_factory=list)
    pc_sw_version: Optional[str] = None

    # ── Sample table (optional; only parsed if requested) ───────────────────
    # One (Tran, Vert, Long, MicL) tuple per sample, in the report's units
    # (geo channels in in/s, MicL in dB(L)).  Stays None when the caller did
    # not ask for the sample table (parse_samples=False).
    samples: Optional[List[Tuple[float, float, float, float]]] = None
# ─────────────────────────────────────────────────────────────────────────────
# Helpers
# ─────────────────────────────────────────────────────────────────────────────
# Any run of whitespace characters (spaces, tabs, ...).
_KEY_NORMALISE_RE = re.compile(r"\s+")
# Leading, optionally negative, decimal number at the start of a string.
_NUMERIC_RE = re.compile(r"^-?\d+(?:\.\d+)?")


def _normalise_key(k: str) -> str:
    """Return *k* with whitespace runs collapsed and the ends trimmed.

    Each run of whitespace (tabs included) becomes a single space, then
    leading and trailing whitespace is removed.  This absorbs BW's
    doubled-space labels such as "MicL Time of Peak".  Colons and other
    punctuation are left untouched.
    """
    collapsed = _KEY_NORMALISE_RE.sub(" ", k)
    return collapsed.strip()
def _strip_quotes(line: str) -> str:
    """Drop the line terminator and one pair of enclosing double quotes.

    Trailing CR/LF characters are always removed.  The surrounding quotes
    are removed only when the remaining text both starts and ends with a
    double quote and is at least two characters long; otherwise the text is
    returned as-is.
    """
    body = line.rstrip("\r\n")
    is_quoted = len(body) >= 2 and body[0] == '"' and body[-1] == '"'
    return body[1:-1] if is_quoted else body
def _parse_number(value: str) -> Optional[float]:
    """Extract the number that starts a value such as "0.500 in/s".

    Surrounding whitespace is ignored.  Returns ``None`` when the value does
    not begin with a number matched by ``_NUMERIC_RE`` (optional minus sign,
    digits, optional fractional part).
    """
    match = _NUMERIC_RE.match(value.strip())
    if match is None:
        return None
    try:
        number = float(match.group(0))
    except ValueError:
        return None
    return number
def _parse_int(value: str) -> Optional[int]:
    """Parse the leading number of *value* and round it to an integer.

    Returns ``None`` when ``_parse_number`` finds no leading number.
    """
    number = _parse_number(value)
    if number is None:
        return None
    return int(round(number))
# Full month names exactly as BW writes them, in calendar order.
_MONTH_NAMES = (
    "January", "February", "March", "April", "May", "June",
    "July", "August", "September", "October", "November", "December",
)

# Month label -> month number.  Holds every full name plus its three-letter
# short form, which is what monitor-log rows use ("Apr 23 /26").  "May" is
# its own short form, so it appears only once.
_MONTHS = {
    label: number
    for number, full_name in enumerate(_MONTH_NAMES, start=1)
    for label in (full_name, full_name[:3])
}
def _parse_event_date(s: str) -> Optional[datetime.date]:
    """Turn a date such as "April 23, 2026" or "May 8, 2026" into a date.

    Commas are treated as whitespace and only the first three tokens
    (month name, day, year) are used.  Returns ``None`` when fewer than
    three tokens are present, the month name is not in ``_MONTHS``, or the
    day/year are not valid integers for a real calendar date.
    """
    tokens = s.strip().replace(",", " ").split()
    if len(tokens) < 3:
        return None
    month_token, day_token, year_token = tokens[:3]
    month = _MONTHS.get(month_token)
    if month is None:
        return None
    try:
        parsed = datetime.date(int(year_token), month, int(day_token))
    except ValueError:
        return None
    return parsed
def _parse_event_time(s: str) -> Optional[datetime.time]:
    """Turn a clock time such as "15:56:35" into a ``datetime.time``.

    Returns ``None`` unless the text is exactly three colon-separated
    integers that form a valid hour/minute/second.
    """
    fields = s.strip().split(":")
    if len(fields) != 3:
        return None
    try:
        hour, minute, second = (int(part) for part in fields)
        return datetime.time(hour, minute, second)
    except (ValueError, IndexError):
        return None
def _parse_calibration(value: str) -> Tuple[Optional[datetime.date], Optional[str]]:
    """Split a calibration line into its date and the party that did it.

    "April 29, 2025 by Instantel" becomes ``(date(2025, 4, 29),
    "Instantel")``.  The text before the first " by " is parsed as a date
    (``None`` if unparseable); the party is ``None`` when " by " is absent.
    """
    date_text, separator, who = value.partition(" by ")
    calibrated_on = _parse_event_date(date_text)
    calibrated_by = who.strip() if separator else None
    return calibrated_on, calibrated_by
def _parse_monitor_row(line: str) -> Optional[MonitorLogEntry]:
    """Build a ``MonitorLogEntry`` from one tab-separated monitor-log row.

    Expected layout is `<start>\t<stop>\t<desc>`, where both timestamps use
    BW's short form "Mon DD /YY HH:MM:SS" (e.g. "Apr 23 /26 15:46:16").
    Columns beyond the third are ignored.

    Returns ``None`` when the row has fewer than two columns, or when
    neither timestamp parses and there is no description text.
    """
    cells = line.split("\t")
    if len(cells) < 2:
        return None
    begin = _parse_monitor_ts(cells[0])
    end = _parse_monitor_ts(cells[1])
    note = cells[2].strip() if len(cells) > 2 else None
    nothing_usable = begin is None and end is None and not note
    if nothing_usable:
        return None
    return MonitorLogEntry(start_time=begin, stop_time=end, description=note)
def _parse_monitor_ts(s: str) -> Optional[datetime.datetime]:
    """Turn a monitor-log timestamp such as "Apr 23 /26 15:46:16" into a datetime.

    The four whitespace-separated tokens are month name, day, "/YY" and
    "HH:MM:SS".  Two-digit years below 80 are placed in the 2000s, the rest
    in the 1900s.  Returns ``None`` when tokens are missing, the month is
    not in ``_MONTHS``, or any numeric part is invalid.
    """
    tokens = s.split()
    if len(tokens) < 4:
        return None
    month = _MONTHS.get(tokens[0])
    if month is None:
        return None
    try:
        day = int(tokens[1])
        # The year token looks like "/26"; drop the slash, then pick a century.
        short_year = int(tokens[2].lstrip("/"))
        century = 2000 if short_year < 80 else 1900
        hour, minute, second = map(int, tokens[3].split(":"))
        return datetime.datetime(century + short_year, month, day, hour, minute, second)
    except (ValueError, IndexError):
        return None
# ── Operator-field label normalisation ──────────────────────────────────────
#
# BW has used different label spellings across versions and recording
# modes for the same operator-supplied fields:
#
# project: "Project:" / "Project"
# client: "Client:" / "Client"
# operator: "User Name:" / "User Name"
# sensor_location: "Seis Loc:" / "Seis. Loc" / "Seis. Location"
# / "Seis Location" / "Sensor Location"
#
# Per user feedback ("the tags themselves dont matter a ton, what
# matters is the field"), we normalise labels at lookup time so the
# value-extraction works regardless of which spelling BW happens to
# emit on a given machine.
#
# To add a new variant: edit `_OPERATOR_LABEL_MAP` — single source of
# truth. Keys are normalised forms (lowercase, trailing colon and
# period stripped, internal whitespace collapsed); values are
# attribute names on `BwAsciiReport`.
# Normalised label -> `BwAsciiReport` attribute name.  Grouped by target
# attribute so that a new spelling is a one-word addition to a tuple.
_OPERATOR_LABEL_MAP = {
    label: attribute
    for attribute, labels in (
        ("project", ("project",)),
        ("client", ("client",)),
        ("operator", ("user name",)),
        # Most variants of "Seis*" plus "Sensor Location".
        (
            "sensor_location",
            (
                "seis loc",
                "seis. loc",
                "seis. location",
                "seis location",
                "sensor location",
            ),
        ),
    )
    for label in labels
}
def _normalise_label_for_lookup(key: str) -> str:
    """Normalise a label for the operator-field lookup.

    Removes surrounding whitespace, then any trailing mix of colons,
    periods and whitespace (in any order), collapses internal whitespace
    runs to a single space, and lowercases.  So all of:

        "Seis Loc:"
        "Seis. Location"
        "seis location"
        "Sensor Location"
        "Seis Loc. :"

    map to canonical forms in `_OPERATOR_LABEL_MAP`.  Periods inside the
    label (e.g. "Seis. Location") are kept.

    Args:
        key: Label text as it appears before the " : " separator.

    Returns:
        The normalised label; an empty string if nothing but punctuation
        and whitespace was supplied.
    """
    trimmed = key.strip()
    # Peel trailing punctuation one character at a time so that every
    # ordering ("Loc:.", "Loc.:", "Loc. :") ends up the same.
    while trimmed and trimmed[-1] in ":.":
        trimmed = trimmed[:-1].rstrip()
    # split()/join collapses every whitespace run to a single space.
    return " ".join(trimmed.split()).lower()
# ─────────────────────────────────────────────────────────────────────────────
# Top-level parser
# ─────────────────────────────────────────────────────────────────────────────
# Channel names as they appear in report labels; geo channels first.
_GEO_CHANNELS = ("Tran", "Vert", "Long")
_REPORT_CHANNELS = _GEO_CHANNELS + ("MicL",)

# Report label -> BwAsciiReport attribute; the value is stored verbatim.
_TEXT_FIELD_ATTRS: Dict[str, str] = {
    "Event Type": "event_type",
    "Serial Number": "serial",
    "Version": "version",
    "File Name": "file_name",
    "Trigger": "trigger_channel",
    "Record Stop Mode": "record_stop_mode",
    "Units": "units",
    "PC SW Version": "pc_sw_version",
}

# Report label -> BwAsciiReport attribute; the value goes through
# `_parse_number` (leading number kept, unit suffix dropped).
_NUMBER_FIELD_ATTRS: Dict[str, str] = {
    "Geo Trigger Level": "geo_trigger_level_ips",
    "Pre-trigger Length": "pretrig_s",
    "Record Time": "record_time_s",
    "Battery Level": "battery_volts",
    "Geo Range": "geo_range_ips",
    "Peak Vector Sum": "peak_vector_sum_ips",
    "Peak Vector Sum Time": "peak_vector_sum_time_s",
}

# "<geo channel> <stat>" label -> (channel, ChannelStats attribute).
_GEO_STAT_FIELDS: Dict[str, Tuple[str, str]] = {
    f"{channel} {label}": (channel, attribute)
    for channel in _GEO_CHANNELS
    for label, attribute in (
        ("PPV", "ppv_ips"),
        ("ZC Freq", "zc_freq_hz"),
        ("Time of Peak", "time_of_peak_s"),
        ("Peak Acceleration", "peak_accel_g"),
        ("Peak Displacement", "peak_disp_in"),
    )
}

# MicL label -> attribute name that exists on both MicStats and ChannelStats.
_MIC_MIRRORED_FIELDS: Dict[str, str] = {
    "MicL Time of Peak": "time_of_peak_s",
    "MicL ZC Freq": "zc_freq_hz",
}

# "<channel> Test ..." label -> (channel, SensorCheck attribute).  Ratio is
# only accepted for geo channels and amplitude only for MicL.
_SENSOR_CHECK_FIELDS: Dict[str, Tuple[str, str]] = {
    **{f"{ch} Test Freq": (ch, "test_freq_hz") for ch in _REPORT_CHANNELS},
    **{f"{ch} Test Ratio": (ch, "test_ratio") for ch in _GEO_CHANNELS},
    "MicL Test Amplitude": ("MicL", "test_amplitude_mv"),
    **{f"{ch} Test Results": (ch, "test_results") for ch in _REPORT_CHANNELS},
}


def _decode_report_bytes(raw: bytes) -> str:
    """Decode raw report bytes without losing operator-entered characters.

    Plain-ASCII files decode exactly as before.  UTF-8 (with or without a
    BOM) is tried first; anything else falls back to Windows-1252.
    """
    try:
        return raw.decode("utf-8-sig")
    except UnicodeDecodeError:
        # NOTE(review): cp1252 is assumed as the legacy Windows code page —
        # confirm against a real export containing accented characters.
        return raw.decode("cp1252", errors="replace")


def _store_report_field(report: BwAsciiReport, key: str, value: str) -> None:
    """Store one "Field : Value" pair on *report*; unknown keys are ignored."""
    if key in _TEXT_FIELD_ATTRS:
        setattr(report, _TEXT_FIELD_ATTRS[key], value)
    elif key in _NUMBER_FIELD_ATTRS:
        setattr(report, _NUMBER_FIELD_ATTRS[key], _parse_number(value))
    elif key == "Sample Rate":
        report.sample_rate_sps = _parse_int(value)
    elif key == "Calibration":
        report.calibration_date, report.calibration_by = _parse_calibration(value)
    elif key == "Microphone":
        report.mic.weighting = value
    elif key == "MicL PSPL":
        # PSPL is dB(L), not in/s, so it lives on MicStats only and is NOT
        # mirrored onto channels["MicL"].ppv_ips.
        report.mic.pspl_dbl = _parse_number(value)
    elif key in _GEO_STAT_FIELDS:
        channel, attribute = _GEO_STAT_FIELDS[key]
        stats = report.channels.setdefault(channel, ChannelStats())
        setattr(stats, attribute, _parse_number(value))
    elif key in _MIC_MIRRORED_FIELDS:
        # These two stats are kept on MicStats and mirrored onto the
        # "MicL" ChannelStats entry.
        attribute = _MIC_MIRRORED_FIELDS[key]
        number = _parse_number(value)
        setattr(report.mic, attribute, number)
        mic_channel = report.channels.setdefault("MicL", ChannelStats())
        setattr(mic_channel, attribute, number)
    elif key in _SENSOR_CHECK_FIELDS:
        channel, attribute = _SENSOR_CHECK_FIELDS[key]
        check = report.sensor_check.setdefault(channel, SensorCheck())
        # The verdict is free text; every other self-check value is numeric.
        is_text = attribute == "test_results"
        setattr(check, attribute, value if is_text else _parse_number(value))
    # Operator-supplied labels (Project / Client / User Name / Seis Loc):
    # BW writes these with assorted spellings, so they are matched through
    # the label normaliser and `_OPERATOR_LABEL_MAP` rather than exactly.
    # None of the exact labels above normalises to an operator label, so
    # checking this last does not change which branch a key takes.
    elif (slot := _OPERATOR_LABEL_MAP.get(_normalise_label_for_lookup(key))):
        setattr(report, slot, value)
    # Anything else is silently dropped — forward-compat for future BW
    # versions that may add fields.


def parse_report(text: Union[str, bytes], *, parse_samples: bool = False) -> BwAsciiReport:
    """Parse a BW per-event ASCII export into a structured BwAsciiReport.

    Metadata lines are read up to the first blank line, which marks the
    start of the sample table.  Each line is unquoted and split on the
    FIRST " : "; lines without that separator are skipped, except for the
    tab-separated rows that follow a "Monitor Log(s)" marker.

    Args:
        text: Report content.  ``bytes`` are decoded as UTF-8 (BOM
            tolerated) with a Windows-1252 fallback; a leading BOM in a
            ``str`` is dropped.
        parse_samples: When True, also populate ``report.samples`` from the
            trailing sample table.  Default False because the table is huge
            and most callers only want metadata for indexing.

    Returns:
        A BwAsciiReport.  ``channels`` and ``sensor_check`` always contain
        entries for "Tran", "Vert", "Long" and "MicL"; fields absent from
        the report keep their defaults.
    """
    if isinstance(text, bytes):
        text = _decode_report_bytes(text)
    elif text.startswith("\ufeff"):
        # A BOM glued to the first quote would hide the first field.
        text = text[1:]

    report = BwAsciiReport()
    # Pre-create channel stat slots so callers can rely on them existing.
    for channel in _REPORT_CHANNELS:
        report.channels.setdefault(channel, ChannelStats())
        report.sensor_check.setdefault(channel, SensorCheck())

    lines = text.splitlines()
    # Index of the first line after the metadata block; stays at EOF when
    # no blank separator line is found.
    table_start = len(lines)
    in_monitor_log = False
    event_time_str: Optional[str] = None
    event_date: Optional[datetime.date] = None

    for position, raw_line in enumerate(lines):
        # Blank line marks the start of the sample table.
        if not raw_line.strip():
            table_start = position + 1
            break
        line = _strip_quotes(raw_line)

        # "Monitor Log(s)" header: the rows that follow contain tabs; the
        # first line without a tab ends the section and is parsed normally.
        if not in_monitor_log and line.strip() == "Monitor Log(s)":
            in_monitor_log = True
            continue
        if in_monitor_log:
            if "\t" in line:
                entry = _parse_monitor_row(line)
                if entry:
                    report.monitor_log.append(entry)
                continue
            in_monitor_log = False

        # "Field : Value" — split on the FIRST occurrence of " : ".
        raw_key, separator, raw_value = line.partition(" : ")
        if not separator:
            continue
        key = _normalise_key(raw_key)
        value = raw_value.strip()

        # Date and time arrive on separate lines and are merged after the
        # loop, so they are held locally rather than stored on the report.
        if key == "Event Time":
            event_time_str = value
        elif key == "Event Date":
            event_date = _parse_event_date(value)
        else:
            _store_report_field(report, key, value)

    # Combine event date + time into a datetime.
    if event_date is not None and event_time_str is not None:
        event_time = _parse_event_time(event_time_str)
        if event_time is not None:
            report.event_datetime = datetime.datetime.combine(event_date, event_time)

    if parse_samples:
        report.samples = _parse_sample_table(lines, table_start)
    return report
def _parse_sample_table(
    lines: List[str], start: int,
) -> List[Tuple[float, float, float, float]]:
    """Read the trailing sample table beginning at ``lines[start]``.

    Every line up to and including the header row (the first row with a
    cell equal to "Tran", "Vert", "Long" or "MicL") is skipped.  After the
    header, each row contributes its first four non-empty tab-separated
    cells as floats; blank rows, rows with fewer than four cells and rows
    with non-numeric cells are ignored.
    """
    header_names = {"Tran", "Vert", "Long", "MicL"}
    table: List[Tuple[float, float, float, float]] = []
    header_found = False
    for raw_row in lines[start:]:
        cells = [cell for cell in (part.strip() for part in raw_row.split("\t")) if cell]
        if not cells:
            # Nothing but whitespace/tabs on this line.
            continue
        if not header_found:
            # Header row contains channel names; numeric rows don't.
            header_found = not header_names.isdisjoint(cells)
            continue
        if len(cells) < 4:
            continue
        try:
            tran, vert, long_, mic = (float(cell) for cell in cells[:4])
        except ValueError:
            continue
        table.append((tran, vert, long_, mic))
    return table
def parse_report_file(
    path: Union[str, Path], *, parse_samples: bool = False,
) -> BwAsciiReport:
    """Read the .TXT report at *path* as bytes and hand it to ``parse_report``.

    ``parse_samples`` is forwarded unchanged.
    """
    raw_bytes = Path(path).read_bytes()
    return parse_report(raw_bytes, parse_samples=parse_samples)