"""
micromate/idf_to_bw_report.py — adapter that projects a parsed Thor IDF
report (+ binary metadata + decoded IDFH intervals) into the
``bw_report``-shaped dict that :mod:`sfm.report_pdf.gather_report_data`
consumes.

Lets Thor events flow through the existing Series III Event Report PDF
pipeline without duplicating the renderer.  Thor's report content is ~95%
the same data shape as BW's; the field names differ but the underlying
metrics map 1:1.

Caveats
───────
- **Mic units** — Thor records ``MicPSPL`` natively in dB(L).  This adapter
  sets ``bw_report.mic.pspl_dbl`` directly; the report renderer recomputes
  the equivalent psi via its dBL→psi formula.
- **Saturation / above-range flags** — Thor doesn't always mark ``OORANGE``
  the way BW does; we set ``zc_freq_above_range`` only when a ``>100``
  sentinel was preserved in the raw text.
- **Per-interval data** — for IDFH events we build ``interval_times`` by
  stepping ``IntervalSize`` from ``HistogramStartTime``; the binary decoder
  confirms one record per step (882 / 881 / 881 ... across the corpus).
- **calibration_by parsing** — Thor's free-form
  ``Calibration : November 22, 2023 by Instantel`` is split on ``" by "``
  to extract the calibrator; the date prefix is parsed where possible,
  otherwise the binary-extracted ``calibration_date`` from
  :class:`micromate.idf_file.IdfBinaryMetadata` wins.
"""
from __future__ import annotations

import datetime
import re
from typing import Any, Dict, List, Optional

# ─── Helpers ────────────────────────────────────────────────────────────────

_NUM_RE = re.compile(r"-?\d+(?:\.\d+)?")

# Layout shared by every absolute date+time pair read from the IDF dict.
_DATETIME_FMT = "%Y-%m-%d %H:%M:%S"

# Date layouts tried, in order, for the free-form calibration date prefix.
_CALIBRATION_DATE_FMTS = ("%B %d, %Y", "%b %d, %Y", "%Y-%m-%d", "%m/%d/%Y")

# (suffix of the IDF key, destination key) pairs for one geophone channel.
_PEAK_FIELD_MAP = (
    ("ppv", "ppv_ips"),
    ("zc_freq", "zc_freq_hz"),
    ("time_of_peak", "time_of_peak_s"),
    ("peak_acceleration", "peak_accel_g"),
    ("peak_displacement", "peak_disp_in"),
)

# (suffix of the IDF key, destination key) pairs for the numeric sensor-check
# fields; the textual ``test_results`` field is handled separately.
_SENSOR_CHECK_NUMERIC_MAP = (
    ("test_freq", "freq_hz"),
    ("test_ratio", "ratio"),
    ("test_amplitude", "amplitude_mv"),
)


def _parse_first_number(s: Optional[str]) -> Optional[float]:
    """Pull the first numeric token from a string like ``"0.1500 in/s"``.

    Returns ``None`` when *s* is ``None`` or contains no digits.  Non-string
    inputs are stringified first, so a float passes through unchanged.
    """
    if s is None:
        return None
    match = _NUM_RE.search(str(s))
    if match is None:
        return None
    try:
        return float(match.group(0))
    except ValueError:
        return None


def _parse_interval_size_s(s: Optional[str]) -> Optional[float]:
    """``"60 sec"`` → 60.0, ``"5 min"`` → 300.0, ``"1 hour"`` → 3600.0.

    The unit is detected by case-insensitive substring match; anything that
    is neither an hour nor a minute spelling is treated as seconds.
    """
    if s is None:
        return None
    value = _parse_first_number(s)
    if value is None:
        return None
    lowered = str(s).lower()
    if "hour" in lowered or "hr" in lowered:
        return value * 3600.0
    if "min" in lowered:
        return value * 60.0
    return value  # default to seconds


def _parse_calibration(text: Optional[str]) -> tuple[Optional[str], Optional[str]]:
    """Split ``"November 22, 2023 by Instantel"`` → (ISO date, calibrator).

    Either element is ``None`` when its half is missing or unparseable, so
    the result is ``(None, None)`` when neither half parses.
    """
    if not text:
        return None, None
    date_part, sep, by_raw = str(text).partition(" by ")
    date_part = date_part.strip()
    # An empty calibrator (e.g. the text ends in " by ") is reported as None
    # rather than as an empty string.
    by_part = (by_raw.strip() or None) if sep else None

    iso_date: Optional[str] = None
    if date_part:
        for fmt in _CALIBRATION_DATE_FMTS:
            try:
                iso_date = datetime.datetime.strptime(date_part, fmt).date().isoformat()
            except ValueError:
                continue
            break
    return iso_date, by_part


def _parse_datetime(date_part: Any, time_part: Any) -> Optional[datetime.datetime]:
    """Parse a ``YYYY-MM-DD`` / ``HH:MM:SS`` pair; ``None`` if absent or malformed."""
    if not (date_part and time_part):
        return None
    try:
        return datetime.datetime.strptime(f"{date_part} {time_part}", _DATETIME_FMT)
    except ValueError:
        return None


