seismo-relay/minimateplus/bw_ascii_report.py
serversdown d21e3b5298 histogram aggregation + parser extension for BW interval fields
Three layered changes that together make histogram charts visually
match BW's printout (one bar per interval, not per codec block):

1. bw_ascii_report parser captures histogram fields it previously
   dropped:
     - Histogram Start/Stop Time + Date → datetime
     - Number of Intervals + Interval Size (string + parsed seconds)
     - <Channel> Peak Time + Peak Date → datetime (per-channel)
     - Peak Vector Sum Date (combined with PVS Time → datetime;
       clears the bogus seconds parse that interpreted "22:33:52"
       as 22.0)
   New _parse_iso_date() handles BW's ISO format for histograms
   (waveforms use "May 8, 2026" long form).  New _parse_interval_size()
   handles "1 minute" / "5 minutes" / "15 seconds" etc.

2. _bw_report_to_dict() projects the new fields into a new
   bw_report.histogram block in the sidecar.

3. /db/events/{id}/waveform.json wraps the existing path 1 (HDF5)
   output with _maybe_aggregate_histogram(): when the event is a
   histogram AND the sidecar has bw_report.histogram.n_intervals,
   group the codec's per-block samples into N intervals via
   max-per-group and return the aggregated array.  time_axis gains
   histogram_aggregated / n_intervals / interval_size_s / interval_times
   fields.

Frontend (both modal chart in sfm_webapp.html + standalone event
browser) uses interval_times as x-axis labels when provided (BW-style
HH:MM:SS), falls back to interval index.

Defensive: aggregation is a no-op when the sidecar lacks the histogram
block (events ingested before this change).  Activates automatically
on prod once a watcher re-forward populates new sidecars.
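
The max-per-group aggregation from item 3, including the no-op fallback for older sidecars, can be sketched roughly as below. This is an illustration only, not the code behind `_maybe_aggregate_histogram()`: the helper names `aggregate_max` and `interval_labels`, the even split of samples across intervals, and the choice to label each interval by its start time are all assumptions.

```python
import datetime
from typing import List, Optional, Sequence


def aggregate_max(samples: Sequence[float], n_intervals: Optional[int]) -> List[float]:
    """Collapse per-block samples into n_intervals bars, keeping each group's max.

    Hypothetical helper. Assumes samples are spread evenly across the
    intervals; the real grouping rule may differ.
    """
    total = len(samples)
    # No-op when the interval count is unknown (older sidecars) or when
    # there are not enough samples to collapse.
    if not n_intervals or n_intervals <= 0 or total <= n_intervals:
        return list(samples)
    bars = []
    for k in range(n_intervals):
        lo = k * total // n_intervals          # first sample of group k
        hi = (k + 1) * total // n_intervals    # one past the last sample
        bars.append(max(samples[lo:hi]))
    return bars


def interval_labels(
    start: datetime.datetime, interval_size_s: float, n_intervals: int,
) -> List[str]:
    """HH:MM:SS label per interval, assuming labels mark each interval's start."""
    step = datetime.timedelta(seconds=interval_size_s)
    return [(start + k * step).strftime("%H:%M:%S") for k in range(n_intervals)]


if __name__ == "__main__":
    print(aggregate_max([0.1, 0.5, 0.2, 0.8, 0.3, 0.4], 3))
    print(interval_labels(datetime.datetime(2026, 5, 16, 22, 30, 0), 60, 3))
```

Returning the input unchanged when `n_intervals` is missing mirrors the defensive behaviour described above: events ingested before the sidecar gained a histogram block keep rendering per-block samples.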

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
2026-05-27 20:23:05 +00:00

644 lines
30 KiB
Python

"""
minimateplus/bw_ascii_report.py: parser for Blastware's per-event ASCII
report (the .TXT file BW writes alongside each saved event binary).

The ASCII export is the authoritative source for every "rich" per-event
field that BW computes from the waveform but never persists in the BW
binary itself:

  - Per-channel PPV (Tran / Vert / Long / MicL)
  - Peak Vector Sum + Peak Vector Sum Time
  - Per-channel ZC Freq, Time of Peak, Peak Acceleration, Peak Displacement
  - MicL PSPL, MicL Time of Peak, MicL ZC Freq
  - Per-channel Sensor Self-Check (Test Freq / Test Ratio / Test Results)
  - MicL Test Amplitude (mV)
  - Battery, calibration date, monitor-log timestamps

Persisting these values into the SFM database lets the monthly-summary
review workflow ("show me events at Location X with PVS > 0.5") work
without depending on the (still-undecoded) waveform body codec.

Format (verified against decode-re/5-8-26 4-event bundle):

  - One field per line, wrapped in double quotes: `"Field Name : Value"`
  - Field/value separator: literal ` : ` (space-colon-space).
  - Some field names contain an internal `:` already (e.g. `"Project:"`),
    so we split on the FIRST ` : ` only.
  - Some fields have unit suffixes: `"0.500 in/s"` / `"7.5 Hz"` / `"533 mv"`.
  - A `"Monitor Log(s)"` marker line is followed by tab-separated rows
    of `start_time<TAB>stop_time<TAB>description`.
  - Final `"PC SW Version : ..."` line ends the metadata block.
  - A blank line separates metadata from the sample table.
  - Sample table starts with ` Tran <TAB> Vert <TAB>...`, then
    one row per sample (tab-separated, right-padded numeric values).
  - Geo channel values are in in/s; MicL in dB(L) (or 0.000 below threshold).

Because some metadata fields have whitespace quirks ("MicL Time of
Peak" has two spaces; the leading "Project:" value has its own colon),
we normalise whitespace in the key before lookup.
"""
from __future__ import annotations
import datetime
import re
from dataclasses import dataclass, field
from pathlib import Path
from typing import Dict, List, Optional, Tuple, Union
# ─────────────────────────────────────────────────────────────────────────────
# Output dataclasses
# ─────────────────────────────────────────────────────────────────────────────
@dataclass
class ChannelStats:
    """Per-channel derived stats, populated from an event report."""
    ppv_ips: Optional[float] = None  # in/s (geo channels only)
    zc_freq_hz: Optional[float] = None  # Hz
    time_of_peak_s: Optional[float] = None  # seconds (relative to trigger; can be negative)
    peak_accel_g: Optional[float] = None  # g (geo channels only)
    peak_disp_in: Optional[float] = None  # in (geo channels only)


@dataclass
class MicStats:
    """MicL-specific stats."""
    weighting: Optional[str] = None  # e.g. "Linear Weighting"
    pspl_dbl: Optional[float] = None  # dB(L)
    zc_freq_hz: Optional[float] = None
    time_of_peak_s: Optional[float] = None


@dataclass
class SensorCheck:
    """Per-channel sensor self-check result.

    Geo channels report a frequency + ratio; MicL reports a frequency +
    amplitude (mV). All channels also have a Pass/Fail string.
    """
    test_freq_hz: Optional[float] = None
    test_ratio: Optional[float] = None  # geo channels only
    test_amplitude_mv: Optional[float] = None  # MicL only
    test_results: Optional[str] = None  # "Passed" / "Failed"


@dataclass
class MonitorLogEntry:
    """One row of the trailing Monitor Log(s) block."""
    start_time: Optional[datetime.datetime] = None
    stop_time: Optional[datetime.datetime] = None
    description: Optional[str] = None
@dataclass
class BwAsciiReport:
    """Structured representation of one BW per-event ASCII export."""

    # ── Identity ─────────────────────────────────────────────────────────────
    event_type: Optional[str] = None  # e.g. "Full Waveform"
    serial: Optional[str] = None  # e.g. "BE11529"
    version: Optional[str] = None  # firmware version line
    file_name: Optional[str] = None  # e.g. "M529LK44.AB0"
    event_datetime: Optional[datetime.datetime] = None  # parsed from Event Time + Event Date

    # ── Trigger / recording config ──────────────────────────────────────────
    trigger_channel: Optional[str] = None  # e.g. "Vert" or "From Unit"
    geo_trigger_level_ips: Optional[float] = None
    pretrig_s: Optional[float] = None  # negative seconds
    record_time_s: Optional[float] = None
    record_stop_mode: Optional[str] = None
    sample_rate_sps: Optional[int] = None
    battery_volts: Optional[float] = None
    calibration_date: Optional[datetime.date] = None
    calibration_by: Optional[str] = None  # e.g. "Instantel"
    units: Optional[str] = None  # e.g. "in/s and dB(L)"

    # ── Operator-supplied metadata ──────────────────────────────────────────
    # Parsed by POSITION from the 4-line "User Notes" block BW writes
    # between the `Units :` and `Geo Range :` lines. Position-based so
    # the values populate correctly even when an operator renames the
    # labels in Blastware's Compliance Setup → Notes tab (the 4 labels
    # are user-editable, e.g. "Seis Loc:" → "Building:" → "Site Address:").
    # The original labels BW wrote are preserved in `user_note_labels`
    # so terra-view can render them as the operator named them.
    project: Optional[str] = None  # position 1 (BW default label "Project:")
    client: Optional[str] = None  # position 2 (BW default label "Client:")
    operator: Optional[str] = None  # position 3 (BW default label "User Name:")
    sensor_location: Optional[str] = None  # position 4 (BW default label "Seis Loc:")
    # Maps canonical slot name → the literal label BW wrote in the ASCII
    # export. Empty if the User Notes block wasn't present. Example
    # when the operator renamed slot 4 to "Building:":
    #   {"project": "Project:", "client": "Client:",
    #    "operator": "User Name:", "sensor_location": "Building:"}
    user_note_labels: Dict[str, str] = field(default_factory=dict)

    # ── Geo channel scaling ─────────────────────────────────────────────────
    geo_range_ips: Optional[float] = None  # 10.000 / 1.250

    # ── Per-channel derived stats (geo + mic) ───────────────────────────────
    channels: Dict[str, ChannelStats] = field(default_factory=dict)
    mic: MicStats = field(default_factory=MicStats)

    # ── Vector sum ──────────────────────────────────────────────────────────
    peak_vector_sum_ips: Optional[float] = None
    peak_vector_sum_time_s: Optional[float] = None
    # Histograms additionally have an absolute date+time for the PVS
    # (it occurred at a specific interval). Waveform reports show
    # only the relative-time value above.
    peak_vector_sum_when: Optional[datetime.datetime] = None

    # ── Histogram-specific fields (populated only when Event Type starts
    # with 'Histogram' / 'Full Histogram' / 'Histogram + Continuous') ──
    histogram_start: Optional[datetime.datetime] = None
    histogram_stop: Optional[datetime.datetime] = None
    histogram_n_intervals: Optional[int] = None  # e.g. 4, 1436
    histogram_interval_size_str: Optional[str] = None  # "1 minute" / "5 minutes" / "15 seconds"
    histogram_interval_size_s: Optional[float] = None  # parsed to seconds
    # Per-channel absolute peak time+date (histogram-specific). For
    # waveform events these are None; those reports use the channel's
    # time_of_peak_s (relative to trigger) instead. Keyed by channel
    # name ("Tran", "Vert", "Long", "MicL").
    channel_peak_when: Dict[str, datetime.datetime] = field(default_factory=dict)

    # ── Sensor self-check (per channel) ─────────────────────────────────────
    sensor_check: Dict[str, SensorCheck] = field(default_factory=dict)

    # ── Monitor log + tooling version ───────────────────────────────────────
    monitor_log: List[MonitorLogEntry] = field(default_factory=list)
    pc_sw_version: Optional[str] = None

    # ── Sample table (optional; only parsed if requested) ───────────────────
    # Each entry: (Tran, Vert, Long, MicL) in the report's units (geo
    # channels in in/s, MicL in dB(L)). None when parse_samples=False.
    samples: Optional[List[Tuple[float, float, float, float]]] = None
# ─────────────────────────────────────────────────────────────────────────────
# Helpers
# ─────────────────────────────────────────────────────────────────────────────
_KEY_NORMALISE_RE = re.compile(r"\s+")
_NUMERIC_RE = re.compile(r"^-?\d+(?:\.\d+)?")


def _normalise_key(k: str) -> str:
    """Collapse whitespace runs (incl. tabs) and strip; handles BW's
    "MicL Time of Peak" double-space and leading-colon quirks."""
    return _KEY_NORMALISE_RE.sub(" ", k).strip()


def _strip_quotes(line: str) -> str:
    line = line.rstrip("\r\n")
    if len(line) >= 2 and line.startswith('"') and line.endswith('"'):
        return line[1:-1]
    return line


def _parse_number(value: str) -> Optional[float]:
    """Pull the leading numeric portion out of a value like "0.500 in/s"."""
    m = _NUMERIC_RE.match(value.strip())
    if not m:
        return None
    try:
        return float(m.group(0))
    except ValueError:
        return None


def _parse_int(value: str) -> Optional[int]:
    n = _parse_number(value)
    return None if n is None else int(round(n))
# Months exactly as BW writes them.
_MONTHS = {
    "January": 1, "February": 2, "March": 3, "April": 4,
    "May": 5, "June": 6, "July": 7, "August": 8,
    "September": 9, "October": 10, "November": 11, "December": 12,
    # Short forms used in monitor-log rows ("Apr 23 /26").
    "Jan": 1, "Feb": 2, "Mar": 3, "Apr": 4, "Jun": 6, "Jul": 7,
    "Aug": 8, "Sep": 9, "Oct": 10, "Nov": 11, "Dec": 12,
}


def _parse_event_date(s: str) -> Optional[datetime.date]:
    """Parse "April 23, 2026" or "May 8, 2026" → date."""
    s = s.strip()
    parts = s.replace(",", " ").split()
    if len(parts) < 3:
        return None
    month_name, day_str, year_str = parts[0], parts[1], parts[2]
    month = _MONTHS.get(month_name)
    if month is None:
        return None
    try:
        return datetime.date(int(year_str), month, int(day_str))
    except ValueError:
        return None


def _parse_iso_date(s: str) -> Optional[datetime.date]:
    """Parse "2026-05-16" → date. Histograms use ISO format for their
    Start Date / Stop Date / Peak Date fields; waveforms use the
    "May 8, 2026" long form which `_parse_event_date` handles."""
    s = s.strip()
    try:
        return datetime.date.fromisoformat(s)
    except ValueError:
        return None
_INTERVAL_UNIT_SECONDS = {
    "second": 1, "seconds": 1, "sec": 1, "secs": 1,
    "minute": 60, "minutes": 60, "min": 60, "mins": 60,
    "hour": 3600, "hours": 3600, "hr": 3600, "hrs": 3600,
}


def _parse_interval_size(s: str) -> Optional[float]:
    """Parse "1 minute" / "5 minutes" / "15 seconds" / "2 seconds" → seconds.

    Handles the BW Compliance Setup → Histogram Interval values verbatim
    ("2 seconds", "5 seconds", "15 seconds", "1 minute", "5 minutes",
    "15 minutes") plus a few defensive variants.
    """
    if not s:
        return None
    parts = s.strip().split()
    if len(parts) < 2:
        return None
    try:
        n = float(parts[0])
    except ValueError:
        return None
    unit_per_s = _INTERVAL_UNIT_SECONDS.get(parts[1].lower())
    if unit_per_s is None:
        return None
    return n * unit_per_s


def _parse_event_time(s: str) -> Optional[datetime.time]:
    """Parse "15:56:35" → time."""
    s = s.strip()
    try:
        h, m, sec = s.split(":")
        return datetime.time(int(h), int(m), int(sec))
    except (ValueError, IndexError):
        return None


def _parse_calibration(value: str) -> Tuple[Optional[datetime.date], Optional[str]]:
    """Parse "April 29, 2025 by Instantel" → (date, "Instantel")."""
    parts = value.split(" by ", 1)
    date = _parse_event_date(parts[0])
    by = parts[1].strip() if len(parts) > 1 else None
    return date, by
def _parse_monitor_row(line: str) -> Optional[MonitorLogEntry]:
    """Parse a tab-separated monitor log row.

    Format: `<start>\t<stop>\t<desc>` where each timestamp is BW's
    short form "Mon DD /YY HH:MM:SS" (e.g. "Apr 23 /26 15:46:16").
    Year is encoded as a 2-digit suffix; we expand "/26" → 2026.
    """
    parts = line.split("\t")
    if len(parts) < 2:
        return None
    start = _parse_monitor_ts(parts[0])
    stop = _parse_monitor_ts(parts[1])
    desc = parts[2].strip() if len(parts) > 2 else None
    if start is None and stop is None and not desc:
        return None
    return MonitorLogEntry(start_time=start, stop_time=stop, description=desc)


def _parse_monitor_ts(s: str) -> Optional[datetime.datetime]:
    """Parse "Apr 23 /26 15:46:16" → datetime."""
    s = s.strip()
    parts = s.split()
    if len(parts) < 4:
        return None
    month = _MONTHS.get(parts[0])
    if month is None:
        return None
    try:
        day = int(parts[1])
        # parts[2] looks like "/26" → century-flip to 2026
        yy = int(parts[2].lstrip("/"))
        year = 2000 + yy if yy < 80 else 1900 + yy
        h, m, sec = (int(x) for x in parts[3].split(":"))
        return datetime.datetime(year, month, day, h, m, sec)
    except (ValueError, IndexError):
        return None
# ── User-notes positional slot map ──────────────────────────────────────────
#
# Blastware's Compliance Setup → Notes tab shows four operator-supplied
# fields whose LABELS the operator can rename (see screenshot in
# project archive). Defaults are "Project:" / "Client:" /
# "User Name:" / "Seis Loc:", but an operator using a different
# convention can rename them to anything ("Building:", "Site:",
# "Address:", etc.). The ASCII export reflects whatever the operator
# typed, so label-based matching is fragile.
#
# What IS reliable: BW always writes the 4 user-notes lines in the
# same order, contiguously between the `Units :` line and the
# `Geo Range :` line. We parse them by POSITION and preserve the
# operator's labels in `report.user_note_labels` so terra-view can
# render them as the operator intended.
_USER_NOTE_SLOTS = ("project", "client", "operator", "sensor_location")
# ─────────────────────────────────────────────────────────────────────────────
# Top-level parser
# ─────────────────────────────────────────────────────────────────────────────
def parse_report(text: Union[str, bytes], *, parse_samples: bool = False) -> BwAsciiReport:
    """Parse a BW per-event ASCII export into a structured BwAsciiReport.

    Set ``parse_samples=True`` to also populate ``report.samples`` with
    the trailing sample table. Default False because the table is
    huge and most callers only want metadata for indexing.
    """
    if isinstance(text, bytes):
        text = text.decode("ascii", errors="replace")

    report = BwAsciiReport()
    # Pre-create channel stat slots so callers can rely on them existing.
    for ch in ("Tran", "Vert", "Long", "MicL"):
        report.channels.setdefault(ch, ChannelStats())
        report.sensor_check.setdefault(ch, SensorCheck())

    lines = text.splitlines()
    i = 0
    n = len(lines)
    in_monitor_log_section = False
    event_time_str: Optional[str] = None
    event_date: Optional[datetime.date] = None

    # User-notes block detection. We enter the block after parsing
    # the "Units :" line and exit on the "Geo Range :" line. Inside,
    # the first 4 unmatched `<label> : <value>` lines are assigned to
    # the 4 canonical operator-supplied slots by POSITION (project,
    # client, operator, sensor_location) regardless of what the
    # operator named the labels in BW's Compliance Setup → Notes tab.
    in_user_notes_block = False
    user_note_position = 0

    # Histogram-field staging: BW writes <Channel> Peak Time and
    # <Channel> Peak Date on separate lines (and similarly Histogram
    # Start Time / Date). We stash the partial value when the time
    # line arrives and combine it when the matching date line arrives.
    _hist_start_time: Optional[datetime.time] = None
    _hist_stop_time: Optional[datetime.time] = None
    _pending_peak_time: Dict[str, Optional[datetime.time]] = {}
    _pvs_time_raw: Optional[str] = None  # last Peak Vector Sum Time value, raw

    while i < n:
        raw_line = lines[i]
        i += 1

        # Blank line marks the start of the sample table.
        if raw_line.strip() == "":
            break

        line = _strip_quotes(raw_line)

        # Monitor log section: "Monitor Log(s)" header followed by N rows
        # (still inside double-quoted lines), terminated by a non-row line
        # like "PC SW Version : ..." or a blank line.
        if not in_monitor_log_section and line.strip() == "Monitor Log(s)":
            in_monitor_log_section = True
            continue
        if in_monitor_log_section:
            # Heuristic: monitor rows contain a tab; the next "Field : Value"
            # line ends the section.
            if "\t" in line:
                entry = _parse_monitor_row(line)
                if entry:
                    report.monitor_log.append(entry)
                continue
            # Falls through to the field parser below; clear the flag.
            in_monitor_log_section = False

        # "Field : Value": split on FIRST occurrence of " : "
        idx = line.find(" : ")
        if idx < 0:
            continue
        key = _normalise_key(line[:idx])
        value = line[idx + 3 :].strip()

        # ── Identity / config ────────────────────────────────────────────────
        if key == "Event Type": report.event_type = value
        elif key == "Serial Number": report.serial = value
        elif key == "Version": report.version = value
        elif key == "File Name": report.file_name = value
        elif key == "Event Time": event_time_str = value
        elif key == "Event Date": event_date = _parse_event_date(value)
        elif key == "Trigger": report.trigger_channel = value
        elif key == "Geo Trigger Level": report.geo_trigger_level_ips = _parse_number(value)
        elif key == "Pre-trigger Length": report.pretrig_s = _parse_number(value)
        elif key == "Record Time": report.record_time_s = _parse_number(value)
        elif key == "Record Stop Mode": report.record_stop_mode = value
        elif key == "Sample Rate": report.sample_rate_sps = _parse_int(value)
        elif key == "Battery Level": report.battery_volts = _parse_number(value)
        elif key == "Calibration":
            report.calibration_date, report.calibration_by = _parse_calibration(value)
        elif key == "Units":
            report.units = value
            # Entering the user-notes block. Next ~4 lines until
            # "Geo Range :" are the operator-supplied notes.
            in_user_notes_block = True
            user_note_position = 0
        elif key == "Geo Range":
            # Exiting the user-notes block.
            in_user_notes_block = False
            report.geo_range_ips = _parse_number(value)

        # User-notes block: assign by position (operator may have
        # renamed the labels, so we don't trust them). Preserve the
        # original labels in `user_note_labels` for downstream UIs
        # (terra-view) that want to display them as the operator
        # named them.
        elif in_user_notes_block and user_note_position < len(_USER_NOTE_SLOTS):
            slot = _USER_NOTE_SLOTS[user_note_position]
            setattr(report, slot, value)
            report.user_note_labels[slot] = key
            user_note_position += 1

        # ── Per-channel stats ────────────────────────────────────────────────
        # All match the pattern "{Channel} <stat-name>"
        elif key in (
            "Tran PPV", "Vert PPV", "Long PPV",
            "Tran ZC Freq", "Vert ZC Freq", "Long ZC Freq",
            "Tran Time of Peak", "Vert Time of Peak", "Long Time of Peak",
            "Tran Peak Acceleration", "Vert Peak Acceleration", "Long Peak Acceleration",
            "Tran Peak Displacement", "Vert Peak Displacement", "Long Peak Displacement",
        ):
            ch_name, stat = key.split(" ", 1)
            cs = report.channels.setdefault(ch_name, ChannelStats())
            num = _parse_number(value)
            if stat == "PPV": cs.ppv_ips = num
            elif stat == "ZC Freq": cs.zc_freq_hz = num
            elif stat == "Time of Peak": cs.time_of_peak_s = num
            elif stat == "Peak Acceleration": cs.peak_accel_g = num
            elif stat == "Peak Displacement": cs.peak_disp_in = num

        # ── Histogram-specific fields ────────────────────────────────────────
        # Histograms have Start/Stop time+date pairs + an interval count
        # and size, plus per-channel absolute Peak Time/Date instead of
        # the waveform's relative Time of Peak.
        elif key == "Histogram Start Time":
            _hist_start_time = _parse_event_time(value)
        elif key == "Histogram Start Date":
            _d = _parse_iso_date(value)
            if _d and _hist_start_time:
                report.histogram_start = datetime.datetime.combine(_d, _hist_start_time)
        elif key == "Histogram Stop Time":
            _hist_stop_time = _parse_event_time(value)
        elif key == "Histogram Stop Date":
            _d = _parse_iso_date(value)
            if _d and _hist_stop_time:
                report.histogram_stop = datetime.datetime.combine(_d, _hist_stop_time)
        elif key == "Number of Intervals":
            try:
                report.histogram_n_intervals = int(float(value.strip()))
            except ValueError:
                pass
        elif key == "Interval Size":
            report.histogram_interval_size_str = value.strip()
            report.histogram_interval_size_s = _parse_interval_size(value)

        # ── Per-channel histogram Peak Date / Peak Time ──
        # Lines like "Tran Peak Time : 22:31:38" + "Tran Peak Date : 2026-05-16"
        elif key in ("Tran Peak Time", "Vert Peak Time", "Long Peak Time", "MicL Time"):
            ch_name = "MicL" if key == "MicL Time" else key.split(" ", 1)[0]
            _pending_peak_time[ch_name] = _parse_event_time(value)
        elif key in ("Tran Peak Date", "Vert Peak Date", "Long Peak Date", "MicL Date"):
            ch_name = "MicL" if key == "MicL Date" else key.split(" ", 1)[0]
            _d = _parse_iso_date(value)
            _t = _pending_peak_time.get(ch_name)
            if _d and _t:
                report.channel_peak_when[ch_name] = datetime.datetime.combine(_d, _t)

        # ── Vector Sum ───────────────────────────────────────────────────────
        elif key == "Peak Vector Sum":
            report.peak_vector_sum_ips = _parse_number(value)
        elif key == "Peak Vector Sum Time":
            report.peak_vector_sum_time_s = _parse_number(value)
            _pvs_time_raw = value
        elif key == "Peak Vector Sum Date":
            # Histogram-mode PVS gets paired with a date. We may have
            # captured 'Peak Vector Sum Time' as either a relative
            # seconds float (waveform) or an HH:MM:SS string we
            # interpreted as a number. For histograms, BW writes
            # "Peak Vector Sum Time : 22:33:52" which _parse_number
            # parses as 22.0 (loses information). When Peak Vector Sum
            # Date arrives, re-parse the previous PVS time line as a
            # clock time and combine into an absolute datetime.
            _d = _parse_iso_date(value)
            if _d and _pvs_time_raw is not None:
                _t = _parse_event_time(_pvs_time_raw)
                if _t:
                    report.peak_vector_sum_when = datetime.datetime.combine(_d, _t)
                    # The earlier seconds parse was bogus for histograms;
                    # clear it so consumers don't think it's a real offset.
                    report.peak_vector_sum_time_s = None

        # ── Microphone block ────────────────────────────────────────────────
        elif key == "Microphone":
            report.mic.weighting = value
        elif key == "MicL PSPL":
            report.mic.pspl_dbl = _parse_number(value)
            # Deliberately NOT mirrored onto `channels["MicL"].ppv_ips`: the
            # value is dB(L), not in/s, so it is stored only in MicStats.
        elif key == "MicL Time of Peak":
            report.mic.time_of_peak_s = _parse_number(value)
            cs = report.channels.setdefault("MicL", ChannelStats())
            cs.time_of_peak_s = report.mic.time_of_peak_s
        elif key == "MicL ZC Freq":
            report.mic.zc_freq_hz = _parse_number(value)
            cs = report.channels.setdefault("MicL", ChannelStats())
            cs.zc_freq_hz = report.mic.zc_freq_hz

        # ── Sensor self-check ────────────────────────────────────────────────
        elif key in (
            "Tran Test Freq", "Vert Test Freq", "Long Test Freq", "MicL Test Freq",
            "Tran Test Ratio", "Vert Test Ratio", "Long Test Ratio",
            "MicL Test Amplitude",
            "Tran Test Results", "Vert Test Results", "Long Test Results", "MicL Test Results",
        ):
            ch_name, stat = key.split(" ", 1)
            sc = report.sensor_check.setdefault(ch_name, SensorCheck())
            if stat == "Test Freq": sc.test_freq_hz = _parse_number(value)
            elif stat == "Test Ratio": sc.test_ratio = _parse_number(value)
            elif stat == "Test Amplitude": sc.test_amplitude_mv = _parse_number(value)
            elif stat == "Test Results": sc.test_results = value

        # ── Trailer ─────────────────────────────────────────────────────────
        elif key == "PC SW Version":
            report.pc_sw_version = value

        # Unknown keys are silently dropped: forward-compat for future
        # BW versions that may add fields.

    # Combine event date + time into a datetime
    if event_date is not None and event_time_str is not None:
        t = _parse_event_time(event_time_str)
        if t is not None:
            report.event_datetime = datetime.datetime.combine(event_date, t)

    if parse_samples:
        report.samples = _parse_sample_table(lines, i)

    return report


def _parse_sample_table(
    lines: List[str], start: int,
) -> List[Tuple[float, float, float, float]]:
    """Parse the trailing sample table.

    The table starts with a header row (" Tran <TAB>...") and continues
    until EOF. Each data row is a tab-separated quartet of numeric values.
    """
    samples: List[Tuple[float, float, float, float]] = []
    seen_header = False
    for line in lines[start:]:
        line = line.rstrip("\r\n")
        if not line.strip():
            continue
        cols = [c.strip() for c in line.split("\t") if c.strip()]
        if not seen_header:
            # Header row contains channel names; numeric rows don't.
            if any(c in ("Tran", "Vert", "Long", "MicL") for c in cols):
                seen_header = True
                continue
        if len(cols) < 4:
            continue
        try:
            samples.append((
                float(cols[0]), float(cols[1]),
                float(cols[2]), float(cols[3]),
            ))
        except ValueError:
            continue
    return samples


def parse_report_file(
    path: Union[str, Path], *, parse_samples: bool = False,
) -> BwAsciiReport:
    """Convenience: read a .TXT file from disk and parse it."""
    return parse_report(Path(path).read_bytes(), parse_samples=parse_samples)
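
The line-splitting rules from the module docstring can be exercised in isolation. The sketch below is self-contained rather than importing the module; `split_field` and `leading_number` are hypothetical names, and the sample lines are made up to show the quirks (their exact spacing in real exports is an assumption). The last call shows why a histogram "Peak Vector Sum Time" value has to be re-parsed as a clock time: the leading-number match stops at the first colon.

```python
import re
from typing import Optional, Tuple

_WS_RE = re.compile(r"\s+")
_NUMERIC_RE = re.compile(r"^-?\d+(?:\.\d+)?")  # same pattern the module uses


def split_field(raw_line: str) -> Optional[Tuple[str, str]]:
    """Split one quoted report line into (key, value) at the FIRST ' : '."""
    line = raw_line.rstrip("\r\n")
    # Drop the surrounding double quotes if both are present.
    if len(line) >= 2 and line.startswith('"') and line.endswith('"'):
        line = line[1:-1]
    idx = line.find(" : ")
    if idx < 0:
        return None  # marker lines such as "Monitor Log(s)" carry no value
    key = _WS_RE.sub(" ", line[:idx]).strip()  # collapse whitespace runs
    return key, line[idx + 3:].strip()


def leading_number(value: str) -> Optional[float]:
    """Return the leading numeric portion of a value, ignoring any unit suffix."""
    m = _NUMERIC_RE.match(value.strip())
    return float(m.group(0)) if m else None


if __name__ == "__main__":
    print(split_field('"Tran PPV : 0.500 in/s"'))
    print(split_field('"Project: : Main St Bridge"'))       # label keeps its own colon
    print(split_field('"MicL  Time of Peak : 0.012 sec"'))  # double space collapsed
    print(leading_number("22:33:52"))                       # clock time truncated to the hour
```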