Files
seismo-relay/minimateplus/bw_ascii_report.py
T
serversdown a032fa5451 refactor(bw-report): parse user notes by POSITION, not by label
The four operator-supplied note fields in BW's Compliance Setup →
Notes tab (Project / Client / User Name / Seis Loc) have
USER-EDITABLE LABELS — an operator can rename them in BW's UI to
"Building:", "Site Address:", "Inspector:", or anything else, and
the ASCII export writes those literal labels verbatim.  The
previous label-normalisation map approach (just added in commit
6a7e8c6) was fragile: it could only match label spellings we'd
enumerated in advance.  An operator using "Site:" instead of
"Seis Loc:" would have their sensor location silently dropped.

What IS reliable: BW always writes the 4 user-notes lines
contiguously, in the same order, between the "Units :" line and
the "Geo Range :" line of the export.  So parse them by POSITION:

  position 1 → project
  position 2 → client
  position 3 → operator
  position 4 → sensor_location

The original labels BW wrote are preserved in a new
`BwAsciiReport.user_note_labels` dict (canonical slot → literal
label string) so terra-view can render them as the operator named
them.

Removes the `_OPERATOR_LABEL_MAP` / `_normalise_label_for_lookup`
helpers and the elif-by-normalised-label branch in `parse_report`.
Replaces with a small state machine that flips on the "Units" line
and flips off on the "Geo Range" line.

