"""
minimateplus/bw_ascii_report.py — parser for Blastware's per-event ASCII
report (the .TXT file BW writes alongside each saved event binary).

The ASCII export is the authoritative source for every "rich" per-event
field that BW computes from the waveform but never persists in the BW
binary itself:

  - Per-channel PPV (Tran / Vert / Long / MicL)
  - Peak Vector Sum + Peak Vector Sum Time
  - Per-channel ZC Freq, Time of Peak, Peak Acceleration, Peak Displacement
  - MicL PSPL, MicL Time of Peak, MicL ZC Freq
  - Per-channel Sensor Self-Check (Test Freq / Test Ratio / Test Results)
  - MicL Test Amplitude (mV)
  - Battery, calibration date, monitor-log timestamps

Persisting these values into the SFM database lets the monthly-summary
review workflow ("show me events at Location X with PVS > 0.5") work
without depending on the (still-undecoded) waveform body codec.

Format (verified against decode-re/5-8-26 4-event bundle):

  - One field per line, wrapped in double quotes: `"Field Name : Value"`
  - Field/value separator: literal ` : ` (space-colon-space).
  - Some field names contain an internal `:` already (e.g. `"Project:"`),
    so we split on the FIRST ` : ` only.
  - Some fields have unit suffixes: `"0.500 in/s"` / `"7.5 Hz"` / `"533 mv"`.
  - A `"Monitor Log(s)"` marker line is followed by tab-separated rows of
    `start_time<TAB>stop_time<TAB>description`.
  - Final `"PC SW Version : ..."` line ends the metadata block.
  - A blank line separates metadata from the sample table.
  - Sample table starts with a ` Tran Vert ...` header row, then one row
    per sample (tab-separated, right-padded numeric values).
  - Geo channel values are in in/s; MicL in dB(L) (or 0.000 below threshold).

Because some metadata fields have whitespace quirks (the "MicL Time of
Peak" key contains a double space; the leading "Project:" value has its
own colon), we normalise whitespace in the key before lookup.
"""

from __future__ import annotations

import datetime
import re
from dataclasses import dataclass, field
from pathlib import Path
from typing import Dict, List, Optional, Tuple, Union


# ─────────────────────────────────────────────────────────────────────────────
# Output dataclasses
# ─────────────────────────────────────────────────────────────────────────────

@dataclass
class ChannelStats:
    """Per-channel derived stats, populated from an event report."""

    ppv_ips: Optional[float] = None          # in/s (geo channels only)
    zc_freq_hz: Optional[float] = None       # Hz
    time_of_peak_s: Optional[float] = None   # seconds (relative to trigger; can be negative)
    peak_accel_g: Optional[float] = None     # g (geo channels only)
    peak_disp_in: Optional[float] = None     # in (geo channels only)
    # When BW writes "OORANGE" (Out Of Range — truncated) for a PPV
    # value, the true peak exceeded the channel's full-scale range.
    # We substitute the range max (e.g. 10.000 in/s for Normal range)
    # as a lower bound, and flag here so downstream UI / alerts know
    # to render "> 10 in/s" or "saturated" instead of trusting the
    # value as an exact measurement.
    ppv_saturated: bool = False
    # Set when BW writes ">100 Hz" for ZC Freq — the zero-crossing
    # algorithm's peak frequency exceeded the device's reporting
    # ceiling (typically 100 Hz on V10.72).  zc_freq_hz gets the
    # threshold (100.0) as a lower bound; downstream UI renders ">100".
    zc_freq_above_range: bool = False


@dataclass
class MicStats:
    """MicL-specific stats."""

    weighting: Optional[str] = None          # e.g. "Linear Weighting"
    pspl_dbl: Optional[float] = None         # dB(L)
    zc_freq_hz: Optional[float] = None
    time_of_peak_s: Optional[float] = None
    # Set when BW writes "OORANGE" for PSPL — mic exceeded its
    # measurement range.  pspl_dbl gets the conservative upper bound
    # 140 dBL (typical NL-43 max; some units cap at 148).  Consumers
    # should render "> 140 dB(L)" or similar when this flag is set.
    # NOTE(review): "upper bound" conflicts with the "> 140" rendering
    # (and with ChannelStats, which calls its substitute a lower
    # bound) — confirm the intended wording.
    pspl_saturated: bool = False
    # Same semantics as ChannelStats.zc_freq_above_range — mic ZC
    # peak exceeded device reporting ceiling.
    zc_freq_above_range: bool = False


