780b45a371
BW writes ">100 Hz" for ZC Freq when the zero-crossing algorithm sees a peak too fast to count — the device's reporting ceiling is 100 Hz on V10.72. Our parser fell back to None via _parse_number (which requires a leading digit), so the PDF rendered "—" where BW shows ">100". Mirrors the OORANGE/saturated pattern already used for PPV and PSPL: parser stores the threshold (100.0) on zc_freq_hz + sets a new zc_freq_above_range flag. Projection carries the flag through to the sidecar; PDF renderer prepends ">" when set. Affects both per-channel stats tables (waveform + histogram variants) and the mic block's ZC Freq row. Verified on the real T190LD5Q.LK0W fixture: Tran zc_freq_hz=100.0 above_range=True; Vert/Long (normal values) above_range=False; "N/A" still produces zc_freq_hz=None which renders as "—" (unchanged). Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
739 lines
35 KiB
Python
739 lines
35 KiB
Python
"""
|
|
minimateplus/bw_ascii_report.py — parser for Blastware's per-event ASCII
|
|
report (the .TXT file BW writes alongside each saved event binary).
|
|
|
|
The ASCII export is the authoritative source for every "rich" per-event
|
|
field that BW computes from the waveform but never persists in the BW
|
|
binary itself:
|
|
|
|
- Per-channel PPV (Tran / Vert / Long / MicL)
|
|
- Peak Vector Sum + Peak Vector Sum Time
|
|
- Per-channel ZC Freq, Time of Peak, Peak Acceleration, Peak Displacement
|
|
- MicL PSPL, MicL Time of Peak, MicL ZC Freq
|
|
- Per-channel Sensor Self-Check (Test Freq / Test Ratio / Test Results)
|
|
- MicL Test Amplitude (mV)
|
|
- Battery, calibration date, monitor-log timestamps
|
|
|
|
Persisting these values into the SFM database lets the monthly-summary
|
|
review workflow ("show me events at Location X with PVS > 0.5") work
|
|
without depending on the (still-undecoded) waveform body codec.
|
|
|
|
Format (verified against decode-re/5-8-26 4-event bundle):
|
|
|
|
- One field per line, wrapped in double quotes: `"Field Name : Value"`
|
|
- Field/value separator: literal ` : ` (space-colon-space).
|
|
- Some field names contain an internal `:` already (e.g. `"Project:"`),
|
|
so we split on the FIRST ` : ` only.
|
|
- Some fields have unit suffixes: `"0.500 in/s"` / `"7.5 Hz"` / `"533 mv"`.
|
|
- A `"Monitor Log(s)"` marker line is followed by tab-separated rows
|
|
of `start_time<TAB>stop_time<TAB>description`.
|
|
- Final `"PC SW Version : ..."` line ends the metadata block.
|
|
- A blank line separates metadata from the sample table.
|
|
- Sample table starts with ` Tran <TAB> Vert <TAB>...`, then
|
|
one row per sample (tab-separated, right-padded numeric values).
|
|
- Geo channel values are in in/s; MicL in dB(L) (or 0.000 below threshold).
|
|
|
|
Because some metadata fields have whitespace quirks ("MicL Time of
|
|
Peak" has two spaces; the leading "Project:" value has its own colon),
|
|
we normalise whitespace in the key before lookup.
|
|
"""
|
|
|
|
from __future__ import annotations
|
|
|
|
import datetime
|
|
import re
|
|
from dataclasses import dataclass, field
|
|
from pathlib import Path
|
|
from typing import Dict, List, Optional, Tuple, Union
|
|
|
|
|
|
# ─────────────────────────────────────────────────────────────────────────────
|
|
# Output dataclasses
|
|
# ─────────────────────────────────────────────────────────────────────────────
|
|
|
|
|
|
@dataclass
class ChannelStats:
    """Derived statistics for a single channel, as read from an event report."""

    # Peak particle velocity in in/s (geo channels only).
    ppv_ips: Optional[float] = None
    # Zero-crossing frequency in Hz.
    zc_freq_hz: Optional[float] = None
    # Time of peak in seconds, relative to the trigger (may be negative).
    time_of_peak_s: Optional[float] = None
    # Peak acceleration in g (geo channels only).
    peak_accel_g: Optional[float] = None
    # Peak displacement in inches (geo channels only).
    peak_disp_in: Optional[float] = None
    # True when BW wrote "OORANGE" (Out Of Range) instead of a PPV number,
    # i.e. the real peak exceeded the channel's full-scale range.  In that
    # case ppv_ips holds the range maximum (e.g. 10.000 in/s on Normal
    # range) as a lower bound, and consumers should show "> 10 in/s" or
    # "saturated" rather than treat the number as an exact measurement.
    ppv_saturated: bool = False
    # True when BW wrote ">100 Hz" for ZC Freq: the zero-crossing frequency
    # was above the device's reporting ceiling (typically 100 Hz on V10.72).
    # zc_freq_hz then holds the threshold (100.0) as a lower bound and
    # consumers should render ">100".
    zc_freq_above_range: bool = False