Tests:
  - Default-label fixtures (waveform + histogram) still populate
    correctly, with operator's labels captured.
  - Synthetic custom-labelled exports ("Building:" / "Site Address:" /
    etc.) populate the right slots by position.
  - Histogram-specific "Seis. Location:" works.
  - Lines outside the Units→Geo Range range are ignored even if
    they look like user notes (defensive against malformed exports).
  - Partial blocks (fewer than 4 lines) leave later slots None.
  - Extra lines beyond 4 are dropped (5th slot doesn't exist).

26 tests in test_bw_ascii_report.py (was 33; net drop reflects
parametrised label tests collapsed into 6 focused position tests).
Full SFM suite: 62 passed, 44 skipped.

Pairs with series3-watcher v1.5.0 which fixes the filename pairing
so the report reaches this parser in the first place.
2026-05-10 22:28:31 +00:00

523 lines
24 KiB
Python

"""
minimateplus/bw_ascii_report.py — parser for Blastware's per-event ASCII
report (the .TXT file BW writes alongside each saved event binary).
The ASCII export is the authoritative source for every "rich" per-event
field that BW computes from the waveform but never persists in the BW
binary itself:
- Per-channel PPV (Tran / Vert / Long / MicL)
- Peak Vector Sum + Peak Vector Sum Time
- Per-channel ZC Freq, Time of Peak, Peak Acceleration, Peak Displacement
- MicL PSPL, MicL Time of Peak, MicL ZC Freq
- Per-channel Sensor Self-Check (Test Freq / Test Ratio / Test Results)
- MicL Test Amplitude (mV)
- Battery, calibration date, monitor-log timestamps
Persisting these values into the SFM database lets the monthly-summary
review workflow ("show me events at Location X with PVS > 0.5") work
without depending on the (still-undecoded) waveform body codec.
Format (verified against decode-re/5-8-26 4-event bundle):
- One field per line, wrapped in double quotes: `"Field Name : Value"`
- Field/value separator: literal ` : ` (space-colon-space).
- Some field names contain an internal `:` already (e.g. `"Project:"`),
so we split on the FIRST ` : ` only.
- Some fields have unit suffixes: `"0.500 in/s"` / `"7.5 Hz"` / `"533 mv"`.
- A `"Monitor Log(s)"` marker line is followed by tab-separated rows
of `start_time<TAB>stop_time<TAB>description`.
- Final `"PC SW Version : ..."` line ends the metadata block.
- A blank line separates metadata from the sample table.
- Sample table starts with ` Tran <TAB> Vert <TAB>...`, then
one row per sample (tab-separated, right-padded numeric values).
- Geo channel values are in in/s; MicL in dB(L) (or 0.000 below threshold).
Because some metadata keys have whitespace quirks (e.g. "MicL Time of
Peak" can be written with two spaces), we collapse whitespace runs in
the key before lookup.  Labels that carry their own colon (e.g.
"Project:") need no special handling beyond the first-` : ` split above.
"""
from __future__ import annotations
import datetime
import re
from dataclasses import dataclass, field
from pathlib import Path
from typing import Dict, List, Optional, Tuple, Union
# ─────────────────────────────────────────────────────────────────────────────
# Output dataclasses
# ─────────────────────────────────────────────────────────────────────────────
@dataclass
class ChannelStats:
    """Per-channel derived stats, populated from an event report.

    Every field defaults to ``None``; the parser only assigns a field when
    it sees the matching ``"<Channel> <stat>"`` line in the report.
    """

    ppv_ips: Optional[float] = None  # in/s (geo channels only)
    zc_freq_hz: Optional[float] = None  # Hz
    time_of_peak_s: Optional[float] = None  # seconds (relative to trigger; can be negative)
    peak_accel_g: Optional[float] = None  # g (geo channels only)
    peak_disp_in: Optional[float] = None  # in (geo channels only)
@dataclass
class MicStats:
    """MicL-specific stats.

    Every field defaults to ``None`` until the matching report line is parsed.
    """

    weighting: Optional[str] = None  # e.g. "Linear Weighting"
    pspl_dbl: Optional[float] = None  # dB(L)
    zc_freq_hz: Optional[float] = None  # Hz
    time_of_peak_s: Optional[float] = None  # seconds
@dataclass
class SensorCheck:
    """Per-channel sensor self-check result.

    Geo channels report a frequency + ratio; MicL reports a frequency +
    amplitude (mV).  All channels also have a Pass/Fail string.  Fields
    that do not apply to a channel simply stay ``None``.
    """

    test_freq_hz: Optional[float] = None
    test_ratio: Optional[float] = None  # geo channels only
    test_amplitude_mv: Optional[float] = None  # MicL only
    test_results: Optional[str] = None  # "Passed" / "Failed"
@dataclass
class MonitorLogEntry:
    """One row of the trailing Monitor Log(s) block.

    Timestamps are naive ``datetime`` values (no tzinfo); either may be
    ``None`` when that column could not be parsed.
    """

    start_time: Optional[datetime.datetime] = None
    stop_time: Optional[datetime.datetime] = None
    description: Optional[str] = None  # third tab-separated column, stripped
@dataclass
class BwAsciiReport:
    """Structured representation of one BW per-event ASCII export.

    Scalar fields default to ``None`` and container fields to empty
    containers, so a report parsed from a partial or malformed export is
    still a fully-formed object with the missing pieces left unset.
    """

    # ── Identity ─────────────────────────────────────────────────────────────
    event_type: Optional[str] = None  # e.g. "Full Waveform"
    serial: Optional[str] = None  # e.g. "BE11529"
    version: Optional[str] = None  # firmware version line
    file_name: Optional[str] = None  # e.g. "M529LK44.AB0"
    event_datetime: Optional[datetime.datetime] = None  # parsed from Event Time + Event Date
    # ── Trigger / recording config ──────────────────────────────────────────
    trigger_channel: Optional[str] = None  # e.g. "Vert" or "From Unit"
    geo_trigger_level_ips: Optional[float] = None
    pretrig_s: Optional[float] = None  # negative seconds
    record_time_s: Optional[float] = None
    record_stop_mode: Optional[str] = None
    sample_rate_sps: Optional[int] = None
    battery_volts: Optional[float] = None
    calibration_date: Optional[datetime.date] = None
    calibration_by: Optional[str] = None  # e.g. "Instantel"
    units: Optional[str] = None  # e.g. "in/s and dB(L)"
    # ── Operator-supplied metadata ──────────────────────────────────────────
    # Parsed by POSITION from the 4-line "User Notes" block BW writes
    # between the `Units :` and `Geo Range :` lines.  Position-based so
    # the values populate correctly even when an operator renames the
    # labels in Blastware's Compliance Setup → Notes tab (the 4 labels
    # are user-editable, e.g. "Seis Loc:" → "Building:" → "Site Address:").
    # The original labels BW wrote are preserved in `user_note_labels`
    # so terra-view can render them as the operator named them.
    project: Optional[str] = None  # position 1 (BW default label "Project:")
    client: Optional[str] = None  # position 2 (BW default label "Client:")
    operator: Optional[str] = None  # position 3 (BW default label "User Name:")
    sensor_location: Optional[str] = None  # position 4 (BW default label "Seis Loc:")
    # Maps canonical slot name → the literal label BW wrote in the ASCII
    # export.  Empty if the User Notes block wasn't present.  Example
    # when the operator renamed slot 4 to "Building:":
    #   {"project": "Project:", "client": "Client:",
    #    "operator": "User Name:", "sensor_location": "Building:"}
    user_note_labels: Dict[str, str] = field(default_factory=dict)
    # ── Geo channel scaling ─────────────────────────────────────────────────
    geo_range_ips: Optional[float] = None  # 10.000 / 1.250
    # ── Per-channel derived stats (geo + mic) ───────────────────────────────
    # Keyed by channel name ("Tran" / "Vert" / "Long" / "MicL").
    channels: Dict[str, ChannelStats] = field(default_factory=dict)
    mic: MicStats = field(default_factory=MicStats)
    # ── Vector sum ──────────────────────────────────────────────────────────
    peak_vector_sum_ips: Optional[float] = None
    peak_vector_sum_time_s: Optional[float] = None
    # ── Sensor self-check (per channel) ─────────────────────────────────────
    # Keyed by channel name, same keys as `channels`.
    sensor_check: Dict[str, SensorCheck] = field(default_factory=dict)
    # ── Monitor log + tooling version ───────────────────────────────────────
    monitor_log: List[MonitorLogEntry] = field(default_factory=list)
    pc_sw_version: Optional[str] = None
    # ── Sample table (optional; only parsed if requested) ───────────────────
    # Each entry: (Tran, Vert, Long, MicL) in the report's units (geo
    # channels in in/s, MicL in dB(L)).  None when parse_samples=False.
    samples: Optional[List[Tuple[float, float, float, float]]] = None
# ─────────────────────────────────────────────────────────────────────────────
# Helpers
# ─────────────────────────────────────────────────────────────────────────────
# Any run of whitespace (spaces, tabs, ...); used to collapse field keys.
_KEY_NORMALISE_RE = re.compile(r"\s+")
# Leading number: optional minus sign, digits, optional fractional part.
# A leading "+", a bare ".5" and exponent notation are not recognised.
_NUMERIC_RE = re.compile(r"^-?\d+(?:\.\d+)?")
def _normalise_key(k: str) -> str:
    """Return *k* with every whitespace run squeezed to one space.

    Leading and trailing whitespace is removed as well, so a key that BW
    pads or double-spaces still compares equal to its canonical spelling.
    """
    collapsed = _KEY_NORMALISE_RE.sub(" ", k)
    return collapsed.strip()
def _strip_quotes(line: str) -> str:
    """Drop the line terminator and one pair of enclosing double quotes.

    Lines that are not wrapped in a matching pair of ``"`` characters are
    returned with only the trailing CR/LF removed.
    """
    body = line.rstrip("\r\n")
    is_quoted = len(body) >= 2 and body[0] == '"' and body[-1] == '"'
    return body[1:-1] if is_quoted else body
def _parse_number(value: str) -> Optional[float]:
    """Extract the number at the start of *value*, ignoring any unit suffix.

    For example ``"0.500 in/s"`` yields ``0.5``.  Returns ``None`` when the
    stripped text does not begin with a number.
    """
    found = _NUMERIC_RE.match(value.strip())
    if found:
        try:
            number = float(found.group(0))
        except ValueError:
            number = None
        return number
    return None
def _parse_int(value: str) -> Optional[int]:
    """Like ``_parse_number`` but rounded to the nearest integer.

    Returns ``None`` when no leading number is present.
    """
    number = _parse_number(value)
    if number is None:
        return None
    return int(round(number))
# Month-name → month-number lookup, spelled exactly as BW writes them.
_MONTHS = {
    # Full names, numbered 1..12 in calendar order.
    **{
        name: number
        for number, name in enumerate(
            (
                "January", "February", "March", "April",
                "May", "June", "July", "August",
                "September", "October", "November", "December",
            ),
            start=1,
        )
    },
    # Short forms used in monitor-log rows ("Apr 23 /26").  "May" needs no
    # separate entry because its short and full spellings are the same.
    "Jan": 1, "Feb": 2, "Mar": 3, "Apr": 4, "Jun": 6, "Jul": 7,
    "Aug": 8, "Sep": 9, "Oct": 10, "Nov": 11, "Dec": 12,
}
def _parse_event_date(s: str) -> Optional[datetime.date]:
    """Turn a long-form date such as "April 23, 2026" into a ``date``.

    Commas are treated as whitespace and only the first three tokens
    (month name, day, year) are used.  Returns ``None`` for an unknown
    month name, too few tokens, or an impossible date.
    """
    tokens = s.strip().replace(",", " ").split()
    if len(tokens) < 3:
        return None
    month = _MONTHS.get(tokens[0])
    if month is None:
        return None
    try:
        return datetime.date(int(tokens[2]), month, int(tokens[1]))
    except ValueError:
        return None
def _parse_event_time(s: str) -> Optional[datetime.time]:
    """Turn an "HH:MM:SS" string such as "15:56:35" into a ``time``.

    Returns ``None`` unless the text is exactly three colon-separated
    integers that form a valid time of day.
    """
    pieces = s.strip().split(":")
    if len(pieces) != 3:
        return None
    try:
        hour, minute, second = (int(piece) for piece in pieces)
        return datetime.time(hour, minute, second)
    except ValueError:
        return None
def _parse_calibration(value: str) -> Tuple[Optional[datetime.date], Optional[str]]:
    """Split a calibration line into its date and the calibrating party.

    "April 29, 2025 by Instantel" → (date(2025, 4, 29), "Instantel").
    The second element is ``None`` when the text has no " by " part; the
    first is ``None`` when the date portion cannot be parsed.
    """
    date_text, separator, who = value.partition(" by ")
    calibrated_on = _parse_event_date(date_text)
    calibrated_by = who.strip() if separator else None
    return calibrated_on, calibrated_by
def _parse_monitor_row(line: str) -> Optional[MonitorLogEntry]:
    """Build a ``MonitorLogEntry`` from one tab-separated monitor-log row.

    Expected layout is ``<start><TAB><stop><TAB><desc>``, where both
    timestamps use BW's short form "Mon DD /YY HH:MM:SS" (for example
    "Apr 23 /26 15:46:16"; the "/26" suffix means 2026).

    Returns ``None`` when the row has fewer than two columns, or when
    neither timestamp parses and there is no description.
    """
    columns = line.split("\t")
    if len(columns) < 2:
        return None

    started = _parse_monitor_ts(columns[0])
    stopped = _parse_monitor_ts(columns[1])
    note = columns[2].strip() if len(columns) > 2 else None

    nothing_usable = started is None and stopped is None and not note
    if nothing_usable:
        return None
    return MonitorLogEntry(start_time=started, stop_time=stopped, description=note)
def _parse_monitor_ts(s: str) -> Optional[datetime.datetime]:
    """Turn a short monitor-log stamp ("Apr 23 /26 15:46:16") into a datetime.

    The two-digit year pivots at 80: values below 80 map to 20xx, the rest
    to 19xx.  Returns ``None`` when there are fewer than four tokens, the
    month name is unknown, or any numeric part is invalid.
    """
    tokens = s.strip().split()
    if len(tokens) < 4:
        return None
    month_token, day_token, year_token, clock_token = tokens[:4]

    month = _MONTHS.get(month_token)
    if month is None:
        return None

    try:
        day = int(day_token)
        # The year token looks like "/26"; drop the slash, then pick a century.
        short_year = int(year_token.lstrip("/"))
        century = 2000 if short_year < 80 else 1900
        hour, minute, second = (int(part) for part in clock_token.split(":"))
        return datetime.datetime(century + short_year, month, day, hour, minute, second)
    except (ValueError, IndexError):
        return None
# ── User-notes positional slot map ──────────────────────────────────────────
#
# Blastware's Compliance Setup → Notes tab shows four operator-supplied
# fields whose LABELS the operator can rename (see screenshot in
# project archive).  Defaults are "Project:" / "Client:" /
# "User Name:" / "Seis Loc:", but an operator using a different
# convention can rename them to anything ("Building:", "Site:",
# "Address:", etc.).  The ASCII export reflects whatever the operator
# typed, so label-based matching is fragile.
#
# What IS reliable: BW always writes the 4 user-notes lines in the
# same order, contiguously between the `Units :` line and the
# `Geo Range :` line.  We parse them by POSITION and preserve the
# operator's labels in `report.user_note_labels` so terra-view can
# render them as the operator intended.
#
# Tuple order IS the position → attribute mapping: entry N names the
# `BwAsciiReport` attribute that receives the Nth note line, so each
# entry must match an attribute name on that dataclass.
_USER_NOTE_SLOTS = ("project", "client", "operator", "sensor_location")
# ─────────────────────────────────────────────────────────────────────────────
# Top-level parser
# ─────────────────────────────────────────────────────────────────────────────
def _decode_report_bytes(raw: bytes) -> str:
    """Decode raw export bytes without mangling operator-typed text."""
    try:
        # Pure-ASCII exports decode exactly as before.  The "-sig" variant
        # also drops a UTF-8 byte-order mark, which would otherwise sit in
        # front of the first quote and cost us the first field.
        return raw.decode("utf-8-sig")
    except UnicodeDecodeError:
        # NOTE(review): assumes non-UTF-8 exports use the Western Windows
        # code page — confirm against a BW export with accented notes.
        # errors="replace" keeps this a never-raises best effort.
        return raw.decode("cp1252", errors="replace")


def _split_field(line: str) -> Optional[Tuple[str, str]]:
    """Split an unquoted metadata line into ``(key, value)``.

    The separator is the FIRST `` : `` in the line, because labels such as
    "Project:" carry a colon of their own.  A line that ends in `` :``
    (an empty value whose trailing space was trimmed) is accepted with an
    empty value, so it still counts as a field.  Returns ``None`` when the
    line has no separator at all.
    """
    idx = line.find(" : ")
    if idx >= 0:
        return _normalise_key(line[:idx]), line[idx + 3:].strip()
    trimmed = line.rstrip()
    if trimmed.endswith(" :"):
        return _normalise_key(trimmed[:-2]), ""
    return None


def parse_report(text: Union[str, bytes], *, parse_samples: bool = False) -> BwAsciiReport:
    """Parse a BW per-event ASCII export into a structured BwAsciiReport.

    Args:
        text: Full contents of the export, as ``str`` or raw ``bytes``.
            Bytes are decoded as UTF-8 (BOM tolerated), falling back to
            cp1252, so non-ASCII characters in operator notes survive.
        parse_samples: When true, also populate ``report.samples`` with
            the trailing sample table.  Default False because the table
            is huge and most callers only want metadata for indexing.

    Returns:
        A ``BwAsciiReport``.  Fields whose lines are missing or cannot be
        parsed keep their defaults; unknown keys are silently ignored.
    """
    if isinstance(text, bytes):
        text = _decode_report_bytes(text)

    report = BwAsciiReport()
    # Pre-create channel stat slots so callers can rely on them existing.
    for ch in ("Tran", "Vert", "Long", "MicL"):
        report.channels.setdefault(ch, ChannelStats())
        report.sensor_check.setdefault(ch, SensorCheck())

    lines = text.splitlines()
    i = 0
    n = len(lines)
    in_monitor_log_section = False
    event_time_str: Optional[str] = None
    event_date: Optional[datetime.date] = None

    # User-notes block detection.  We enter the block after parsing the
    # "Units :" line and exit on the "Geo Range :" line.  Inside, the
    # first 4 `<label> : <value>` lines that reach the positional branch
    # are assigned to the 4 canonical operator-supplied slots by POSITION
    # (project, client, operator, sensor_location) regardless of what the
    # operator named the labels in BW's Compliance Setup → Notes tab.
    in_user_notes_block = False
    user_note_position = 0

    while i < n:
        raw_line = lines[i]
        i += 1

        # Blank line marks the start of the sample table.  `i` already
        # points past it, which is where the sample parser starts.
        if raw_line.strip() == "":
            break
        line = _strip_quotes(raw_line)

        # Monitor log section: "Monitor Log(s)" header followed by N rows
        # (still inside double-quoted lines), terminated by a non-row line
        # like "PC SW Version : ..." or a blank line.
        if not in_monitor_log_section and line.strip() == "Monitor Log(s)":
            in_monitor_log_section = True
            continue
        if in_monitor_log_section:
            # Heuristic: monitor rows contain a tab; the next
            # "Field : Value" line ends the section.
            if "\t" in line:
                entry = _parse_monitor_row(line)
                if entry is not None:
                    report.monitor_log.append(entry)
                continue
            # Falls through to the field parser below; clear the flag.
            in_monitor_log_section = False

        # "Field : Value".  An empty value with its trailing space trimmed
        # still counts as a field: dropping it would shift every later
        # user note into the wrong positional slot.
        split = _split_field(line)
        if split is None:
            continue
        key, value = split

        # ── Identity / config ────────────────────────────────────────────
        if key == "Event Type":
            report.event_type = value
        elif key == "Serial Number":
            report.serial = value
        elif key == "Version":
            report.version = value
        elif key == "File Name":
            report.file_name = value
        elif key == "Event Time":
            event_time_str = value
        elif key == "Event Date":
            event_date = _parse_event_date(value)
        elif key == "Trigger":
            report.trigger_channel = value
        elif key == "Geo Trigger Level":
            report.geo_trigger_level_ips = _parse_number(value)
        elif key == "Pre-trigger Length":
            report.pretrig_s = _parse_number(value)
        elif key == "Record Time":
            report.record_time_s = _parse_number(value)
        elif key == "Record Stop Mode":
            report.record_stop_mode = value
        elif key == "Sample Rate":
            report.sample_rate_sps = _parse_int(value)
        elif key == "Battery Level":
            report.battery_volts = _parse_number(value)
        elif key == "Calibration":
            report.calibration_date, report.calibration_by = _parse_calibration(value)
        elif key == "Units":
            report.units = value
            # Entering the user-notes block.  Next ~4 lines until
            # "Geo Range :" are the operator-supplied notes.
            in_user_notes_block = True
            user_note_position = 0
        elif key == "Geo Range":
            # Exiting the user-notes block.
            in_user_notes_block = False
            report.geo_range_ips = _parse_number(value)
        # User-notes block: assign by position (operator may have renamed
        # the labels, so we don't trust them).  Preserve the original
        # labels in `user_note_labels` for downstream UIs (terra-view)
        # that want to display them as the operator named them.  Lines
        # beyond the fourth fall through to the branches below.
        elif in_user_notes_block and user_note_position < len(_USER_NOTE_SLOTS):
            slot = _USER_NOTE_SLOTS[user_note_position]
            setattr(report, slot, value)
            report.user_note_labels[slot] = key
            user_note_position += 1
        # ── Per-channel stats ────────────────────────────────────────────
        # All match the pattern "{Channel} <stat-name>"
        elif key in (
            "Tran PPV", "Vert PPV", "Long PPV",
            "Tran ZC Freq", "Vert ZC Freq", "Long ZC Freq",
            "Tran Time of Peak", "Vert Time of Peak", "Long Time of Peak",
            "Tran Peak Acceleration", "Vert Peak Acceleration", "Long Peak Acceleration",
            "Tran Peak Displacement", "Vert Peak Displacement", "Long Peak Displacement",
        ):
            ch_name, stat = key.split(" ", 1)
            cs = report.channels.setdefault(ch_name, ChannelStats())
            num = _parse_number(value)
            if stat == "PPV":
                cs.ppv_ips = num
            elif stat == "ZC Freq":
                cs.zc_freq_hz = num
            elif stat == "Time of Peak":
                cs.time_of_peak_s = num
            elif stat == "Peak Acceleration":
                cs.peak_accel_g = num
            elif stat == "Peak Displacement":
                cs.peak_disp_in = num
        # ── Vector Sum ───────────────────────────────────────────────────
        elif key == "Peak Vector Sum":
            report.peak_vector_sum_ips = _parse_number(value)
        elif key == "Peak Vector Sum Time":
            report.peak_vector_sum_time_s = _parse_number(value)
        # ── Microphone block ─────────────────────────────────────────────
        elif key == "Microphone":
            report.mic.weighting = value
        elif key == "MicL PSPL":
            # Stored on MicStats only.  It is deliberately NOT mirrored
            # onto channels["MicL"].ppv_ips: the value is dB(L), not in/s.
            report.mic.pspl_dbl = _parse_number(value)
        elif key == "MicL Time of Peak":
            report.mic.time_of_peak_s = _parse_number(value)
            # Mirror onto the "MicL" channel entry so per-channel queries
            # see the same value.
            cs = report.channels.setdefault("MicL", ChannelStats())
            cs.time_of_peak_s = report.mic.time_of_peak_s
        elif key == "MicL ZC Freq":
            report.mic.zc_freq_hz = _parse_number(value)
            cs = report.channels.setdefault("MicL", ChannelStats())
            cs.zc_freq_hz = report.mic.zc_freq_hz
        # ── Sensor self-check ────────────────────────────────────────────
        elif key in (
            "Tran Test Freq", "Vert Test Freq", "Long Test Freq", "MicL Test Freq",
            "Tran Test Ratio", "Vert Test Ratio", "Long Test Ratio",
            "MicL Test Amplitude",
            "Tran Test Results", "Vert Test Results", "Long Test Results", "MicL Test Results",
        ):
            ch_name, stat = key.split(" ", 1)
            sc = report.sensor_check.setdefault(ch_name, SensorCheck())
            if stat == "Test Freq":
                sc.test_freq_hz = _parse_number(value)
            elif stat == "Test Ratio":
                sc.test_ratio = _parse_number(value)
            elif stat == "Test Amplitude":
                sc.test_amplitude_mv = _parse_number(value)
            elif stat == "Test Results":
                sc.test_results = value
        # ── Trailer ──────────────────────────────────────────────────────
        elif key == "PC SW Version":
            report.pc_sw_version = value
        # Unknown keys are silently dropped — forward-compat for future
        # BW versions that may add fields.

    # Combine event date + time into a datetime
    if event_date is not None and event_time_str is not None:
        t = _parse_event_time(event_time_str)
        if t is not None:
            report.event_datetime = datetime.datetime.combine(event_date, t)

    if parse_samples:
        report.samples = _parse_sample_table(lines, i)
    return report
def _parse_sample_table(
    lines: List[str], start: int,
) -> List[Tuple[float, float, float, float]]:
    """Read the sample table that follows the metadata block.

    Scanning begins at ``lines[start]`` and runs to the end of the list.
    The first row that names a channel (Tran / Vert / Long / MicL) is
    taken as the header and skipped.  Every other row contributes its
    first four non-empty tab-separated cells as floats; rows with fewer
    than four cells, or with a non-numeric cell among those four, are
    skipped.
    """
    channel_names = ("Tran", "Vert", "Long", "MicL")
    rows: List[Tuple[float, float, float, float]] = []
    header_found = False

    for raw in lines[start:]:
        text = raw.rstrip("\r\n")
        if not text.strip():
            continue
        cells = [cell.strip() for cell in text.split("\t")]
        cells = [cell for cell in cells if cell]

        # Header row contains channel names; numeric rows don't.
        if not header_found and any(cell in channel_names for cell in cells):
            header_found = True
            continue

        if len(cells) < 4:
            continue
        try:
            tran, vert, long_, mic = (float(cell) for cell in cells[:4])
        except ValueError:
            continue
        rows.append((tran, vert, long_, mic))

    return rows
def parse_report_file(
    path: Union[str, Path], *, parse_samples: bool = False,
) -> BwAsciiReport:
    """Load a BW ``.TXT`` export from *path* and parse it.

    The file is read as raw bytes and handed to ``parse_report``, with
    ``parse_samples`` passed through unchanged.
    """
    raw = Path(path).read_bytes()
    return parse_report(raw, parse_samples=parse_samples)