@dataclass
class SensorCheck:
    """Per-channel sensor self-check result.

    Geo channels report a frequency + ratio; MicL reports a frequency +
    amplitude (mV).  All channels also have a Pass/Fail string.
    """

    test_freq_hz: Optional[float] = None
    test_ratio: Optional[float] = None           # geo channels only
    test_amplitude_mv: Optional[float] = None    # MicL only
    test_results: Optional[str] = None           # "Passed" / "Failed"


@dataclass
class MonitorLogEntry:
    """One row of the trailing Monitor Log(s) block."""

    start_time: Optional[datetime.datetime] = None
    stop_time: Optional[datetime.datetime] = None
    description: Optional[str] = None


# BW saturation marker — appears in PPV / Peak Vector Sum / similar
# numeric fields when the underlying measurement exceeded the
# channel's full-scale range (e.g., a geophone reading > 10 in/s at
# Normal range, or a mic exceeding its sensitivity ceiling).  Treated
# as "≥ range_max" + a saturated flag rather than discarded.
# Appears as:  ``"Tran PPV : OORANGE in/s"``
_OORANGE_MARKERS = ("OORANGE", "OUT OF RANGE")


def _is_oorange(value: str) -> bool:
    """True when a BW numeric field is an Out-Of-Range saturation marker.

    The match is a case-insensitive substring test, so a trailing unit
    (``"OORANGE in/s"``) does not prevent detection.
    """
    s = value.strip().upper()
    return any(m in s for m in _OORANGE_MARKERS)


def _parse_above_range(value: str) -> Optional[float]:
    """For BW "above-range" markers like ">100 Hz", return the threshold.

    BW writes ZC Freq as ">100 Hz" when the zero-crossing algorithm
    sees a peak too fast to count (device cuts off at 100 Hz).  Returns
    the numeric portion after the '>' (e.g. 100.0), or None if `value`
    is not an above-range marker.
    """
    s = value.strip()
    if not s.startswith(">"):
        return None
    # _parse_number strips again, so "> 100 Hz" (space after '>') works too.
    return _parse_number(s[1:])


