""" minimateplus/bw_ascii_report.py — parser for Blastware's per-event ASCII report (the .TXT file BW writes alongside each saved event binary). The ASCII export is the authoritative source for every "rich" per-event field that BW computes from the waveform but never persists in the BW binary itself: - Per-channel PPV (Tran / Vert / Long / MicL) - Peak Vector Sum + Peak Vector Sum Time - Per-channel ZC Freq, Time of Peak, Peak Acceleration, Peak Displacement - MicL PSPL, MicL Time of Peak, MicL ZC Freq - Per-channel Sensor Self-Check (Test Freq / Test Ratio / Test Results) - MicL Test Amplitude (mV) - Battery, calibration date, monitor-log timestamps Persisting these values into the SFM database lets the monthly-summary review workflow ("show me events at Location X with PVS > 0.5") work without depending on the (still-undecoded) waveform body codec. Format (verified against decode-re/5-8-26 4-event bundle): - One field per line, wrapped in double quotes: `"Field Name : Value"` - Field/value separator: literal ` : ` (space-colon-space). - Some field names contain an internal `:` already (e.g. `"Project:"`), so we split on the FIRST ` : ` only. - Some fields have unit suffixes: `"0.500 in/s"` / `"7.5 Hz"` / `"533 mv"`. - A `"Monitor Log(s)"` marker line is followed by tab-separated rows of `start_timestop_timedescription`. - Final `"PC SW Version : ..."` line ends the metadata block. - A blank line separates metadata from the sample table. - Sample table starts with ` Tran Vert ...`, then one row per sample (tab-separated, right-padded numeric values). - Geo channel values are in in/s; MicL in dB(L) (or 0.000 below threshold). Because some metadata fields have whitespace quirks ("MicL Time of Peak" has two spaces; the leading "Project:" value has its own colon), we normalise whitespace in the key before lookup. """ from __future__ import annotations import datetime import re from dataclasses import dataclass, field from pathlib import Path from typing import Dict, List, Optional, Tuple, Union # ───────────────────────────────────────────────────────────────────────────── # Output dataclasses # ───────────────────────────────────────────────────────────────────────────── @dataclass class ChannelStats: """Per-channel derived stats, populated from an event report.""" ppv_ips: Optional[float] = None # in/s (geo channels only) zc_freq_hz: Optional[float] = None # Hz time_of_peak_s: Optional[float] = None # seconds (relative to trigger; can be negative) peak_accel_g: Optional[float] = None # g (geo channels only) peak_disp_in: Optional[float] = None # in (geo channels only) @dataclass class MicStats: """MicL-specific stats.""" weighting: Optional[str] = None # e.g. "Linear Weighting" pspl_dbl: Optional[float] = None # dB(L) zc_freq_hz: Optional[float] = None time_of_peak_s: Optional[float] = None @dataclass class SensorCheck: """Per-channel sensor self-check result. Geo channels report a frequency + ratio; MicL reports a frequency + amplitude (mV). All channels also have a Pass/Fail string. """ test_freq_hz: Optional[float] = None test_ratio: Optional[float] = None # geo channels only test_amplitude_mv: Optional[float] = None # MicL only test_results: Optional[str] = None # "Passed" / "Failed" @dataclass class MonitorLogEntry: """One row of the trailing Monitor Log(s) block.""" start_time: Optional[datetime.datetime] = None stop_time: Optional[datetime.datetime] = None description: Optional[str] = None @dataclass class BwAsciiReport: """Structured representation of one BW per-event ASCII export.""" # ── Identity ───────────────────────────────────────────────────────────── event_type: Optional[str] = None # e.g. "Full Waveform" serial: Optional[str] = None # e.g. "BE11529" version: Optional[str] = None # firmware version line file_name: Optional[str] = None # e.g. "M529LK44.AB0" event_datetime: Optional[datetime.datetime] = None # parsed from Event Time + Event Date # ── Trigger / recording config ────────────────────────────────────────── trigger_channel: Optional[str] = None # e.g. "Vert" or "From Unit" geo_trigger_level_ips: Optional[float] = None pretrig_s: Optional[float] = None # negative seconds record_time_s: Optional[float] = None record_stop_mode: Optional[str] = None sample_rate_sps: Optional[int] = None battery_volts: Optional[float] = None calibration_date: Optional[datetime.date] = None calibration_by: Optional[str] = None # e.g. "Instantel" units: Optional[str] = None # e.g. "in/s and dB(L)" # ── Operator-supplied metadata ────────────────────────────────────────── project: Optional[str] = None client: Optional[str] = None operator: Optional[str] = None # User Name sensor_location: Optional[str] = None # Seis Loc # ── Geo channel scaling ───────────────────────────────────────────────── geo_range_ips: Optional[float] = None # 10.000 / 1.250 # ── Per-channel derived stats (geo + mic) ─────────────────────────────── channels: Dict[str, ChannelStats] = field(default_factory=dict) mic: MicStats = field(default_factory=MicStats) # ── Vector sum ────────────────────────────────────────────────────────── peak_vector_sum_ips: Optional[float] = None peak_vector_sum_time_s: Optional[float] = None # ── Sensor self-check (per channel) ───────────────────────────────────── sensor_check: Dict[str, SensorCheck] = field(default_factory=dict) # ── Monitor log + tooling version ─────────────────────────────────────── monitor_log: List[MonitorLogEntry] = field(default_factory=list) pc_sw_version: Optional[str] = None # ── Sample table (optional; only parsed if requested) ─────────────────── # Each entry: (Tran, Vert, Long, MicL) in the report's units (geo # channels in in/s, MicL in dB(L)). None when parse_samples=False. samples: Optional[List[Tuple[float, float, float, float]]] = None # ───────────────────────────────────────────────────────────────────────────── # Helpers # ───────────────────────────────────────────────────────────────────────────── _KEY_NORMALISE_RE = re.compile(r"\s+") _NUMERIC_RE = re.compile(r"^-?\d+(?:\.\d+)?") def _normalise_key(k: str) -> str: """Collapse whitespace runs (incl. tabs) and strip — handles BW's "MicL Time of Peak" double-space and leading-colon quirks.""" return _KEY_NORMALISE_RE.sub(" ", k).strip() def _strip_quotes(line: str) -> str: line = line.rstrip("\r\n") if len(line) >= 2 and line.startswith('"') and line.endswith('"'): return line[1:-1] return line def _parse_number(value: str) -> Optional[float]: """Pull the leading numeric portion out of a value like "0.500 in/s".""" m = _NUMERIC_RE.match(value.strip()) if not m: return None try: return float(m.group(0)) except ValueError: return None def _parse_int(value: str) -> Optional[int]: n = _parse_number(value) return None if n is None else int(round(n)) # Months exactly as BW writes them. _MONTHS = { "January": 1, "February": 2, "March": 3, "April": 4, "May": 5, "June": 6, "July": 7, "August": 8, "September": 9, "October": 10, "November": 11, "December": 12, # Short forms used in monitor-log rows ("Apr 23 /26"). "Jan": 1, "Feb": 2, "Mar": 3, "Apr": 4, "Jun": 6, "Jul": 7, "Aug": 8, "Sep": 9, "Oct": 10, "Nov": 11, "Dec": 12, } def _parse_event_date(s: str) -> Optional[datetime.date]: """Parse "April 23, 2026" or "May 8, 2026" → date.""" s = s.strip() parts = s.replace(",", " ").split() if len(parts) < 3: return None month_name, day_str, year_str = parts[0], parts[1], parts[2] month = _MONTHS.get(month_name) if month is None: return None try: return datetime.date(int(year_str), month, int(day_str)) except ValueError: return None def _parse_event_time(s: str) -> Optional[datetime.time]: """Parse "15:56:35" → time.""" s = s.strip() try: h, m, sec = s.split(":") return datetime.time(int(h), int(m), int(sec)) except (ValueError, IndexError): return None def _parse_calibration(value: str) -> Tuple[Optional[datetime.date], Optional[str]]: """Parse "April 29, 2025 by Instantel" → (date, "Instantel").""" parts = value.split(" by ", 1) date = _parse_event_date(parts[0]) by = parts[1].strip() if len(parts) > 1 else None return date, by def _parse_monitor_row(line: str) -> Optional[MonitorLogEntry]: """Parse a tab-separated monitor log row. Format: `\t\t` where each timestamp is BW's short form "Mon DD /YY HH:MM:SS" (e.g. "Apr 23 /26 15:46:16"). Year is encoded as a 2-digit suffix; we expand "/26" → 2026. """ parts = line.split("\t") if len(parts) < 2: return None start = _parse_monitor_ts(parts[0]) stop = _parse_monitor_ts(parts[1]) desc = parts[2].strip() if len(parts) > 2 else None if start is None and stop is None and not desc: return None return MonitorLogEntry(start_time=start, stop_time=stop, description=desc) def _parse_monitor_ts(s: str) -> Optional[datetime.datetime]: """Parse "Apr 23 /26 15:46:16" → datetime.""" s = s.strip() parts = s.split() if len(parts) < 4: return None month = _MONTHS.get(parts[0]) if month is None: return None try: day = int(parts[1]) # parts[2] looks like "/26" → century-flip to 2026 yy = int(parts[2].lstrip("/")) year = 2000 + yy if yy < 80 else 1900 + yy h, m, sec = (int(x) for x in parts[3].split(":")) return datetime.datetime(year, month, day, h, m, sec) except (ValueError, IndexError): return None # ───────────────────────────────────────────────────────────────────────────── # Top-level parser # ───────────────────────────────────────────────────────────────────────────── def parse_report(text: Union[str, bytes], *, parse_samples: bool = False) -> BwAsciiReport: """Parse a BW per-event ASCII export into a structured BwAsciiReport. Set ``parse_samples=True`` to also populate ``report.samples`` with the trailing sample table. Default False because the table is huge and most callers only want metadata for indexing. """ if isinstance(text, bytes): text = text.decode("ascii", errors="replace") report = BwAsciiReport() # Pre-create channel stat slots so callers can rely on them existing. for ch in ("Tran", "Vert", "Long", "MicL"): report.channels.setdefault(ch, ChannelStats()) report.sensor_check.setdefault(ch, SensorCheck()) lines = text.splitlines() i = 0 n = len(lines) in_monitor_log_section = False event_time_str: Optional[str] = None event_date: Optional[datetime.date] = None while i < n: raw_line = lines[i] i += 1 # Blank line marks the start of the sample table. if raw_line.strip() == "": break line = _strip_quotes(raw_line) # Monitor log section: "Monitor Log(s)" header followed by N rows # (still inside double-quoted lines), terminated by a non-row line # like "PC SW Version : ..." or a blank line. if not in_monitor_log_section and line.strip() == "Monitor Log(s)": in_monitor_log_section = True continue if in_monitor_log_section: # Heuristic: monitor rows contain a tab; the next "Field : Value" # line ends the section. if "\t" in line: entry = _parse_monitor_row(line) if entry: report.monitor_log.append(entry) continue # Falls through to the field parser below; clear the flag. in_monitor_log_section = False # "Field : Value" — split on FIRST occurrence of " : " idx = line.find(" : ") if idx < 0: continue key = _normalise_key(line[:idx]) value = line[idx + 3 :].strip() # ── Identity / config ──────────────────────────────────────────────── if key == "Event Type": report.event_type = value elif key == "Serial Number": report.serial = value elif key == "Version": report.version = value elif key == "File Name": report.file_name = value elif key == "Event Time": event_time_str = value elif key == "Event Date": event_date = _parse_event_date(value) elif key == "Trigger": report.trigger_channel = value elif key == "Geo Trigger Level": report.geo_trigger_level_ips = _parse_number(value) elif key == "Pre-trigger Length": report.pretrig_s = _parse_number(value) elif key == "Record Time": report.record_time_s = _parse_number(value) elif key == "Record Stop Mode": report.record_stop_mode = value elif key == "Sample Rate": report.sample_rate_sps = _parse_int(value) elif key == "Battery Level": report.battery_volts = _parse_number(value) elif key == "Calibration": report.calibration_date, report.calibration_by = _parse_calibration(value) elif key == "Units": report.units = value # Project labels in BW carry their own trailing colon — after # _normalise_key we just strip it for matching. elif key.rstrip(":") == "Project": report.project = value elif key.rstrip(":") == "Client": report.client = value elif key.rstrip(":") == "User Name":report.operator = value elif key.rstrip(":") == "Seis Loc": report.sensor_location = value elif key == "Geo Range": report.geo_range_ips = _parse_number(value) # ── Per-channel stats ──────────────────────────────────────────────── # All match the pattern "{Channel} " elif key in ( "Tran PPV", "Vert PPV", "Long PPV", "Tran ZC Freq", "Vert ZC Freq", "Long ZC Freq", "Tran Time of Peak", "Vert Time of Peak", "Long Time of Peak", "Tran Peak Acceleration", "Vert Peak Acceleration", "Long Peak Acceleration", "Tran Peak Displacement", "Vert Peak Displacement", "Long Peak Displacement", ): ch_name, stat = key.split(" ", 1) cs = report.channels.setdefault(ch_name, ChannelStats()) num = _parse_number(value) if stat == "PPV": cs.ppv_ips = num elif stat == "ZC Freq": cs.zc_freq_hz = num elif stat == "Time of Peak": cs.time_of_peak_s = num elif stat == "Peak Acceleration": cs.peak_accel_g = num elif stat == "Peak Displacement": cs.peak_disp_in = num # ── Vector Sum ─────────────────────────────────────────────────────── elif key == "Peak Vector Sum": report.peak_vector_sum_ips = _parse_number(value) elif key == "Peak Vector Sum Time": report.peak_vector_sum_time_s = _parse_number(value) # ── Microphone block ──────────────────────────────────────────────── elif key == "Microphone": report.mic.weighting = value elif key == "MicL PSPL": report.mic.pspl_dbl = _parse_number(value) # Mirror onto the "MicL" entry in channels so callers querying # `channels["MicL"].ppv_ips` see something — but it's dB(L), not # in/s, so we store as-is in the MicStats and mark the channel. elif key == "MicL Time of Peak": report.mic.time_of_peak_s = _parse_number(value) cs = report.channels.setdefault("MicL", ChannelStats()) cs.time_of_peak_s = report.mic.time_of_peak_s elif key == "MicL ZC Freq": report.mic.zc_freq_hz = _parse_number(value) cs = report.channels.setdefault("MicL", ChannelStats()) cs.zc_freq_hz = report.mic.zc_freq_hz # ── Sensor self-check ──────────────────────────────────────────────── elif key in ( "Tran Test Freq", "Vert Test Freq", "Long Test Freq", "MicL Test Freq", "Tran Test Ratio", "Vert Test Ratio", "Long Test Ratio", "MicL Test Amplitude", "Tran Test Results", "Vert Test Results", "Long Test Results", "MicL Test Results", ): ch_name, stat = key.split(" ", 1) sc = report.sensor_check.setdefault(ch_name, SensorCheck()) if stat == "Test Freq": sc.test_freq_hz = _parse_number(value) elif stat == "Test Ratio": sc.test_ratio = _parse_number(value) elif stat == "Test Amplitude": sc.test_amplitude_mv = _parse_number(value) elif stat == "Test Results": sc.test_results = value # ── Trailer ───────────────────────────────────────────────────────── elif key == "PC SW Version": report.pc_sw_version = value # Unknown keys are silently dropped — forward-compat for future # BW versions that may add fields. # Combine event date + time into a datetime if event_date is not None and event_time_str is not None: t = _parse_event_time(event_time_str) if t is not None: report.event_datetime = datetime.datetime.combine(event_date, t) if parse_samples: report.samples = _parse_sample_table(lines, i) return report def _parse_sample_table( lines: List[str], start: int, ) -> List[Tuple[float, float, float, float]]: """Parse the trailing sample table. The table starts with a header row (" Tran ...") and continues until EOF. Each data row is a tab-separated quartet of numeric values. """ samples: List[Tuple[float, float, float, float]] = [] seen_header = False for line in lines[start:]: line = line.rstrip("\r\n") if not line.strip(): continue cols = [c.strip() for c in line.split("\t") if c.strip()] if not seen_header: # Header row contains channel names; numeric rows don't. if any(c in ("Tran", "Vert", "Long", "MicL") for c in cols): seen_header = True continue if len(cols) < 4: continue try: samples.append(( float(cols[0]), float(cols[1]), float(cols[2]), float(cols[3]), )) except ValueError: continue return samples def parse_report_file( path: Union[str, Path], *, parse_samples: bool = False, ) -> BwAsciiReport: """Convenience: read a .TXT file from disk and parse it.""" return parse_report(Path(path).read_bytes(), parse_samples=parse_samples)