|
|
|
|
|
|
@dataclass
class MicStats:
    """Statistics that apply only to the MicL (microphone) channel."""

    # Weighting description as written by BW, e.g. "Linear Weighting".
    weighting: Optional[str] = None
    # Peak sound pressure level in dB(L).
    pspl_dbl: Optional[float] = None
    # Zero-crossing frequency in Hz.
    zc_freq_hz: Optional[float] = None
    # Time of peak in seconds.
    time_of_peak_s: Optional[float] = None
    # True when BW wrote "OORANGE" for PSPL, i.e. the mic exceeded its
    # measurement range.  pspl_dbl is then a 140 dB(L) stand-in rather than
    # a measurement (the original author notes 140 as a typical maximum and
    # that some units cap at 148), so consumers should render something
    # like "> 140 dB(L)" when this flag is set.
    pspl_saturated: bool = False
    # Same meaning as ChannelStats.zc_freq_above_range: the mic ZC frequency
    # was above the device's reporting ceiling.
    zc_freq_above_range: bool = False
|
|
|
|
|
|
@dataclass
class SensorCheck:
    """Result of the sensor self-check for one channel.

    Geo channels carry a test frequency and a ratio; MicL carries a test
    frequency and an amplitude in mV.  Every channel also has a result
    string.
    """

    # Self-check frequency in Hz.
    test_freq_hz: Optional[float] = None
    # Self-check ratio (geo channels only).
    test_ratio: Optional[float] = None
    # Self-check amplitude in mV (MicL only).
    test_amplitude_mv: Optional[float] = None
    # Result text as written by BW: "Passed" / "Failed".
    test_results: Optional[str] = None
|
|
|
|
|
|
@dataclass
class MonitorLogEntry:
    """A single row from the trailing "Monitor Log(s)" block."""

    # When the logged monitoring period started.
    start_time: Optional[datetime.datetime] = None
    # When the logged monitoring period stopped.
    stop_time: Optional[datetime.datetime] = None
    # Free-text third column of the row, if present.
    description: Optional[str] = None
|
|
|
|
|
|
# BW saturation marker — appears in PPV / Peak Vector Sum / similar
|
|
# numeric fields when the underlying measurement exceeded the
|
|
# channel's full-scale range (e.g., a geophone reading > 10 in/s at
|
|
# Normal range, or a mic exceeding its sensitivity ceiling). Treated
|
|
# as "≥ range_max" + a saturated flag rather than discarded.
|
|
# Appears as: ``"Tran PPV : OORANGE in/s"``
|
|
_OORANGE_MARKERS = ("OORANGE", "OUT OF RANGE")
|
|
|
|
|
|
def _is_oorange(value: str) -> bool:
|
|
"""True when a BW numeric field is an Out-Of-Range saturation marker."""
|
|
s = value.strip().upper()
|
|
return any(m in s for m in _OORANGE_MARKERS)
|
|
|
|
|
|
def _parse_above_range(value: str) -> Optional[float]:
    """Return the threshold of an "above-range" value such as ">100 Hz".

    BW writes ZC Freq as ">100 Hz" when the zero-crossing algorithm sees a
    peak too fast to count (the device cuts off at 100 Hz).

    Returns the number following the leading '>' (e.g. 100.0).  Returns
    None when ``value`` does not start with '>' or when no number follows
    it.
    """
    text = value.strip()
    return _parse_number(text[1:]) if text.startswith(">") else None