@dataclass
class BwAsciiReport:
    """Structured representation of one BW per-event ASCII export."""

    # ── Identity ─────────────────────────────────────────────────────────────
    event_type: Optional[str] = None                     # e.g. "Full Waveform"
    serial: Optional[str] = None                         # e.g. "BE11529"
    version: Optional[str] = None                        # firmware version line
    file_name: Optional[str] = None                      # e.g. "M529LK44.AB0"
    event_datetime: Optional[datetime.datetime] = None   # parsed from Event Time + Event Date

    # ── Trigger / recording config ──────────────────────────────────────────
    trigger_channel: Optional[str] = None                # e.g. "Vert" or "From Unit"
    geo_trigger_level_ips: Optional[float] = None
    pretrig_s: Optional[float] = None                    # negative seconds
    record_time_s: Optional[float] = None
    record_stop_mode: Optional[str] = None
    sample_rate_sps: Optional[int] = None
    battery_volts: Optional[float] = None
    calibration_date: Optional[datetime.date] = None
    calibration_by: Optional[str] = None                 # e.g. "Instantel"
    units: Optional[str] = None                          # e.g. "in/s and dB(L)"

    # ── Operator-supplied metadata ──────────────────────────────────────────
    # Parsed by POSITION from the 4-line "User Notes" block BW writes
    # between the `Units :` and `Geo Range :` lines.  Position-based so
    # the values populate correctly even when an operator renames the
    # labels in Blastware's Compliance Setup → Notes tab (the 4 labels
    # are user-editable, e.g. "Seis Loc:" → "Building:" → "Site Address:").
    # The original labels BW wrote are preserved in `user_note_labels`
    # so terra-view can render them as the operator named them.
    project: Optional[str] = None           # position 1 (BW default label "Project:")
    client: Optional[str] = None            # position 2 (BW default label "Client:")
    operator: Optional[str] = None          # position 3 (BW default label "User Name:")
    sensor_location: Optional[str] = None   # position 4 (BW default label "Seis Loc:")
    # Maps canonical slot name → the literal label BW wrote in the ASCII
    # export.  Empty if the User Notes block wasn't present.  Example
    # when the operator renamed slot 4 to "Building:":
    #   {"project": "Project:", "client": "Client:",
    #    "operator": "User Name:", "sensor_location": "Building:"}
    user_note_labels: Dict[str, str] = field(default_factory=dict)

    # ── Geo channel scaling ─────────────────────────────────────────────────
    geo_range_ips: Optional[float] = None   # 10.000 / 1.250

    # ── Per-channel derived stats (geo + mic) ───────────────────────────────
    channels: Dict[str, ChannelStats] = field(default_factory=dict)
    mic: MicStats = field(default_factory=MicStats)

    # ── Vector sum ──────────────────────────────────────────────────────────
    peak_vector_sum_ips: Optional[float] = None
    peak_vector_sum_time_s: Optional[float] = None
    # Saturation flag — set when BW writes "OORANGE" for the PVS.  We
    # then substitute sqrt(3) * geo_range_ips as a conservative upper
    # bound (the theoretical maximum PVS when all 3 geo channels are
    # simultaneously at full-scale).  Consumers should display this as
    # ">{value} in/s" or similar.
    peak_vector_sum_saturated: bool = False
    # Histograms additionally have an absolute date+time for the PVS
    # (it occurred at a specific interval).  Waveform reports show
    # only the relative-time value above.
    peak_vector_sum_when: Optional[datetime.datetime] = None

    # ── Histogram-specific fields (populated only when Event Type starts
    #    with 'Histogram' / 'Full Histogram' / 'Histogram + Continuous') ──
    histogram_start: Optional[datetime.datetime] = None
    histogram_stop: Optional[datetime.datetime] = None
    histogram_n_intervals: Optional[int] = None          # e.g. 4, 1436
    histogram_interval_size_str: Optional[str] = None    # "1 minute" / "5 minutes" / "15 seconds"
    histogram_interval_size_s: Optional[float] = None    # parsed to seconds
    # Per-channel absolute peak time+date (histogram-specific).  For
    # waveform events these are None — those reports use the channel's
    # time_of_peak_s (relative to trigger) instead.  Keyed by channel
    # name ("Tran", "Vert", "Long", "MicL").
    channel_peak_when: Dict[str, datetime.datetime] = field(default_factory=dict)

    # ── Sensor self-check (per channel) ─────────────────────────────────────
    sensor_check: Dict[str, SensorCheck] = field(default_factory=dict)

    # ── Monitor log + tooling version ───────────────────────────────────────
    monitor_log: List[MonitorLogEntry] = field(default_factory=list)
    pc_sw_version: Optional[str] = None

    # ── Sample table (optional; only parsed if requested) ───────────────────
    # Each entry: (Tran, Vert, Long, MicL) in the report's units (geo
    # channels in in/s, MicL in dB(L)).  None when parse_samples=False.
    samples: Optional[List[Tuple[float, float, float, float]]] = None


# ─────────────────────────────────────────────────────────────────────────────
# Helpers
# ─────────────────────────────────────────────────────────────────────────────

_KEY_NORMALISE_RE = re.compile(r"\s+")
_NUMERIC_RE = re.compile(r"^-?\d+(?:\.\d+)?")


def _normalise_key(k: str) -> str:
    """Collapse whitespace runs (incl. tabs) and strip.

    Handles BW's "MicL Time of Peak" double-space and leading-colon
    quirks.
    """
    return _KEY_NORMALISE_RE.sub(" ", k).strip()


