3457ed0072
BW writes "OORANGE" (truncation of "Out Of Range") when a channel exceeds its full-scale, and uses a typo'd label "Peak Vector Sum TimeSum" for the PVS time field. Both confirmed against real ASCII files pulled from a Windows watcher PC 2026-05-27: T190LD5Q.LK0W Vert PPV = OORANGE (Normal range, 10 in/s exceeded) T438L713.RY0W All three PPVs OORANGE (Sensitive range, 1.25 in/s) K557L3YM.OE0W Tran+Vert PPV OORANGE + MicL PSPL OORANGE Previously our _parse_number() returned None for OORANGE → DB columns ended up NULL → events vanished from filters / sorts / dashboards despite being legitimate high-amplitude events. New behavior — substitute a conservative bound + set a saturation flag: - Channel PPV → geo_range_ips + ChannelStats.ppv_saturated - Peak Vector Sum → sqrt(3) * geo_range_ips + peak_vector_sum_saturated - MicL PSPL → 140 dB(L) + MicStats.pspl_saturated Flags propagate to the sidecar's bw_report block so the SFM UI can render "> 10 in/s" / "> 140 dBL" rather than treating the substituted value as exact. Same commit also accepts "Peak Vector Sum TimeSum" as an alias for "Peak Vector Sum Time" (BW always writes the typo on OORANGE PVS lines — every example file confirms it). Tests: new test_oorange_marker_treated_as_saturation (synthetic) + test_real_oorange_event_t190_parses (skips if real fixture absent). 177/177 tests pass; 16 pre-existing missing-fixture skips unchanged. Five events on prod (T190, T438, K557, plus 2 others matching the same fault pattern) will pick up correct peaks + saturation flags once watchers re-forward. Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
704 lines
33 KiB
Python
704 lines
33 KiB
Python
"""
|
|
minimateplus/bw_ascii_report.py — parser for Blastware's per-event ASCII
|
|
report (the .TXT file BW writes alongside each saved event binary).
|
|
|
|
The ASCII export is the authoritative source for every "rich" per-event
|
|
field that BW computes from the waveform but never persists in the BW
|
|
binary itself:
|
|
|
|
- Per-channel PPV (Tran / Vert / Long / MicL)
|
|
- Peak Vector Sum + Peak Vector Sum Time
|
|
- Per-channel ZC Freq, Time of Peak, Peak Acceleration, Peak Displacement
|
|
- MicL PSPL, MicL Time of Peak, MicL ZC Freq
|
|
- Per-channel Sensor Self-Check (Test Freq / Test Ratio / Test Results)
|
|
- MicL Test Amplitude (mV)
|
|
- Battery, calibration date, monitor-log timestamps
|
|
|
|
Persisting these values into the SFM database lets the monthly-summary
|
|
review workflow ("show me events at Location X with PVS > 0.5") work
|
|
without depending on the (still-undecoded) waveform body codec.
|
|
|
|
Format (verified against decode-re/5-8-26 4-event bundle):
|
|
|
|
- One field per line, wrapped in double quotes: `"Field Name : Value"`
|
|
- Field/value separator: literal ` : ` (space-colon-space).
|
|
- Some field names contain an internal `:` already (e.g. `"Project:"`),
|
|
so we split on the FIRST ` : ` only.
|
|
- Some fields have unit suffixes: `"0.500 in/s"` / `"7.5 Hz"` / `"533 mv"`.
|
|
- A `"Monitor Log(s)"` marker line is followed by tab-separated rows
|
|
of `start_time<TAB>stop_time<TAB>description`.
|
|
- Final `"PC SW Version : ..."` line ends the metadata block.
|
|
- A blank line separates metadata from the sample table.
|
|
- Sample table starts with ` Tran <TAB> Vert <TAB>...`, then
|
|
one row per sample (tab-separated, right-padded numeric values).
|
|
- Geo channel values are in in/s; MicL in dB(L) (or 0.000 below threshold).
|
|
|
|
Because some metadata fields have whitespace quirks ("MicL Time of
|
|
Peak" has two spaces; the leading "Project:" value has its own colon),
|
|
we normalise whitespace in the key before lookup.
|
|
"""
|
|
|
|
from __future__ import annotations
|
|
|
|
import datetime
|
|
import re
|
|
from dataclasses import dataclass, field
|
|
from pathlib import Path
|
|
from typing import Dict, List, Optional, Tuple, Union
|
|
|
|
|
|
# ─────────────────────────────────────────────────────────────────────────────
|
|
# Output dataclasses
|
|
# ─────────────────────────────────────────────────────────────────────────────
|
|
|
|
|
|
@dataclass
|
|
class ChannelStats:
|
|
"""Per-channel derived stats, populated from an event report."""
|
|
ppv_ips: Optional[float] = None # in/s (geo channels only)
|
|
zc_freq_hz: Optional[float] = None # Hz
|
|
time_of_peak_s: Optional[float] = None # seconds (relative to trigger; can be negative)
|
|
peak_accel_g: Optional[float] = None # g (geo channels only)
|
|
peak_disp_in: Optional[float] = None # in (geo channels only)
|
|
# When BW writes "OORANGE" (Out Of Range — truncated) for a PPV
|
|
# value, the true peak exceeded the channel's full-scale range.
|
|
# We substitute the range max (e.g. 10.000 in/s for Normal range)
|
|
# as a lower bound, and flag here so downstream UI / alerts know
|
|
# to render "> 10 in/s" or "saturated" instead of trusting the
|
|
# value as an exact measurement.
|
|
ppv_saturated: bool = False
|
|
|
|
|
|
@dataclass
|
|
class MicStats:
|
|
"""MicL-specific stats."""
|
|
weighting: Optional[str] = None # e.g. "Linear Weighting"
|
|
pspl_dbl: Optional[float] = None # dB(L)
|
|
zc_freq_hz: Optional[float] = None
|
|
time_of_peak_s: Optional[float] = None
|
|
# Set when BW writes "OORANGE" for PSPL — mic exceeded its
|
|
# measurement range. pspl_dbl gets the conservative upper bound
|
|
# 140 dBL (typical NL-43 max; some units cap at 148). Consumers
|
|
# should render "> 140 dB(L)" or similar when this flag is set.
|
|
pspl_saturated: bool = False
|
|
|
|
|
|
@dataclass
|
|
class SensorCheck:
|
|
"""Per-channel sensor self-check result.
|
|
|
|
Geo channels report a frequency + ratio; MicL reports a frequency +
|
|
amplitude (mV). All channels also have a Pass/Fail string.
|
|
"""
|
|
test_freq_hz: Optional[float] = None
|
|
test_ratio: Optional[float] = None # geo channels only
|
|
test_amplitude_mv: Optional[float] = None # MicL only
|
|
test_results: Optional[str] = None # "Passed" / "Failed"
|
|
|
|
|
|
@dataclass
|
|
class MonitorLogEntry:
|
|
"""One row of the trailing Monitor Log(s) block."""
|
|
start_time: Optional[datetime.datetime] = None
|
|
stop_time: Optional[datetime.datetime] = None
|
|
description: Optional[str] = None
|
|
|
|
|
|
# BW saturation marker — appears in PPV / Peak Vector Sum / similar
|
|
# numeric fields when the underlying measurement exceeded the
|
|
# channel's full-scale range (e.g., a geophone reading > 10 in/s at
|
|
# Normal range, or a mic exceeding its sensitivity ceiling). Treated
|
|
# as "≥ range_max" + a saturated flag rather than discarded.
|
|
# Appears as: ``"Tran PPV : OORANGE in/s"``
|
|
_OORANGE_MARKERS = ("OORANGE", "OUT OF RANGE")
|
|
|
|
|
|
def _is_oorange(value: str) -> bool:
|
|
"""True when a BW numeric field is an Out-Of-Range saturation marker."""
|
|
s = value.strip().upper()
|
|
return any(m in s for m in _OORANGE_MARKERS)
|
|
|
|
|
|
@dataclass
|
|
class BwAsciiReport:
|
|
"""Structured representation of one BW per-event ASCII export."""
|
|
# ── Identity ─────────────────────────────────────────────────────────────
|
|
event_type: Optional[str] = None # e.g. "Full Waveform"
|
|
serial: Optional[str] = None # e.g. "BE11529"
|
|
version: Optional[str] = None # firmware version line
|
|
file_name: Optional[str] = None # e.g. "M529LK44.AB0"
|
|
event_datetime: Optional[datetime.datetime] = None # parsed from Event Time + Event Date
|
|
|
|
# ── Trigger / recording config ──────────────────────────────────────────
|
|
trigger_channel: Optional[str] = None # e.g. "Vert" or "From Unit"
|
|
geo_trigger_level_ips: Optional[float] = None
|
|
pretrig_s: Optional[float] = None # negative seconds
|
|
record_time_s: Optional[float] = None
|
|
record_stop_mode: Optional[str] = None
|
|
sample_rate_sps: Optional[int] = None
|
|
battery_volts: Optional[float] = None
|
|
calibration_date: Optional[datetime.date] = None
|
|
calibration_by: Optional[str] = None # e.g. "Instantel"
|
|
units: Optional[str] = None # e.g. "in/s and dB(L)"
|
|
|
|
# ── Operator-supplied metadata ──────────────────────────────────────────
|
|
# Parsed by POSITION from the 4-line "User Notes" block BW writes
|
|
# between the `Units :` and `Geo Range :` lines. Position-based so
|
|
# the values populate correctly even when an operator renames the
|
|
# labels in Blastware's Compliance Setup → Notes tab (the 4 labels
|
|
# are user-editable, e.g. "Seis Loc:" → "Building:" → "Site Address:").
|
|
# The original labels BW wrote are preserved in `user_note_labels`
|
|
# so terra-view can render them as the operator named them.
|
|
project: Optional[str] = None # position 1 (BW default label "Project:")
|
|
client: Optional[str] = None # position 2 (BW default label "Client:")
|
|
operator: Optional[str] = None # position 3 (BW default label "User Name:")
|
|
sensor_location: Optional[str] = None # position 4 (BW default label "Seis Loc:")
|
|
|
|
# Maps canonical slot name → the literal label BW wrote in the ASCII
|
|
# export. Empty if the User Notes block wasn't present. Example
|
|
# when the operator renamed slot 4 to "Building:":
|
|
# {"project": "Project:", "client": "Client:",
|
|
# "operator": "User Name:", "sensor_location": "Building:"}
|
|
user_note_labels: Dict[str, str] = field(default_factory=dict)
|
|
|
|
# ── Geo channel scaling ─────────────────────────────────────────────────
|
|
geo_range_ips: Optional[float] = None # 10.000 / 1.250
|
|
|
|
# ── Per-channel derived stats (geo + mic) ───────────────────────────────
|
|
channels: Dict[str, ChannelStats] = field(default_factory=dict)
|
|
mic: MicStats = field(default_factory=MicStats)
|
|
|
|
# ── Vector sum ──────────────────────────────────────────────────────────
|
|
peak_vector_sum_ips: Optional[float] = None
|
|
peak_vector_sum_time_s: Optional[float] = None
|
|
# Saturation flag — set when BW writes "OORANGE" for the PVS. We
|
|
# then substitute sqrt(3) * geo_range_ips as a conservative upper
|
|
# bound (the theoretical maximum PVS when all 3 geo channels are
|
|
# simultaneously at full-scale). Consumers should display this as
|
|
# ">{value} in/s" or similar.
|
|
peak_vector_sum_saturated: bool = False
|
|
# Histograms additionally have an absolute date+time for the PVS
|
|
# (it occurred at a specific interval). Waveform reports show
|
|
# only the relative-time value above.
|
|
peak_vector_sum_when: Optional[datetime.datetime] = None
|
|
|
|
# ── Histogram-specific fields (populated only when Event Type starts
|
|
# with 'Histogram' / 'Full Histogram' / 'Histogram + Continuous') ──
|
|
histogram_start: Optional[datetime.datetime] = None
|
|
histogram_stop: Optional[datetime.datetime] = None
|
|
histogram_n_intervals: Optional[int] = None # e.g. 4, 1436
|
|
histogram_interval_size_str: Optional[str] = None # "1 minute" / "5 minutes" / "15 seconds"
|
|
histogram_interval_size_s: Optional[float] = None # parsed to seconds
|
|
# Per-channel absolute peak time+date (histogram-specific). For
|
|
# waveform events these are None — those reports use the channel's
|
|
# time_of_peak_s (relative to trigger) instead. Keyed by channel
|
|
# name ("Tran", "Vert", "Long", "MicL").
|
|
channel_peak_when: Dict[str, datetime.datetime] = field(default_factory=dict)
|
|
|
|
# ── Sensor self-check (per channel) ─────────────────────────────────────
|
|
sensor_check: Dict[str, SensorCheck] = field(default_factory=dict)
|
|
|
|
# ── Monitor log + tooling version ───────────────────────────────────────
|
|
monitor_log: List[MonitorLogEntry] = field(default_factory=list)
|
|
pc_sw_version: Optional[str] = None
|
|
|
|
# ── Sample table (optional; only parsed if requested) ───────────────────
|
|
# Each entry: (Tran, Vert, Long, MicL) in the report's units (geo
|
|
# channels in in/s, MicL in dB(L)). None when parse_samples=False.
|
|
samples: Optional[List[Tuple[float, float, float, float]]] = None
|
|
|
|
|
|
# ─────────────────────────────────────────────────────────────────────────────
|
|
# Helpers
|
|
# ─────────────────────────────────────────────────────────────────────────────
|
|
|
|
|
|
_KEY_NORMALISE_RE = re.compile(r"\s+")
|
|
_NUMERIC_RE = re.compile(r"^-?\d+(?:\.\d+)?")
|
|
|
|
|
|
def _normalise_key(k: str) -> str:
|
|
"""Collapse whitespace runs (incl. tabs) and strip — handles BW's
|
|
"MicL Time of Peak" double-space and leading-colon quirks."""
|
|
return _KEY_NORMALISE_RE.sub(" ", k).strip()
|
|
|
|
|
|
def _strip_quotes(line: str) -> str:
|
|
line = line.rstrip("\r\n")
|
|
if len(line) >= 2 and line.startswith('"') and line.endswith('"'):
|
|
return line[1:-1]
|
|
return line
|
|
|
|
|
|
def _parse_number(value: str) -> Optional[float]:
|
|
"""Pull the leading numeric portion out of a value like "0.500 in/s"."""
|
|
m = _NUMERIC_RE.match(value.strip())
|
|
if not m:
|
|
return None
|
|
try:
|
|
return float(m.group(0))
|
|
except ValueError:
|
|
return None
|
|
|
|
|
|
def _parse_int(value: str) -> Optional[int]:
|
|
n = _parse_number(value)
|
|
return None if n is None else int(round(n))
|
|
|
|
|
|
# Months exactly as BW writes them.
|
|
_MONTHS = {
|
|
"January": 1, "February": 2, "March": 3, "April": 4,
|
|
"May": 5, "June": 6, "July": 7, "August": 8,
|
|
"September": 9, "October": 10, "November": 11, "December": 12,
|
|
# Short forms used in monitor-log rows ("Apr 23 /26").
|
|
"Jan": 1, "Feb": 2, "Mar": 3, "Apr": 4, "Jun": 6, "Jul": 7,
|
|
"Aug": 8, "Sep": 9, "Oct": 10, "Nov": 11, "Dec": 12,
|
|
}
|
|
|
|
|
|
def _parse_event_date(s: str) -> Optional[datetime.date]:
|
|
"""Parse "April 23, 2026" or "May 8, 2026" → date."""
|
|
s = s.strip()
|
|
parts = s.replace(",", " ").split()
|
|
if len(parts) < 3:
|
|
return None
|
|
month_name, day_str, year_str = parts[0], parts[1], parts[2]
|
|
month = _MONTHS.get(month_name)
|
|
if month is None:
|
|
return None
|
|
try:
|
|
return datetime.date(int(year_str), month, int(day_str))
|
|
except ValueError:
|
|
return None
|
|
|
|
|
|
def _parse_iso_date(s: str) -> Optional[datetime.date]:
|
|
"""Parse "2026-05-16" → date. Histograms use ISO format for their
|
|
Start Date / Stop Date / Peak Date fields; waveforms use the
|
|
"May 8, 2026" long form which `_parse_event_date` handles."""
|
|
s = s.strip()
|
|
try:
|
|
return datetime.date.fromisoformat(s)
|
|
except ValueError:
|
|
return None
|
|
|
|
|
|
_INTERVAL_UNIT_SECONDS = {
|
|
"second": 1, "seconds": 1, "sec": 1, "secs": 1,
|
|
"minute": 60, "minutes": 60, "min": 60, "mins": 60,
|
|
"hour": 3600, "hours": 3600, "hr": 3600, "hrs": 3600,
|
|
}
|
|
|
|
|
|
def _parse_interval_size(s: str) -> Optional[float]:
|
|
"""Parse "1 minute" / "5 minutes" / "15 seconds" / "2 seconds" → seconds.
|
|
|
|
Handles the BW Compliance Setup → Histogram Interval values verbatim
|
|
("2 seconds", "5 seconds", "15 seconds", "1 minute", "5 minutes",
|
|
"15 minutes") plus a few defensive variants.
|
|
"""
|
|
if not s:
|
|
return None
|
|
parts = s.strip().split()
|
|
if len(parts) < 2:
|
|
return None
|
|
try:
|
|
n = float(parts[0])
|
|
except ValueError:
|
|
return None
|
|
unit_per_s = _INTERVAL_UNIT_SECONDS.get(parts[1].lower())
|
|
if unit_per_s is None:
|
|
return None
|
|
return n * unit_per_s
|
|
|
|
|
|
def _parse_event_time(s: str) -> Optional[datetime.time]:
|
|
"""Parse "15:56:35" → time."""
|
|
s = s.strip()
|
|
try:
|
|
h, m, sec = s.split(":")
|
|
return datetime.time(int(h), int(m), int(sec))
|
|
except (ValueError, IndexError):
|
|
return None
|
|
|
|
|
|
def _parse_calibration(value: str) -> Tuple[Optional[datetime.date], Optional[str]]:
|
|
"""Parse "April 29, 2025 by Instantel" → (date, "Instantel")."""
|
|
parts = value.split(" by ", 1)
|
|
date = _parse_event_date(parts[0])
|
|
by = parts[1].strip() if len(parts) > 1 else None
|
|
return date, by
|
|
|
|
|
|
def _parse_monitor_row(line: str) -> Optional[MonitorLogEntry]:
|
|
"""Parse a tab-separated monitor log row.
|
|
|
|
Format: `<start>\t<stop>\t<desc>` where each timestamp is BW's
|
|
short form "Mon DD /YY HH:MM:SS" (e.g. "Apr 23 /26 15:46:16").
|
|
Year is encoded as a 2-digit suffix; we expand "/26" → 2026.
|
|
"""
|
|
parts = line.split("\t")
|
|
if len(parts) < 2:
|
|
return None
|
|
start = _parse_monitor_ts(parts[0])
|
|
stop = _parse_monitor_ts(parts[1])
|
|
desc = parts[2].strip() if len(parts) > 2 else None
|
|
if start is None and stop is None and not desc:
|
|
return None
|
|
return MonitorLogEntry(start_time=start, stop_time=stop, description=desc)
|
|
|
|
|
|
def _parse_monitor_ts(s: str) -> Optional[datetime.datetime]:
|
|
"""Parse "Apr 23 /26 15:46:16" → datetime."""
|
|
s = s.strip()
|
|
parts = s.split()
|
|
if len(parts) < 4:
|
|
return None
|
|
month = _MONTHS.get(parts[0])
|
|
if month is None:
|
|
return None
|
|
try:
|
|
day = int(parts[1])
|
|
# parts[2] looks like "/26" → century-flip to 2026
|
|
yy = int(parts[2].lstrip("/"))
|
|
year = 2000 + yy if yy < 80 else 1900 + yy
|
|
h, m, sec = (int(x) for x in parts[3].split(":"))
|
|
return datetime.datetime(year, month, day, h, m, sec)
|
|
except (ValueError, IndexError):
|
|
return None
|
|
|
|
|
|
# ── User-notes positional slot map ──────────────────────────────────────────
|
|
#
|
|
# Blastware's Compliance Setup → Notes tab shows four operator-supplied
|
|
# fields whose LABELS the operator can rename (see screenshot in
|
|
# project archive). Defaults are "Project:" / "Client:" /
|
|
# "User Name:" / "Seis Loc:", but an operator using a different
|
|
# convention can rename them to anything ("Building:", "Site:",
|
|
# "Address:", etc.). The ASCII export reflects whatever the operator
|
|
# typed, so label-based matching is fragile.
|
|
#
|
|
# What IS reliable: BW always writes the 4 user-notes lines in the
|
|
# same order, contiguously between the `Units :` line and the
|
|
# `Geo Range :` line. We parse them by POSITION and preserve the
|
|
# operator's labels in `report.user_note_labels` so terra-view can
|
|
# render them as the operator intended.
|
|
|
|
_USER_NOTE_SLOTS = ("project", "client", "operator", "sensor_location")
|
|
|
|
|
|
# ─────────────────────────────────────────────────────────────────────────────
|
|
# Top-level parser
|
|
# ─────────────────────────────────────────────────────────────────────────────
|
|
|
|
|
|
def parse_report(text: Union[str, bytes], *, parse_samples: bool = False) -> BwAsciiReport:
|
|
"""Parse a BW per-event ASCII export into a structured BwAsciiReport.
|
|
|
|
Set ``parse_samples=True`` to also populate ``report.samples`` with
|
|
the trailing sample table. Default False because the table is
|
|
huge and most callers only want metadata for indexing.
|
|
"""
|
|
if isinstance(text, bytes):
|
|
text = text.decode("ascii", errors="replace")
|
|
|
|
report = BwAsciiReport()
|
|
# Pre-create channel stat slots so callers can rely on them existing.
|
|
for ch in ("Tran", "Vert", "Long", "MicL"):
|
|
report.channels.setdefault(ch, ChannelStats())
|
|
report.sensor_check.setdefault(ch, SensorCheck())
|
|
|
|
lines = text.splitlines()
|
|
i = 0
|
|
n = len(lines)
|
|
|
|
in_monitor_log_section = False
|
|
event_time_str: Optional[str] = None
|
|
event_date: Optional[datetime.date] = None
|
|
|
|
# User-notes block detection. We enter the block after parsing
|
|
# the "Units :" line and exit on the "Geo Range :" line. Inside,
|
|
# the first 4 unmatched `<label> : <value>` lines are assigned to
|
|
# the 4 canonical operator-supplied slots by POSITION (project,
|
|
# client, operator, sensor_location) regardless of what the
|
|
# operator named the labels in BW's Compliance Setup → Notes tab.
|
|
in_user_notes_block = False
|
|
user_note_position = 0
|
|
|
|
# Histogram-field staging — BW writes <Channel> Peak Time and
|
|
# <Channel> Peak Date on separate lines (and similarly Histogram
|
|
# Start Time / Date). We stash the partial value when the time
|
|
# line arrives and combine it when the matching date line arrives.
|
|
_hist_start_time: Optional[datetime.time] = None
|
|
_hist_stop_time: Optional[datetime.time] = None
|
|
_pending_peak_time: Dict[str, Optional[datetime.time]] = {}
|
|
_pvs_time_raw: Optional[str] = None # last Peak Vector Sum Time value, raw
|
|
|
|
while i < n:
|
|
raw_line = lines[i]
|
|
i += 1
|
|
# Blank line marks the start of the sample table.
|
|
if raw_line.strip() == "":
|
|
break
|
|
|
|
line = _strip_quotes(raw_line)
|
|
|
|
# Monitor log section: "Monitor Log(s)" header followed by N rows
|
|
# (still inside double-quoted lines), terminated by a non-row line
|
|
# like "PC SW Version : ..." or a blank line.
|
|
if not in_monitor_log_section and line.strip() == "Monitor Log(s)":
|
|
in_monitor_log_section = True
|
|
continue
|
|
if in_monitor_log_section:
|
|
# Heuristic: monitor rows contain a tab; the next "Field : Value"
|
|
# line ends the section.
|
|
if "\t" in line:
|
|
entry = _parse_monitor_row(line)
|
|
if entry:
|
|
report.monitor_log.append(entry)
|
|
continue
|
|
# Falls through to the field parser below; clear the flag.
|
|
in_monitor_log_section = False
|
|
|
|
# "Field : Value" — split on FIRST occurrence of " : "
|
|
idx = line.find(" : ")
|
|
if idx < 0:
|
|
continue
|
|
key = _normalise_key(line[:idx])
|
|
value = line[idx + 3 :].strip()
|
|
|
|
# ── Identity / config ────────────────────────────────────────────────
|
|
if key == "Event Type": report.event_type = value
|
|
elif key == "Serial Number": report.serial = value
|
|
elif key == "Version": report.version = value
|
|
elif key == "File Name": report.file_name = value
|
|
elif key == "Event Time": event_time_str = value
|
|
elif key == "Event Date": event_date = _parse_event_date(value)
|
|
|
|
elif key == "Trigger": report.trigger_channel = value
|
|
elif key == "Geo Trigger Level": report.geo_trigger_level_ips = _parse_number(value)
|
|
elif key == "Pre-trigger Length": report.pretrig_s = _parse_number(value)
|
|
elif key == "Record Time": report.record_time_s = _parse_number(value)
|
|
elif key == "Record Stop Mode": report.record_stop_mode = value
|
|
elif key == "Sample Rate": report.sample_rate_sps = _parse_int(value)
|
|
elif key == "Battery Level": report.battery_volts = _parse_number(value)
|
|
elif key == "Calibration":
|
|
report.calibration_date, report.calibration_by = _parse_calibration(value)
|
|
elif key == "Units":
|
|
report.units = value
|
|
# Entering the user-notes block. Next ~4 lines until
|
|
# "Geo Range :" are the operator-supplied notes.
|
|
in_user_notes_block = True
|
|
user_note_position = 0
|
|
|
|
elif key == "Geo Range":
|
|
# Exiting the user-notes block.
|
|
in_user_notes_block = False
|
|
report.geo_range_ips = _parse_number(value)
|
|
|
|
# User-notes block: assign by position (operator may have
|
|
# renamed the labels, so we don't trust them). Preserve the
|
|
# original labels in `user_note_labels` for downstream UIs
|
|
# (terra-view) that want to display them as the operator
|
|
# named them.
|
|
elif in_user_notes_block and user_note_position < len(_USER_NOTE_SLOTS):
|
|
slot = _USER_NOTE_SLOTS[user_note_position]
|
|
setattr(report, slot, value)
|
|
report.user_note_labels[slot] = key
|
|
user_note_position += 1
|
|
|
|
# ── Per-channel stats ────────────────────────────────────────────────
|
|
# All match the pattern "{Channel} <stat-name>"
|
|
elif key in (
|
|
"Tran PPV", "Vert PPV", "Long PPV",
|
|
"Tran ZC Freq", "Vert ZC Freq", "Long ZC Freq",
|
|
"Tran Time of Peak", "Vert Time of Peak", "Long Time of Peak",
|
|
"Tran Peak Acceleration", "Vert Peak Acceleration", "Long Peak Acceleration",
|
|
"Tran Peak Displacement", "Vert Peak Displacement", "Long Peak Displacement",
|
|
):
|
|
ch_name, stat = key.split(" ", 1)
|
|
cs = report.channels.setdefault(ch_name, ChannelStats())
|
|
if stat == "PPV":
|
|
if _is_oorange(value):
|
|
# Channel saturated — substitute range max as lower
|
|
# bound; flag so downstream UI can render "> 10 in/s".
|
|
cs.ppv_ips = report.geo_range_ips
|
|
cs.ppv_saturated = True
|
|
else:
|
|
cs.ppv_ips = _parse_number(value)
|
|
else:
|
|
num = _parse_number(value)
|
|
if stat == "ZC Freq": cs.zc_freq_hz = num
|
|
elif stat == "Time of Peak": cs.time_of_peak_s = num
|
|
elif stat == "Peak Acceleration": cs.peak_accel_g = num
|
|
elif stat == "Peak Displacement": cs.peak_disp_in = num
|
|
|
|
# ── Histogram-specific fields ────────────────────────────────────────
|
|
# Histograms have Start/Stop time+date pairs + an interval count
|
|
# and size, plus per-channel absolute Peak Time/Date instead of
|
|
# the waveform's relative Time of Peak.
|
|
elif key == "Histogram Start Time":
|
|
_hist_start_time = _parse_event_time(value)
|
|
elif key == "Histogram Start Date":
|
|
_d = _parse_iso_date(value)
|
|
if _d and _hist_start_time:
|
|
report.histogram_start = datetime.datetime.combine(_d, _hist_start_time)
|
|
elif key == "Histogram Stop Time":
|
|
_hist_stop_time = _parse_event_time(value)
|
|
elif key == "Histogram Stop Date":
|
|
_d = _parse_iso_date(value)
|
|
if _d and _hist_stop_time:
|
|
report.histogram_stop = datetime.datetime.combine(_d, _hist_stop_time)
|
|
elif key == "Number of Intervals":
|
|
try:
|
|
report.histogram_n_intervals = int(float(value.strip()))
|
|
except ValueError:
|
|
pass
|
|
elif key == "Interval Size":
|
|
report.histogram_interval_size_str = value.strip()
|
|
report.histogram_interval_size_s = _parse_interval_size(value)
|
|
|
|
# ── Per-channel histogram Peak Date / Peak Time ──
|
|
# Lines like "Tran Peak Time : 22:31:38" + "Tran Peak Date : 2026-05-16"
|
|
elif key in ("Tran Peak Time", "Vert Peak Time", "Long Peak Time", "MicL Time"):
|
|
ch_name = "MicL" if key == "MicL Time" else key.split(" ", 1)[0]
|
|
_pending_peak_time[ch_name] = _parse_event_time(value)
|
|
elif key in ("Tran Peak Date", "Vert Peak Date", "Long Peak Date", "MicL Date"):
|
|
ch_name = "MicL" if key == "MicL Date" else key.split(" ", 1)[0]
|
|
_d = _parse_iso_date(value)
|
|
_t = _pending_peak_time.get(ch_name)
|
|
if _d and _t:
|
|
report.channel_peak_when[ch_name] = datetime.datetime.combine(_d, _t)
|
|
|
|
# ── Vector Sum ───────────────────────────────────────────────────────
|
|
elif key == "Peak Vector Sum":
|
|
if _is_oorange(value):
|
|
# PVS saturated — conservative upper bound is
|
|
# sqrt(3) * geo_range_ips (all 3 channels at full-scale).
|
|
# Real PVS could be lower (channels rarely peak
|
|
# simultaneously) but never higher within the range.
|
|
if report.geo_range_ips is not None:
|
|
import math as _math
|
|
report.peak_vector_sum_ips = _math.sqrt(3) * report.geo_range_ips
|
|
report.peak_vector_sum_saturated = True
|
|
else:
|
|
report.peak_vector_sum_ips = _parse_number(value)
|
|
# BW writes the PVS-time label with a typo: "Peak Vector Sum TimeSum"
|
|
# (looks like Sum got appended twice). Accept both forms. Confirmed
|
|
# against actual BW output on 2026-05-27 — every PVS-time line in
|
|
# the field examples (T190, T438, K557) uses the typo'd label.
|
|
elif key in ("Peak Vector Sum Time", "Peak Vector Sum TimeSum"):
|
|
report.peak_vector_sum_time_s = _parse_number(value)
|
|
_pvs_time_raw = value
|
|
elif key == "Peak Vector Sum Date":
|
|
# Histogram-mode PVS gets paired with a date. We may have
|
|
# captured 'Peak Vector Sum Time' as either a relative
|
|
# seconds float (waveform) or an HH:MM:SS string we
|
|
# interpreted as a number. For histograms, BW writes
|
|
# "Peak Vector Sum Time : 22:33:52" which _parse_number
|
|
# parses as 22.0 (loses information). When Peak Vector Sum
|
|
# Date arrives, re-parse the previous PVS time line as a
|
|
# clock time and combine into an absolute datetime.
|
|
_d = _parse_iso_date(value)
|
|
if _d and _pvs_time_raw is not None:
|
|
_t = _parse_event_time(_pvs_time_raw)
|
|
if _t:
|
|
report.peak_vector_sum_when = datetime.datetime.combine(_d, _t)
|
|
# The earlier seconds parse was bogus for histograms;
|
|
# clear it so consumers don't think it's a real offset.
|
|
report.peak_vector_sum_time_s = None
|
|
|
|
# ── Microphone block ────────────────────────────────────────────────
|
|
elif key == "Microphone":
|
|
report.mic.weighting = value
|
|
elif key == "MicL PSPL":
|
|
if _is_oorange(value):
|
|
# Mic saturated — substitute conservative upper bound 140 dBL.
|
|
report.mic.pspl_dbl = 140.0
|
|
report.mic.pspl_saturated = True
|
|
else:
|
|
report.mic.pspl_dbl = _parse_number(value)
|
|
# Mirror onto the "MicL" entry in channels so callers querying
|
|
# `channels["MicL"].ppv_ips` see something — but it's dB(L), not
|
|
# in/s, so we store as-is in the MicStats and mark the channel.
|
|
elif key == "MicL Time of Peak":
|
|
report.mic.time_of_peak_s = _parse_number(value)
|
|
cs = report.channels.setdefault("MicL", ChannelStats())
|
|
cs.time_of_peak_s = report.mic.time_of_peak_s
|
|
elif key == "MicL ZC Freq":
|
|
report.mic.zc_freq_hz = _parse_number(value)
|
|
cs = report.channels.setdefault("MicL", ChannelStats())
|
|
cs.zc_freq_hz = report.mic.zc_freq_hz
|
|
|
|
# ── Sensor self-check ────────────────────────────────────────────────
|
|
elif key in (
|
|
"Tran Test Freq", "Vert Test Freq", "Long Test Freq", "MicL Test Freq",
|
|
"Tran Test Ratio", "Vert Test Ratio", "Long Test Ratio",
|
|
"MicL Test Amplitude",
|
|
"Tran Test Results", "Vert Test Results", "Long Test Results", "MicL Test Results",
|
|
):
|
|
ch_name, stat = key.split(" ", 1)
|
|
sc = report.sensor_check.setdefault(ch_name, SensorCheck())
|
|
if stat == "Test Freq": sc.test_freq_hz = _parse_number(value)
|
|
elif stat == "Test Ratio": sc.test_ratio = _parse_number(value)
|
|
elif stat == "Test Amplitude": sc.test_amplitude_mv = _parse_number(value)
|
|
elif stat == "Test Results": sc.test_results = value
|
|
|
|
# ── Trailer ─────────────────────────────────────────────────────────
|
|
elif key == "PC SW Version":
|
|
report.pc_sw_version = value
|
|
|
|
# Unknown keys are silently dropped — forward-compat for future
|
|
# BW versions that may add fields.
|
|
|
|
# Combine event date + time into a datetime
|
|
if event_date is not None and event_time_str is not None:
|
|
t = _parse_event_time(event_time_str)
|
|
if t is not None:
|
|
report.event_datetime = datetime.datetime.combine(event_date, t)
|
|
|
|
if parse_samples:
|
|
report.samples = _parse_sample_table(lines, i)
|
|
|
|
return report
|
|
|
|
|
|
def _parse_sample_table(
|
|
lines: List[str], start: int,
|
|
) -> List[Tuple[float, float, float, float]]:
|
|
"""Parse the trailing sample table.
|
|
|
|
The table starts with a header row (" Tran <TAB>...") and continues
|
|
until EOF. Each data row is a tab-separated quartet of numeric values.
|
|
"""
|
|
samples: List[Tuple[float, float, float, float]] = []
|
|
seen_header = False
|
|
for line in lines[start:]:
|
|
line = line.rstrip("\r\n")
|
|
if not line.strip():
|
|
continue
|
|
cols = [c.strip() for c in line.split("\t") if c.strip()]
|
|
if not seen_header:
|
|
# Header row contains channel names; numeric rows don't.
|
|
if any(c in ("Tran", "Vert", "Long", "MicL") for c in cols):
|
|
seen_header = True
|
|
continue
|
|
if len(cols) < 4:
|
|
continue
|
|
try:
|
|
samples.append((
|
|
float(cols[0]), float(cols[1]),
|
|
float(cols[2]), float(cols[3]),
|
|
))
|
|
except ValueError:
|
|
continue
|
|
return samples
|
|
|
|
|
|
def parse_report_file(
|
|
path: Union[str, Path], *, parse_samples: bool = False,
|
|
) -> BwAsciiReport:
|
|
"""Convenience: read a .TXT file from disk and parse it."""
|
|
return parse_report(Path(path).read_bytes(), parse_samples=parse_samples)
|