|
|
|
|
|
|
@dataclass
class BwAsciiReport:
    """Parsed contents of one BW per-event ASCII export."""

    # ── Identity ─────────────────────────────────────────────────────────────
    # Event type text, e.g. "Full Waveform".
    event_type: Optional[str] = None
    # Unit serial number, e.g. "BE11529".
    serial: Optional[str] = None
    # Firmware version line.
    version: Optional[str] = None
    # Event file name, e.g. "M529LK44.AB0".
    file_name: Optional[str] = None
    # Combination of the "Event Date" and "Event Time" lines.
    event_datetime: Optional[datetime.datetime] = None

    # ── Trigger / recording config ──────────────────────────────────────────
    # Trigger source, e.g. "Vert" or "From Unit".
    trigger_channel: Optional[str] = None
    geo_trigger_level_ips: Optional[float] = None
    # Pre-trigger length (negative seconds).
    pretrig_s: Optional[float] = None
    record_time_s: Optional[float] = None
    record_stop_mode: Optional[str] = None
    sample_rate_sps: Optional[int] = None
    battery_volts: Optional[float] = None
    calibration_date: Optional[datetime.date] = None
    # Who performed the calibration, e.g. "Instantel".
    calibration_by: Optional[str] = None
    # Units description, e.g. "in/s and dB(L)".
    units: Optional[str] = None

    # ── Operator-supplied metadata ──────────────────────────────────────────
    # These four values come from the "User Notes" lines BW writes between
    # the `Units :` and `Geo Range :` lines.  They are assigned by POSITION,
    # not by label, because the operator can rename the labels in
    # Blastware's Compliance Setup → Notes tab (e.g. "Seis Loc:" →
    # "Building:").  The labels actually written are kept in
    # `user_note_labels` so terra-view can show them as the operator named
    # them.
    # Position 1 (BW default label "Project:").
    project: Optional[str] = None
    # Position 2 (BW default label "Client:").
    client: Optional[str] = None
    # Position 3 (BW default label "User Name:").
    operator: Optional[str] = None
    # Position 4 (BW default label "Seis Loc:").
    sensor_location: Optional[str] = None

    # Canonical slot name → the literal label found in the export.  Empty
    # when no User Notes lines were present.  With slot 4 renamed to
    # "Building:" this would be:
    #   {"project": "Project:", "client": "Client:",
    #    "operator": "User Name:", "sensor_location": "Building:"}
    user_note_labels: Dict[str, str] = field(default_factory=dict)

    # ── Geo channel scaling ─────────────────────────────────────────────────
    # Full-scale geo range in in/s (10.000 / 1.250).
    geo_range_ips: Optional[float] = None

    # ── Per-channel derived stats (geo + mic) ───────────────────────────────
    channels: Dict[str, ChannelStats] = field(default_factory=dict)
    mic: MicStats = field(default_factory=MicStats)

    # ── Vector sum ──────────────────────────────────────────────────────────
    peak_vector_sum_ips: Optional[float] = None
    peak_vector_sum_time_s: Optional[float] = None
    # True when BW wrote "OORANGE" for the PVS.  The stored value is then
    # sqrt(3) * geo_range_ips — the PVS with all three geo channels at
    # full scale simultaneously — and should be displayed as
    # ">{value} in/s" or similar.
    peak_vector_sum_saturated: bool = False
    # Absolute date+time of the PVS.  Histogram reports provide it (the PVS
    # belongs to a specific interval); waveform reports only provide the
    # relative-time value above.
    peak_vector_sum_when: Optional[datetime.datetime] = None

    # ── Histogram-specific fields (populated only when Event Type starts
    #    with 'Histogram' / 'Full Histogram' / 'Histogram + Continuous') ──
    histogram_start: Optional[datetime.datetime] = None
    histogram_stop: Optional[datetime.datetime] = None
    # Interval count, e.g. 4 or 1436.
    histogram_n_intervals: Optional[int] = None
    # Interval size as written: "1 minute" / "5 minutes" / "15 seconds".
    histogram_interval_size_str: Optional[str] = None
    # The same interval size converted to seconds.
    histogram_interval_size_s: Optional[float] = None
    # Absolute peak date+time per channel, keyed by channel name ("Tran",
    # "Vert", "Long", "MicL").  Histogram-only; waveform events leave this
    # empty and use each channel's time_of_peak_s (relative to trigger).
    channel_peak_when: Dict[str, datetime.datetime] = field(default_factory=dict)

    # ── Sensor self-check (per channel) ─────────────────────────────────────
    sensor_check: Dict[str, SensorCheck] = field(default_factory=dict)

    # ── Monitor log + tooling version ───────────────────────────────────────
    monitor_log: List[MonitorLogEntry] = field(default_factory=list)
    pc_sw_version: Optional[str] = None

    # ── Sample table (optional; only parsed if requested) ───────────────────
    # One (Tran, Vert, Long, MicL) tuple per sample, in the report's units
    # (geo channels in in/s, MicL in dB(L)).  None when parse_samples=False.
    samples: Optional[List[Tuple[float, float, float, float]]] = None