def _strip_quotes(line: str) -> str:
    """Drop the trailing line ending and one pair of enclosing double quotes.

    Lines that are not wrapped in quotes on both ends are returned with
    only the line ending removed.
    """
    line = line.rstrip("\r\n")
    if len(line) >= 2 and line.startswith('"') and line.endswith('"'):
        return line[1:-1]
    return line


def _parse_number(value: str) -> Optional[float]:
    """Pull the leading numeric portion out of a value like "0.500 in/s".

    Returns None when the (stripped) value does not start with an
    optionally negative decimal number.
    """
    m = _NUMERIC_RE.match(value.strip())
    if not m:
        return None
    try:
        return float(m.group(0))
    except ValueError:
        return None


def _parse_int(value: str) -> Optional[int]:
    """Parse the leading number of `value` and round it to an int (or None)."""
    n = _parse_number(value)
    return None if n is None else int(round(n))


# Months exactly as BW writes them.
_MONTHS = {
    "January": 1, "February": 2, "March": 3, "April": 4,
    "May": 5, "June": 6, "July": 7, "August": 8,
    "September": 9, "October": 10, "November": 11, "December": 12,
    # Short forms used in monitor-log rows ("Apr 23 /26").  "May" is
    # already covered by the long form above.
    "Jan": 1, "Feb": 2, "Mar": 3, "Apr": 4, "Jun": 6,
    "Jul": 7, "Aug": 8, "Sep": 9, "Oct": 10, "Nov": 11, "Dec": 12,
}


def _parse_event_date(s: str) -> Optional[datetime.date]:
    """Parse "April 23, 2026" or "May 8, 2026" → date.

    Returns None for unknown month names, too few tokens, or an
    impossible calendar date.
    """
    s = s.strip()
    # Commas become spaces so "23," and "23" tokenise the same way.
    parts = s.replace(",", " ").split()
    if len(parts) < 3:
        return None
    month_name, day_str, year_str = parts[0], parts[1], parts[2]
    month = _MONTHS.get(month_name)
    if month is None:
        return None
    try:
        return datetime.date(int(year_str), month, int(day_str))
    except ValueError:
        return None


def _parse_iso_date(s: str) -> Optional[datetime.date]:
    """Parse "2026-05-16" → date.

    Histograms use ISO format for their Start Date / Stop Date /
    Peak Date fields; waveforms use the "May 8, 2026" long form which
    `_parse_event_date` handles.
    """
    s = s.strip()
    try:
        return datetime.date.fromisoformat(s)
    except ValueError:
        return None


_INTERVAL_UNIT_SECONDS = {
    "second": 1, "seconds": 1, "sec": 1, "secs": 1,
    "minute": 60, "minutes": 60, "min": 60, "mins": 60,
    "hour": 3600, "hours": 3600, "hr": 3600, "hrs": 3600,
}


def _parse_interval_size(s: str) -> Optional[float]:
    """Parse "1 minute" / "5 minutes" / "15 seconds" / "2 seconds" → seconds.

    Handles the BW Compliance Setup → Histogram Interval values
    verbatim ("2 seconds", "5 seconds", "15 seconds", "1 minute",
    "5 minutes", "15 minutes") plus a few defensive variants.
    """
    if not s:
        return None
    parts = s.strip().split()
    if len(parts) < 2:
        return None
    try:
        n = float(parts[0])
    except ValueError:
        return None
    unit_per_s = _INTERVAL_UNIT_SECONDS.get(parts[1].lower())
    if unit_per_s is None:
        return None
    return n * unit_per_s


def _parse_event_time(s: str) -> Optional[datetime.time]:
    """Parse "15:56:35" → time.

    Returns None unless the value is exactly three colon-separated
    integers forming a valid time of day.
    """
    s = s.strip()
    try:
        h, m, sec = s.split(":")
        return datetime.time(int(h), int(m), int(sec))
    except (ValueError, IndexError):
        return None