def _iso_timestamp(date_part: Any, time_part: Any) -> Optional[str]:
    """ISO-8601 string for a date/time pair, or ``None`` if it cannot be parsed."""
    parsed = _parse_datetime(date_part, time_part)
    return None if parsed is None else parsed.isoformat()


def _channel_peaks(idf: Dict[str, Any], ch_lc: str) -> Dict[str, Any]:
    """Map ``tran_ppv`` / ``tran_zc_freq`` / ... → bw_report.peaks.tran shape.

    Only keys that are present (non-``None``) in *idf* are emitted.
    """
    out: Dict[str, Any] = {}
    for suffix, dst in _PEAK_FIELD_MAP:
        value = idf.get(f"{ch_lc}_{suffix}")
        if value is not None:
            out[dst] = value

    # ZC freq ">100" sentinel: when the value is still a string containing
    # ">" there is no usable number, so flag it and drop the numeric key.
    raw_zc = idf.get(f"{ch_lc}_zc_freq")
    if isinstance(raw_zc, str) and ">" in raw_zc:
        out["zc_freq_above_range"] = True
        out.pop("zc_freq_hz", None)
    return out


def _sensor_check(idf: Dict[str, Any], ch_lc: str) -> Dict[str, Any]:
    """Collect the ``<channel>_test_*`` fields into a sensor-check dict.

    Numeric fields go through :func:`_parse_first_number`; entries that are
    absent or that yield no number are omitted from the result.
    """
    out: Dict[str, Any] = {}
    for suffix, dst in _SENSOR_CHECK_NUMERIC_MAP:
        number = _parse_first_number(idf.get(f"{ch_lc}_{suffix}"))
        if number is not None:
            out[dst] = number
    result = idf.get(f"{ch_lc}_test_results")
    if result is not None:
        out["result"] = str(result).strip()
    return out


def _interval_times(idf: Dict[str, Any], n_intervals: Optional[int]) -> List[str]:
    """Synthesise per-interval timestamps from the start time and step size.

    Entry ``k`` (0-based) is ``start + interval_size × (k + 1)``, so the
    first timestamp lies one full step after the start.  The start falls
    back from ``histogram_start_*`` to ``event_*`` independently for the
    date and the time.

    Returns ``[]`` when the count, the start time or the interval size is
    unknown (or zero), or when the start cannot be parsed.
    """
    if not n_intervals:
        return []
    step_s = _parse_interval_size_s(idf.get("interval_size"))
    if not step_s:
        return []
    t0 = _parse_datetime(
        idf.get("histogram_start_date") or idf.get("event_date"),
        idf.get("histogram_start_time") or idf.get("event_time"),
    )
    if t0 is None:
        return []
    return [
        (t0 + datetime.timedelta(seconds=step_s * (k + 1))).isoformat()
        for k in range(int(n_intervals))
    ]


# ─── Top-level adapter ──────────────────────────────────────────────────────