|
|
|
|
|
|
# ─────────────────────────────────────────────────────────────────────────────
|
|
# Helpers
|
|
# ─────────────────────────────────────────────────────────────────────────────
|
|
|
|
|
|
_KEY_NORMALISE_RE = re.compile(r"\s+")
|
|
_NUMERIC_RE = re.compile(r"^-?\d+(?:\.\d+)?")
|
|
|
|
|
|
def _normalise_key(k: str) -> str:
|
|
"""Collapse whitespace runs (incl. tabs) and strip — handles BW's
|
|
"MicL Time of Peak" double-space and leading-colon quirks."""
|
|
return _KEY_NORMALISE_RE.sub(" ", k).strip()
|
|
|
|
|
|
def _strip_quotes(line: str) -> str:
|
|
line = line.rstrip("\r\n")
|
|
if len(line) >= 2 and line.startswith('"') and line.endswith('"'):
|
|
return line[1:-1]
|
|
return line
|
|
|
|
|
|
def _parse_number(value: str) -> Optional[float]:
    """Extract the number at the start of a value such as "0.500 in/s".

    Surrounding whitespace is ignored.  Returns None when the value does
    not begin with a number (for example "N/A" or ">100 Hz").
    """
    found = _NUMERIC_RE.match(value.strip())
    if found is None:
        return None
    try:
        number: Optional[float] = float(found.group(0))
    except ValueError:
        number = None
    return number
|
|
|
|
|
|
def _parse_int(value: str) -> Optional[int]:
    """Parse the leading number of ``value`` and round it to an int.

    Returns None when ``value`` does not start with a number.
    """
    number = _parse_number(value)
    if number is None:
        return None
    return int(round(number))
|
|
|
|
|
|
# Months exactly as BW writes them.
|
|
_MONTHS = {
|
|
"January": 1, "February": 2, "March": 3, "April": 4,
|
|
"May": 5, "June": 6, "July": 7, "August": 8,
|
|
"September": 9, "October": 10, "November": 11, "December": 12,
|
|
# Short forms used in monitor-log rows ("Apr 23 /26").
|
|
"Jan": 1, "Feb": 2, "Mar": 3, "Apr": 4, "Jun": 6, "Jul": 7,
|
|
"Aug": 8, "Sep": 9, "Oct": 10, "Nov": 11, "Dec": 12,
|
|
}
|
|
|
|
|
|
def _parse_event_date(s: str) -> Optional[datetime.date]:
|
|
"""Parse "April 23, 2026" or "May 8, 2026" → date."""
|
|
s = s.strip()
|
|
parts = s.replace(",", " ").split()
|
|
if len(parts) < 3:
|
|
return None
|
|
month_name, day_str, year_str = parts[0], parts[1], parts[2]
|
|
month = _MONTHS.get(month_name)
|
|
if month is None:
|
|
return None
|
|
try:
|
|
return datetime.date(int(year_str), month, int(day_str))
|
|
except ValueError:
|
|
return None
|
|
|
|
|
|
def _parse_iso_date(s: str) -> Optional[datetime.date]:
|
|
"""Parse "2026-05-16" → date. Histograms use ISO format for their
|
|
Start Date / Stop Date / Peak Date fields; waveforms use the
|
|
"May 8, 2026" long form which `_parse_event_date` handles."""
|
|
s = s.strip()
|
|
try:
|
|
return datetime.date.fromisoformat(s)
|
|
except ValueError:
|
|
return None
|
|
|
|
|
|
_INTERVAL_UNIT_SECONDS = {
|
|
"second": 1, "seconds": 1, "sec": 1, "secs": 1,
|
|
"minute": 60, "minutes": 60, "min": 60, "mins": 60,
|
|
"hour": 3600, "hours": 3600, "hr": 3600, "hrs": 3600,
|
|
}
|
|
|
|
|
|
def _parse_interval_size(s: str) -> Optional[float]:
|
|
"""Parse "1 minute" / "5 minutes" / "15 seconds" / "2 seconds" → seconds.
|
|
|
|
Handles the BW Compliance Setup → Histogram Interval values verbatim
|
|
("2 seconds", "5 seconds", "15 seconds", "1 minute", "5 minutes",
|
|
"15 minutes") plus a few defensive variants.
|
|
"""
|
|
if not s:
|
|
return None
|
|
parts = s.strip().split()
|
|
if len(parts) < 2:
|
|
return None
|
|
try:
|
|
n = float(parts[0])
|
|
except ValueError:
|
|
return None
|
|
unit_per_s = _INTERVAL_UNIT_SECONDS.get(parts[1].lower())
|
|
if unit_per_s is None:
|
|
return None
|
|
return n * unit_per_s
|
|
|
|
|
|
def _parse_event_time(s: str) -> Optional[datetime.time]:
|
|
"""Parse "15:56:35" → time."""
|
|
s = s.strip()
|
|
try:
|
|
h, m, sec = s.split(":")
|
|
return datetime.time(int(h), int(m), int(sec))
|
|
except (ValueError, IndexError):
|
|
return None
|
|
|
|
|
|
def _parse_calibration(value: str) -> Tuple[Optional[datetime.date], Optional[str]]:
    """Split "April 29, 2025 by Instantel" into (date, "Instantel").

    The text before the first " by " is parsed as a long-form date (None
    if it does not parse).  The text after it is returned stripped, or
    None when the value contains no " by " at all.
    """
    date_text, separator, performed_by = value.partition(" by ")
    who = performed_by.strip() if separator else None
    return _parse_event_date(date_text), who