def _parse_calibration(value: str) -> Tuple[Optional[datetime.date], Optional[str]]:
    """Parse "April 29, 2025 by Instantel" → (date, "Instantel").

    The calibrator is None when the " by " part is missing or empty,
    so callers never have to distinguish "" from None.
    """
    parts = value.split(" by ", 1)
    date = _parse_event_date(parts[0])
    # `or None` folds an empty/whitespace-only calibrator into None.
    by = (parts[1].strip() or None) if len(parts) > 1 else None
    return date, by


def _parse_monitor_row(line: str) -> Optional[MonitorLogEntry]:
    """Parse a tab-separated monitor log row.

    Format: `<start><TAB><stop><TAB><description>` where each timestamp
    is BW's short form "Mon DD /YY HH:MM:SS" (e.g. "Apr 23 /26
    15:46:16").  Year is encoded as a 2-digit suffix; we expand
    "/26" → 2026.

    Returns None when the line has fewer than two tab-separated cells
    or when nothing usable (no timestamp, no description) was found.
    """
    parts = line.split("\t")
    if len(parts) < 2:
        return None
    start = _parse_monitor_ts(parts[0])
    stop = _parse_monitor_ts(parts[1])
    desc = parts[2].strip() if len(parts) > 2 else None
    if start is None and stop is None and not desc:
        return None
    return MonitorLogEntry(start_time=start, stop_time=stop, description=desc)


def _parse_monitor_ts(s: str) -> Optional[datetime.datetime]:
    """Parse "Apr 23 /26 15:46:16" → datetime."""
    s = s.strip()
    parts = s.split()
    if len(parts) < 4:
        return None
    month = _MONTHS.get(parts[0])
    if month is None:
        return None
    try:
        day = int(parts[1])
        # parts[2] looks like "/26" → century-flip to 2026
        yy = int(parts[2].lstrip("/"))
        year = 2000 + yy if yy < 80 else 1900 + yy
        h, m, sec = (int(x) for x in parts[3].split(":"))
        return datetime.datetime(year, month, day, h, m, sec)
    except (ValueError, IndexError):
        return None


# ── User-notes positional slot map ──────────────────────────────────────────
#
# Blastware's Compliance Setup → Notes tab shows four operator-supplied
# fields whose LABELS the operator can rename (see screenshot in
# project archive).  Defaults are "Project:" / "Client:" /
# "User Name:" / "Seis Loc:", but an operator using a different
# convention can rename them to anything ("Building:", "Site:",
# "Address:", etc.).  The ASCII export reflects whatever the operator
# typed, so label-based matching is fragile.
#
# What IS reliable: BW always writes the 4 user-notes lines in the
# same order, contiguously between the `Units :` line and the
# `Geo Range :` line.  We parse them by POSITION and preserve the
# operator's labels in `report.user_note_labels` so terra-view can
# render them as the operator intended.
_USER_NOTE_SLOTS = ("project", "client", "operator", "sensor_location") # ───────────────────────────────────────────────────────────────────────────── # Top-level parser # ───────────────────────────────────────────────────────────────────────────── def parse_report(text: Union[str, bytes], *, parse_samples: bool = False) -> BwAsciiReport: """Parse a BW per-event ASCII export into a structured BwAsciiReport. Set ``parse_samples=True`` to also populate ``report.samples`` with the trailing sample table. Default False because the table is huge and most callers only want metadata for indexing. """ if isinstance(text, bytes): text = text.decode("ascii", errors="replace") report = BwAsciiReport() # Pre-create channel stat slots so callers can rely on them existing. for ch in ("Tran", "Vert", "Long", "MicL"): report.channels.setdefault(ch, ChannelStats()) report.sensor_check.setdefault(ch, SensorCheck()) lines = text.splitlines() i = 0 n = len(lines) in_monitor_log_section = False event_time_str: Optional[str] = None event_date: Optional[datetime.date] = None # User-notes block detection. We enter the block after parsing # the "Units :" line and exit on the "Geo Range :" line. Inside, # the first 4 unmatched `