def build_bw_report_from_idf(
    idf_report: Dict[str, Any],
    *,
    binary_md: Any = None,
    intervals: Optional[list] = None,
    is_histogram: Optional[bool] = None,
) -> Dict[str, Any]:
    """Project a parsed IDF report dict (and optional binary metadata +
    decoded IDFH intervals) into the BW report sidecar shape.

    The returned dict is meant to mirror what
    ``minimateplus.event_file_io._bw_report_to_dict`` produces from a real BW
    ASCII report, so it can be assigned to ``sidecar["bw_report"]`` and
    consumed by ``sfm.report_pdf.gather_report_data``.

    Args:
        idf_report: Parsed IDF report, keyed by snake_case field name.
        binary_md: Optional binary metadata object; only its
            ``calibration_date`` attribute is read, and only when the text
            calibration date could not be parsed.
        intervals: Optional list of decoded IDFH intervals
            (:class:`micromate.idf_file.IdfhInterval`).  Only its length is
            used: for histogram events it overrides the textual
            ``number_of_intervals`` and therefore sizes ``interval_times``.
        is_histogram: Force histogram handling on or off.  When ``None`` the
            event is a histogram if ``event_type`` starts with
            ``"full histogram"`` (case-insensitive).

    Returns:
        The ``bw_report``-shaped dict.  ``histogram["interval_times"]`` holds
        the synthesised per-interval ISO timestamps (empty for non-histogram
        events or when they cannot be derived).
    """
    if is_histogram is None:
        event_type = str(idf_report.get("event_type", ""))
        is_histogram = event_type.lower().startswith("full histogram")

    # ── Trigger / recording / device ─────────────────────────────────────
    trigger_channel = idf_report.get("trigger")
    trigger_level = _parse_first_number(idf_report.get("geo_trigger_level"))
    geo_range_ips = _parse_first_number(idf_report.get("geo_range"))

    cal_iso, cal_by = _parse_calibration(idf_report.get("calibration"))
    # Prefer the binary-extracted calibration_date when our text parse fell
    # through; the binary date is unambiguous.
    if cal_iso is None and binary_md is not None and binary_md.calibration_date:
        cal_iso = binary_md.calibration_date.isoformat()

    # ── Histogram fields ────────────────────────────────────────────────
    hist_block: Dict[str, Any] = {
        "start": None,
        "stop": None,
        "n_intervals": None,
        "interval_size": None,
        "interval_size_s": None,
        # NOTE(review): key placement inferred from the module docstring's
        # ``interval_times`` description — confirm against the renderer.
        "interval_times": [],
        "channel_peak_when": {},
    }
    if is_histogram:
        hist_block["start"] = _iso_timestamp(
            idf_report.get("histogram_start_date"),
            idf_report.get("histogram_start_time"),
        )
        hist_block["stop"] = _iso_timestamp(
            idf_report.get("histogram_stop_date"),
            idf_report.get("histogram_stop_time"),
        )

        n_raw = idf_report.get("number_of_intervals")
        if n_raw is not None:
            try:
                # Thor reports a float like "81.04"; truncate to an int (the
                # BW report uses an int for the column).  "inf" raises
                # OverflowError and "nan" raises ValueError.
                hist_block["n_intervals"] = int(float(str(n_raw)))
            except (ValueError, OverflowError):
                pass
        # When the binary decoder gave us the actual interval count, prefer it.
        if intervals is not None:
            hist_block["n_intervals"] = len(intervals)

        interval_size = idf_report.get("interval_size")
        hist_block["interval_size"] = interval_size
        hist_block["interval_size_s"] = _parse_interval_size_s(interval_size)

        # interval_times derived from start + step; previously computed and
        # then discarded, now actually published in the histogram block.
        hist_block["interval_times"] = _interval_times(
            idf_report, hist_block["n_intervals"]
        )

        # Per-channel peak when (absolute date+time at which the channel's
        # peak occurred over the histogram run).  Thor splits this into
        # ``TranPeakDate`` / ``TranPeakTime`` etc.
        peak_when: Dict[str, str] = {}
        for ch_label, ch_lc in (
            ("Tran", "tran"),
            ("Vert", "vert"),
            ("Long", "long"),
            ("MicL", "mic"),
        ):
            stamp = _iso_timestamp(
                idf_report.get(f"{ch_lc}_peak_date"),
                idf_report.get(f"{ch_lc}_peak_time"),
            )
            if stamp is not None:
                peak_when[ch_label] = stamp
        if peak_when:
            hist_block["channel_peak_when"] = peak_when

    # ── Mic block ────────────────────────────────────────────────────────
    mic_zc = idf_report.get("mic_zc_freq")
    mic_above_range = isinstance(mic_zc, str) and ">" in mic_zc
    mic_block = {
        "weighting": "L",  # Thor mic is ISEE Linear
        "pspl_dbl": idf_report.get("mic_ppv"),  # the dB(L) float
        "pspl_saturated": False,
        # A ">100"-style sentinel carries no usable frequency.
        "zc_freq_hz": None if mic_above_range else mic_zc,
        "zc_freq_above_range": mic_above_range,
        "time_of_peak_s": idf_report.get("mic_time_of_peak"),
    }

    # ── Peaks ────────────────────────────────────────────────────────────
    vs_block = {
        "ips": idf_report.get("peak_vector_sum"),
        # NOTE(review): the key "peak_vector_sum_time_sum" looks unusual —
        # confirm it matches what the IDF text parser emits.
        "time_s": _parse_first_number(idf_report.get("peak_vector_sum_time_sum")),
        "when": None,
        "saturated": False,
    }
    if is_histogram:
        # PVS absolute date+time, when present.
        vs_block["when"] = _iso_timestamp(
            idf_report.get("peak_vector_sum_date"),
            idf_report.get("peak_vector_sum_time"),
        )

    return {
        "available": True,
        "event_type": idf_report.get("event_type"),
        "version": idf_report.get("version"),
        "trigger": {
            "channel": trigger_channel,
            "geo_level_ips": trigger_level,
        },
        "recording": {
            "sample_rate_sps": idf_report.get("sample_rate"),
            "record_time_s": idf_report.get("record_time_sec"),
            "pretrig_s": idf_report.get("pre_trigger_sec"),
            "stop_mode": idf_report.get("record_stop_mode"),
            "geo_range_ips": geo_range_ips,
            "units": idf_report.get("units"),
        },
        "device": {
            "battery_volts": idf_report.get("battery_volts"),
            "calibration_date": cal_iso,
            "calibration_by": cal_by,
        },
        "peaks": {
            "tran": _channel_peaks(idf_report, "tran"),
            "vert": _channel_peaks(idf_report, "vert"),
            "long": _channel_peaks(idf_report, "long"),
            "vector_sum": vs_block,
        },
        "mic": mic_block,
        "sensor_check": {
            "tran": _sensor_check(idf_report, "tran"),
            "vert": _sensor_check(idf_report, "vert"),
            "long": _sensor_check(idf_report, "long"),
            "mic": _sensor_check(idf_report, "mic"),
        },
        "histogram": hist_block,
        "monitor_log": [],
        "pc_sw_version": None,
    }