|
|
|
|
|
|
def _parse_monitor_row(line: str) -> Optional[MonitorLogEntry]:
    """Turn one tab-separated monitor-log row into a MonitorLogEntry.

    Expected layout is `<start>\\t<stop>\\t<desc>`, where both timestamps
    use BW's short form "Mon DD /YY HH:MM:SS" (e.g. "Apr 23 /26 15:46:16";
    the two-digit year is expanded, so "/26" becomes 2026).

    Returns None when the row has fewer than two columns, or when neither
    timestamp parses and there is no description.
    """
    cells = line.split("\t")
    if len(cells) < 2:
        return None
    started = _parse_monitor_ts(cells[0])
    stopped = _parse_monitor_ts(cells[1])
    note = cells[2].strip() if len(cells) > 2 else None
    if started is not None or stopped is not None or note:
        return MonitorLogEntry(start_time=started, stop_time=stopped, description=note)
    return None
|
|
|
|
|
|
def _parse_monitor_ts(s: str) -> Optional[datetime.datetime]:
    """Parse a monitor-log timestamp such as "Apr 23 /26 15:46:16".

    The four whitespace-separated tokens are month name, day, "/YY" and
    HH:MM:SS.  Two-digit years below 80 map to 20YY, the rest to 19YY.
    Returns None when a token is missing or does not parse.
    """
    tokens = s.split()
    if len(tokens) < 4:
        return None
    month = _MONTHS.get(tokens[0])
    if month is None:
        return None
    try:
        day = int(tokens[1])
        # tokens[2] looks like "/26"; drop the slash before converting.
        short_year = int(tokens[2].lstrip("/"))
        century = 2000 if short_year < 80 else 1900
        hour, minute, second = map(int, tokens[3].split(":"))
        return datetime.datetime(century + short_year, month, day, hour, minute, second)
    except (ValueError, IndexError):
        return None
|
|
|
|
|
|
# ── User-notes positional slot map ──────────────────────────────────────────
|
|
#
|
|
# Blastware's Compliance Setup → Notes tab shows four operator-supplied
|
|
# fields whose LABELS the operator can rename (see screenshot in
|
|
# project archive). Defaults are "Project:" / "Client:" /
|
|
# "User Name:" / "Seis Loc:", but an operator using a different
|
|
# convention can rename them to anything ("Building:", "Site:",
|
|
# "Address:", etc.). The ASCII export reflects whatever the operator
|
|
# typed, so label-based matching is fragile.
|
|
#
|
|
# What IS reliable: BW always writes the 4 user-notes lines in the
|
|
# same order, contiguously between the `Units :` line and the
|
|
# `Geo Range :` line. We parse them by POSITION and preserve the
|
|
# operator's labels in `report.user_note_labels` so terra-view can
|
|
# render them as the operator intended.
|
|
|
|
_USER_NOTE_SLOTS = ("project", "client", "operator", "sensor_location")
|
|
|
|
|
|
# ─────────────────────────────────────────────────────────────────────────────
|
|
# Top-level parser
|
|
# ─────────────────────────────────────────────────────────────────────────────
|
|
|
|
|
|
def _store_zc_freq(stats, value: str) -> None:
    """Store a "ZC Freq" value on ``stats`` (a ChannelStats or MicStats).

    An above-range value such as ">100 Hz" records the threshold and sets
    ``zc_freq_above_range``; any other value goes through the plain
    numeric parser (which yields None for text like "N/A").
    """
    threshold = _parse_above_range(value)
    if threshold is not None:
        stats.zc_freq_hz = threshold
        stats.zc_freq_above_range = True
    else:
        stats.zc_freq_hz = _parse_number(value)


def parse_report(text: Union[str, bytes], *, parse_samples: bool = False) -> BwAsciiReport:
    """Parse a BW per-event ASCII export into a structured BwAsciiReport.

    Args:
        text: Report contents.  ``bytes`` are decoded as ASCII with
            undecodable bytes replaced.
        parse_samples: When True, also populate ``report.samples`` from
            the trailing sample table.  Default False because the table is
            huge and most callers only want metadata for indexing.

    Returns:
        A BwAsciiReport.  Fields whose lines are missing or unparsable
        keep their defaults; unknown keys are ignored.

    Metadata parsing stops at the first blank line, which separates the
    metadata block from the sample table.
    """
    # Function-scope import: only the saturated Peak Vector Sum bound needs it.
    import math

    if isinstance(text, bytes):
        text = text.decode("ascii", errors="replace")

    report = BwAsciiReport()
    # Pre-create channel slots so callers can rely on them existing.
    for ch in ("Tran", "Vert", "Long", "MicL"):
        report.channels.setdefault(ch, ChannelStats())
        report.sensor_check.setdefault(ch, SensorCheck())

    lines = text.splitlines()
    # Index of the first line after the metadata block (handed to the
    # sample-table parser).  Stays at len(lines) if no blank line is found.
    sample_start = len(lines)

    in_monitor_log_section = False
    event_time_str: Optional[str] = None
    event_date: Optional[datetime.date] = None

    # User-notes block: entered after the "Units :" line, left at the
    # "Geo Range :" line.  Inside it, the first four otherwise-unmatched
    # `<label> : <value>` lines fill the canonical slots by POSITION,
    # whatever the operator named the labels.
    in_user_notes_block = False
    user_note_position = 0

    # Histogram staging: BW writes each time and its matching date on
    # separate lines (time first), so the time is held until the date
    # line arrives.
    hist_start_time: Optional[datetime.time] = None
    hist_stop_time: Optional[datetime.time] = None
    pending_peak_time: Dict[str, Optional[datetime.time]] = {}
    pvs_clock_time: Optional[datetime.time] = None

    for index, raw_line in enumerate(lines):
        # A blank line marks the start of the sample table.
        if not raw_line.strip():
            sample_start = index + 1
            break

        line = _strip_quotes(raw_line)

        # Monitor log section: a "Monitor Log(s)" header followed by
        # tab-separated rows; the first line without a tab ends the
        # section and is handled as an ordinary field line below.
        if not in_monitor_log_section and line.strip() == "Monitor Log(s)":
            in_monitor_log_section = True
            continue
        if in_monitor_log_section:
            if "\t" in line:
                entry = _parse_monitor_row(line)
                if entry is not None:
                    report.monitor_log.append(entry)
                continue
            in_monitor_log_section = False

        # "Field : Value" — split on the FIRST " : " only, because some
        # labels carry their own colon (e.g. "Project:").
        sep_at = line.find(" : ")
        if sep_at < 0:
            continue
        key = _normalise_key(line[:sep_at])
        value = line[sep_at + 3:].strip()

        # ── Identity / config ────────────────────────────────────────────
        if key == "Event Type":
            report.event_type = value
        elif key == "Serial Number":
            report.serial = value
        elif key == "Version":
            report.version = value
        elif key == "File Name":
            report.file_name = value
        elif key == "Event Time":
            event_time_str = value
        elif key == "Event Date":
            event_date = _parse_event_date(value)

        elif key == "Trigger":
            report.trigger_channel = value
        elif key == "Geo Trigger Level":
            report.geo_trigger_level_ips = _parse_number(value)
        elif key == "Pre-trigger Length":
            report.pretrig_s = _parse_number(value)
        elif key == "Record Time":
            report.record_time_s = _parse_number(value)
        elif key == "Record Stop Mode":
            report.record_stop_mode = value
        elif key == "Sample Rate":
            report.sample_rate_sps = _parse_int(value)
        elif key == "Battery Level":
            report.battery_volts = _parse_number(value)
        elif key == "Calibration":
            report.calibration_date, report.calibration_by = _parse_calibration(value)
        elif key == "Units":
            report.units = value
            # The operator notes follow, up to the "Geo Range :" line.
            in_user_notes_block = True
            user_note_position = 0

        elif key == "Geo Range":
            in_user_notes_block = False
            report.geo_range_ips = _parse_number(value)

        # User-notes block: assign by position and remember the label the
        # operator actually used.
        elif in_user_notes_block and user_note_position < len(_USER_NOTE_SLOTS):
            slot = _USER_NOTE_SLOTS[user_note_position]
            setattr(report, slot, value)
            report.user_note_labels[slot] = key
            user_note_position += 1

        # ── Per-channel stats: "{Channel} <stat-name>" ───────────────────
        elif key in (
            "Tran PPV", "Vert PPV", "Long PPV",
            "Tran ZC Freq", "Vert ZC Freq", "Long ZC Freq",
            "Tran Time of Peak", "Vert Time of Peak", "Long Time of Peak",
            "Tran Peak Acceleration", "Vert Peak Acceleration", "Long Peak Acceleration",
            "Tran Peak Displacement", "Vert Peak Displacement", "Long Peak Displacement",
        ):
            ch_name, stat = key.split(" ", 1)
            cs = report.channels.setdefault(ch_name, ChannelStats())
            if stat == "PPV":
                if _is_oorange(value):
                    # Channel saturated: use the range max as a lower bound
                    # (None if no Geo Range line has been seen yet) and
                    # flag it so the UI can render "> 10 in/s".
                    cs.ppv_ips = report.geo_range_ips
                    cs.ppv_saturated = True
                else:
                    cs.ppv_ips = _parse_number(value)
            elif stat == "ZC Freq":
                _store_zc_freq(cs, value)
            elif stat == "Time of Peak":
                cs.time_of_peak_s = _parse_number(value)
            elif stat == "Peak Acceleration":
                cs.peak_accel_g = _parse_number(value)
            elif stat == "Peak Displacement":
                cs.peak_disp_in = _parse_number(value)

        # ── Histogram-specific fields ────────────────────────────────────
        # Start/Stop time+date pairs, an interval count and size, and
        # absolute per-channel Peak Time/Date instead of the waveform's
        # relative Time of Peak.
        elif key == "Histogram Start Time":
            hist_start_time = _parse_event_time(value)
        elif key == "Histogram Start Date":
            start_date = _parse_iso_date(value)
            if start_date is not None and hist_start_time is not None:
                report.histogram_start = datetime.datetime.combine(start_date, hist_start_time)
        elif key == "Histogram Stop Time":
            hist_stop_time = _parse_event_time(value)
        elif key == "Histogram Stop Date":
            stop_date = _parse_iso_date(value)
            if stop_date is not None and hist_stop_time is not None:
                report.histogram_stop = datetime.datetime.combine(stop_date, hist_stop_time)
        elif key == "Number of Intervals":
            try:
                report.histogram_n_intervals = int(float(value))
            except (ValueError, OverflowError):
                # Non-numeric text raises ValueError; "inf" / "1e999"
                # convert to float infinity, which int() rejects with
                # OverflowError.  Either way, leave the field unset.
                pass
        elif key == "Interval Size":
            report.histogram_interval_size_str = value
            report.histogram_interval_size_s = _parse_interval_size(value)

        # ── Per-channel histogram Peak Time / Peak Date ──────────────────
        # e.g. "Tran Peak Time : 22:31:38" + "Tran Peak Date : 2026-05-16"
        elif key in ("Tran Peak Time", "Vert Peak Time", "Long Peak Time", "MicL Time"):
            ch_name = "MicL" if key == "MicL Time" else key.split(" ", 1)[0]
            pending_peak_time[ch_name] = _parse_event_time(value)
        elif key in ("Tran Peak Date", "Vert Peak Date", "Long Peak Date", "MicL Date"):
            ch_name = "MicL" if key == "MicL Date" else key.split(" ", 1)[0]
            peak_date = _parse_iso_date(value)
            peak_time = pending_peak_time.get(ch_name)
            if peak_date is not None and peak_time is not None:
                report.channel_peak_when[ch_name] = datetime.datetime.combine(peak_date, peak_time)

        # ── Vector Sum ───────────────────────────────────────────────────
        elif key == "Peak Vector Sum":
            if _is_oorange(value):
                # Saturated PVS: substitute sqrt(3) * geo_range_ips, the
                # value with all three geo channels at full scale.  The
                # flag is set even when the range is unknown.
                if report.geo_range_ips is not None:
                    report.peak_vector_sum_ips = math.sqrt(3) * report.geo_range_ips
                report.peak_vector_sum_saturated = True
            else:
                report.peak_vector_sum_ips = _parse_number(value)
        # BW writes this label with a typo, "Peak Vector Sum TimeSum";
        # accept both spellings.
        elif key in ("Peak Vector Sum Time", "Peak Vector Sum TimeSum"):
            # Waveform reports give seconds relative to the trigger;
            # histogram reports give a clock time ("22:33:52").  A clock
            # time must not go through _parse_number, which would keep
            # only the hour as a bogus 22.0-second offset.
            pvs_clock_time = _parse_event_time(value)
            if pvs_clock_time is None:
                report.peak_vector_sum_time_s = _parse_number(value)
            else:
                report.peak_vector_sum_time_s = None
        elif key == "Peak Vector Sum Date":
            # Histogram mode: pair the date with the clock time captured
            # from the preceding PVS time line.
            pvs_date = _parse_iso_date(value)
            if pvs_date is not None and pvs_clock_time is not None:
                report.peak_vector_sum_when = datetime.datetime.combine(pvs_date, pvs_clock_time)

        # ── Microphone block ─────────────────────────────────────────────
        elif key == "Microphone":
            report.mic.weighting = value
        elif key == "MicL PSPL":
            # PSPL is dB(L), not in/s, so it lives only on MicStats and is
            # not copied into channels["MicL"].ppv_ips.
            if _is_oorange(value):
                # Mic saturated: store the 140 dB(L) stand-in and flag it.
                report.mic.pspl_dbl = 140.0
                report.mic.pspl_saturated = True
            else:
                report.mic.pspl_dbl = _parse_number(value)
        elif key == "MicL Time of Peak":
            report.mic.time_of_peak_s = _parse_number(value)
            # Mirror onto the generic per-channel entry.
            cs = report.channels.setdefault("MicL", ChannelStats())
            cs.time_of_peak_s = report.mic.time_of_peak_s
        elif key == "MicL ZC Freq":
            _store_zc_freq(report.mic, value)
            # Mirror onto the generic per-channel entry.
            cs = report.channels.setdefault("MicL", ChannelStats())
            cs.zc_freq_hz = report.mic.zc_freq_hz
            cs.zc_freq_above_range = report.mic.zc_freq_above_range

        # ── Sensor self-check ────────────────────────────────────────────
        elif key in (
            "Tran Test Freq", "Vert Test Freq", "Long Test Freq", "MicL Test Freq",
            "Tran Test Ratio", "Vert Test Ratio", "Long Test Ratio",
            "MicL Test Amplitude",
            "Tran Test Results", "Vert Test Results", "Long Test Results", "MicL Test Results",
        ):
            ch_name, stat = key.split(" ", 1)
            sc = report.sensor_check.setdefault(ch_name, SensorCheck())
            if stat == "Test Freq":
                sc.test_freq_hz = _parse_number(value)
            elif stat == "Test Ratio":
                sc.test_ratio = _parse_number(value)
            elif stat == "Test Amplitude":
                sc.test_amplitude_mv = _parse_number(value)
            elif stat == "Test Results":
                sc.test_results = value

        # ── Trailer ──────────────────────────────────────────────────────
        elif key == "PC SW Version":
            report.pc_sw_version = value

        # Unknown keys are silently dropped — forward-compat for future
        # BW versions that may add fields.

    # Combine the event date and time lines into one datetime.
    if event_date is not None and event_time_str is not None:
        event_time = _parse_event_time(event_time_str)
        if event_time is not None:
            report.event_datetime = datetime.datetime.combine(event_date, event_time)

    if parse_samples:
        report.samples = _parse_sample_table(lines, sample_start)

    return report
|
|
|
|
|
|
def _parse_sample_table(
|
|
lines: List[str], start: int,
|
|
) -> List[Tuple[float, float, float, float]]:
|
|
"""Parse the trailing sample table.
|
|
|
|
The table starts with a header row (" Tran <TAB>...") and continues
|
|
until EOF. Each data row is a tab-separated quartet of numeric values.
|
|
"""
|
|
samples: List[Tuple[float, float, float, float]] = []
|
|
seen_header = False
|
|
for line in lines[start:]:
|
|
line = line.rstrip("\r\n")
|
|
if not line.strip():
|
|
continue
|
|
cols = [c.strip() for c in line.split("\t") if c.strip()]
|
|
if not seen_header:
|
|
# Header row contains channel names; numeric rows don't.
|
|
if any(c in ("Tran", "Vert", "Long", "MicL") for c in cols):
|
|
seen_header = True
|
|
continue
|
|
if len(cols) < 4:
|
|
continue
|
|
try:
|
|
samples.append((
|
|
float(cols[0]), float(cols[1]),
|
|
float(cols[2]), float(cols[3]),
|
|
))
|
|
except ValueError:
|
|
continue
|
|
return samples
|
|
|
|
|
|
def parse_report_file(
    path: Union[str, Path], *, parse_samples: bool = False,
) -> BwAsciiReport:
    """Read the .TXT report at ``path`` and return its parsed form.

    The file is read as bytes and handed to `parse_report`;
    ``parse_samples`` is passed through unchanged.
    """
    raw = Path(path).read_bytes()
    return parse_report(raw, parse_samples=parse_samples)
|