feat: add thor report generation, pdf generation.

This commit is contained in:
2026-05-29 19:03:06 +00:00
parent 9b71ead44b
commit 9fd52ddabb
8 changed files with 601 additions and 2 deletions
+17 -2
View File
@@ -210,8 +210,7 @@ def parse_idf_report(text: Union[str, bytes]) -> Dict[str, Any]:
"long_peak_acceleration",
"tran_peak_displacement", "vert_peak_displacement",
"long_peak_displacement",
"tran_time_of_peak", "vert_time_of_peak", "long_time_of_peak",
"mic_time_of_peak", "mic_zc_freq",
"mic_zc_freq",
)
for key in float_fields:
v = raw.get(key)
@@ -223,6 +222,22 @@ def parse_idf_report(text: Union[str, bytes]) -> Dict[str, Any]:
else:
out.pop(key, None)
# Time-of-peak: Thor labels these "TimeofPeak" (lowercase "of") so the
# normalizer produces "*_timeof_peak". Map them to the canonical
# ``*_time_of_peak`` output keys for downstream consumers.
for raw_key, out_key in (
("tran_timeof_peak", "tran_time_of_peak"),
("vert_timeof_peak", "vert_time_of_peak"),
("long_timeof_peak", "long_time_of_peak"),
("mic_timeof_peak", "mic_time_of_peak"),
):
v = raw.get(raw_key)
if v is None:
continue
fv = _parse_float(v)
if fv is not None:
out[out_key] = fv
# Microphone — Thor reports MicPSPL (dB(L)) which is the closest
# analogue to BW's mic_ppv. The raw "99.4 dB(L)" string stays in
# `out` under the original `mic_pspl` key for display; the parsed
+323
View File
@@ -0,0 +1,323 @@
"""
micromate/idf_to_bw_report.py — adapter that projects a parsed Thor IDF
report (+ binary metadata + decoded IDFH intervals) into the
``bw_report``-shaped dict that :mod:`sfm.report_pdf.gather_report_data`
consumes.
Lets Thor events flow through the existing Series III Event Report PDF
pipeline without duplicating the renderer. Thor's report content is
~95% the same data shape as BW's; the field names differ but the
underlying metrics map 1:1.
Caveats
───────
- **Mic units** — Thor records ``MicPSPL`` natively in dB(L). This
adapter sets ``bw_report.mic.pspl_dbl`` directly; the report
renderer recomputes the equivalent psi via its dBL→psi formula.
- **Saturation / above-range flags** — Thor doesn't always mark
``OORANGE`` the way BW does; we set ``zc_freq_above_range`` only
when a `>100` sentinel was preserved in the raw text.
- **Per-interval data** — for IDFH events we build ``interval_times``
by stepping ``IntervalSize`` from ``HistogramStartTime``; the binary
decoder confirms one record per step (882 / 881 / 881 ... across
the corpus).
- **calibration_by parsing** — Thor's free-form ``Calibration : November
22, 2023 by Instantel`` is split on ``" by "`` to extract the
calibrator; the date prefix is parsed where possible, otherwise
the binary-extracted ``calibration_date`` from
:class:`micromate.idf_file.IdfBinaryMetadata` wins.
"""
from __future__ import annotations
import datetime
import re
from typing import Any, Dict, List, Optional
# ─── Helpers ────────────────────────────────────────────────────────────────
_NUM_RE = re.compile(r"-?\d+(?:\.\d+)?")
def _parse_first_number(s: Optional[str]) -> Optional[float]:
"""Pull the first numeric token from a string like ``"0.1500 in/s"``."""
if s is None:
return None
m = _NUM_RE.search(str(s))
if not m:
return None
try:
return float(m.group(0))
except ValueError:
return None
def _parse_interval_size_s(s: Optional[str]) -> Optional[float]:
    """Convert an interval-size label into seconds.

    ``"60 sec"`` → 60.0, ``"5 min"`` → 300.0, ``"1 hour"`` → 3600.0.

    The unit is detected by substring on the lower-cased text: ``"hour"`` or
    ``"hr"`` is checked first, then ``"min"``; anything else is taken to be
    seconds already. Returns ``None`` when the input is ``None`` or holds no
    number.
    """
    magnitude = None if s is None else _parse_first_number(s)
    if magnitude is None:
        return None

    label = str(s).lower()
    if any(tag in label for tag in ("hour", "hr")):
        seconds_per_unit = 3600.0
    elif "min" in label:
        seconds_per_unit = 60.0
    else:
        # No recognised unit: the number is treated as seconds.
        seconds_per_unit = 1.0
    return magnitude * seconds_per_unit
def _parse_calibration(text: Optional[str]) -> tuple[Optional[str], Optional[str]]:
"""Split ``"November 22, 2023 by Instantel"`` → (ISO date, calibrator).
Returns ``(None, None)`` if neither half parses.
"""
if not text:
return None, None
parts = str(text).split(" by ", 1)
date_part = parts[0].strip() if parts else None
by_part = parts[1].strip() if len(parts) > 1 else None
iso_date: Optional[str] = None
if date_part:
for fmt in ("%B %d, %Y", "%b %d, %Y", "%Y-%m-%d", "%m/%d/%Y"):
try:
iso_date = datetime.datetime.strptime(date_part, fmt).date().isoformat()
break
except ValueError:
continue
return iso_date, by_part
def _channel_peaks(idf: Dict[str, Any], ch_lc: str) -> Dict[str, Any]:
"""Map ``tran_ppv`` / ``tran_zc_freq`` / ... → bw_report.peaks.tran shape."""
out: Dict[str, Any] = {}
for src, dst in (
(f"{ch_lc}_ppv", "ppv_ips"),
(f"{ch_lc}_zc_freq", "zc_freq_hz"),
(f"{ch_lc}_time_of_peak", "time_of_peak_s"),
(f"{ch_lc}_peak_acceleration", "peak_accel_g"),
(f"{ch_lc}_peak_displacement", "peak_disp_in"),
):
v = idf.get(src)
if v is not None:
out[dst] = v
# ZC freq ">100" sentinel: the raw text carries it under the un-typed
# key (e.g. ``raw["tran_zc_freq"]`` would be ``">100"``), and our parser
# dropped the typed entry. Detect that case and flag.
raw_zc = idf.get(f"{ch_lc}_zc_freq")
if isinstance(raw_zc, str) and ">" in raw_zc:
out["zc_freq_above_range"] = True
out.pop("zc_freq_hz", None)
return out
def _sensor_check(idf: Dict[str, Any], ch_lc: str) -> Dict[str, Any]:
    """Build the ``sensor_check`` entry for one channel.

    The ``<ch_lc>_test_freq`` / ``_test_ratio`` / ``_test_amplitude`` values
    are reduced to their first numeric token; ``<ch_lc>_test_results`` is
    kept as stripped text. Fields that are absent, or whose value holds no
    number, are left out of the result.
    """
    numeric_fields = (
        ("test_freq", "freq_hz"),
        ("test_ratio", "ratio"),
        ("test_amplitude", "amplitude_mv"),
    )
    check: Dict[str, Any] = {}
    for suffix, target in numeric_fields:
        raw = idf.get(f"{ch_lc}_{suffix}")
        if raw is None:
            continue
        number = _parse_first_number(raw)
        if number is not None:
            check[target] = number

    verdict = idf.get(f"{ch_lc}_test_results")
    if verdict is not None:
        check["result"] = str(verdict).strip()
    return check
def _interval_times(idf: Dict[str, Any], n_intervals: Optional[int]) -> List[str]:
    """Synthesise one ISO timestamp per histogram interval.

    The origin is ``histogram_start_date`` / ``histogram_start_time``,
    falling back to ``event_date`` / ``event_time``. The step comes from
    ``interval_size``. Entry ``k`` (0-based) is ``origin + step × (k + 1)``,
    so the first timestamp lies one full step after the origin.

    Returns ``[]`` when ``n_intervals`` is falsy, when the origin or the
    step is unknown, or when the origin does not match
    ``%Y-%m-%d %H:%M:%S``.
    """
    if not n_intervals:
        return []

    date_part = idf.get("histogram_start_date") or idf.get("event_date")
    clock_part = idf.get("histogram_start_time") or idf.get("event_time")
    step_s = _parse_interval_size_s(idf.get("interval_size"))
    if not date_part or not clock_part or not step_s:
        return []

    try:
        origin = datetime.datetime.strptime(
            f"{date_part} {clock_part}", "%Y-%m-%d %H:%M:%S"
        )
    except ValueError:
        return []

    return [
        (origin + datetime.timedelta(seconds=step_s * step_no)).isoformat()
        for step_no in range(1, int(n_intervals) + 1)
    ]
# ─── Top-level adapter ──────────────────────────────────────────────────────
def _combine_iso(date_str: Any, time_str: Any) -> Optional[str]:
    """Join a ``YYYY-MM-DD`` date and ``HH:MM:SS`` time into an ISO string.

    Returns ``None`` when either part is missing/empty or the pair does not
    match ``%Y-%m-%d %H:%M:%S``.
    """
    if not (date_str and time_str):
        return None
    try:
        stamp = datetime.datetime.strptime(
            f"{date_str} {time_str}", "%Y-%m-%d %H:%M:%S"
        )
    except ValueError:
        return None
    return stamp.isoformat()


def build_bw_report_from_idf(
    idf_report: Dict[str, Any],
    *,
    binary_md=None,
    intervals: Optional[list] = None,
    is_histogram: Optional[bool] = None,
) -> Dict[str, Any]:
    """Project a parsed IDF report dict (and optional binary metadata +
    decoded IDFH intervals) into the BW report sidecar shape.

    The returned dict is meant to be structurally identical to what
    ``minimateplus.event_file_io._bw_report_to_dict`` produces from a
    real BW ASCII report — it can be assigned to
    ``sidecar["bw_report"]`` and consumed by
    ``sfm.report_pdf.gather_report_data``.

    Args:
        idf_report: Parsed IDF report, keyed by normalised field name.
        binary_md: Optional binary metadata object. Only its
            ``calibration_date`` attribute is read, as a fallback when the
            textual calibration line yields no date.
        intervals: Optional list of decoded IDFH interval records. Only its
            length is used, and only for histogram events, where it
            overrides the report's ``number_of_intervals``.
        is_histogram: Force histogram / non-histogram handling. When
            ``None`` it is inferred from ``event_type`` starting with
            ``"full histogram"`` (case-insensitive).

    Returns:
        The ``bw_report``-shaped dict. Unparseable or missing fields are
        reported as ``None`` (or omitted from the per-channel sub-dicts)
        rather than raising.
    """
    if is_histogram is None:
        event_type = str(idf_report.get("event_type", ""))
        is_histogram = event_type.lower().startswith("full histogram")

    # ── Trigger / recording / device ─────────────────────────────────────
    trigger_channel = idf_report.get("trigger")
    trigger_level = _parse_first_number(idf_report.get("geo_trigger_level"))
    geo_range_ips = _parse_first_number(idf_report.get("geo_range"))
    cal_iso, cal_by = _parse_calibration(idf_report.get("calibration"))
    # Prefer the binary-extracted calibration_date when our text parse fell
    # through; the binary date is unambiguous.
    if cal_iso is None and binary_md is not None and binary_md.calibration_date:
        cal_iso = binary_md.calibration_date.isoformat()

    # ── Histogram fields ────────────────────────────────────────────────
    hist_block: Dict[str, Any] = {
        "start": None, "stop": None, "n_intervals": None,
        "interval_size": None, "interval_size_s": None,
        "interval_times": [],
        "channel_peak_when": {},
    }
    if is_histogram:
        hist_block["start"] = _combine_iso(
            idf_report.get("histogram_start_date"),
            idf_report.get("histogram_start_time"),
        )
        hist_block["stop"] = _combine_iso(
            idf_report.get("histogram_stop_date"),
            idf_report.get("histogram_stop_time"),
        )

        n_raw = idf_report.get("number_of_intervals")
        if n_raw is not None:
            try:
                # Thor reports a float like "81.04"; int() truncates it to
                # the whole-interval count (the BW report uses an int).
                hist_block["n_intervals"] = int(float(str(n_raw)))
            except (ValueError, OverflowError):
                # Best effort: non-numeric or non-finite text ("nan",
                # "inf") leaves the count unset instead of failing.
                pass
        # When the binary decoder gave us the actual interval count, prefer it.
        if intervals is not None:
            hist_block["n_intervals"] = len(intervals)

        interval_size = idf_report.get("interval_size")
        hist_block["interval_size"] = interval_size
        hist_block["interval_size_s"] = _parse_interval_size_s(interval_size)

        # interval_times derived from start+step. These used to be computed
        # and then dropped on the floor; publish them with the rest of the
        # histogram block.
        # NOTE(review): the key name follows the module docstring
        # ("interval_times"); confirm it is where gather_report_data looks.
        hist_block["interval_times"] = _interval_times(
            idf_report, hist_block["n_intervals"]
        )

        # Per-channel peak when (absolute date+time at which the channel's
        # peak occurred over the histogram run). Thor splits this into
        # ``TranPeakDate`` / ``TranPeakTime`` etc.
        peak_when: Dict[str, str] = {}
        for ch_label, ch_lc in (
            ("Tran", "tran"), ("Vert", "vert"), ("Long", "long"), ("MicL", "mic"),
        ):
            when = _combine_iso(
                idf_report.get(f"{ch_lc}_peak_date"),
                idf_report.get(f"{ch_lc}_peak_time"),
            )
            if when is not None:
                peak_when[ch_label] = when
        if peak_when:
            hist_block["channel_peak_when"] = peak_when

    # ── Mic block ────────────────────────────────────────────────────────
    mic_zc = idf_report.get("mic_zc_freq")
    # A string carrying ">" (e.g. ">100") is the above-range sentinel; it is
    # reported as a flag and never as a frequency value.
    mic_zc_above_range = isinstance(mic_zc, str) and ">" in mic_zc
    mic_block = {
        "weighting": "L",  # Thor mic is ISEE Linear
        "pspl_dbl": idf_report.get("mic_ppv"),  # the dB(L) float
        "pspl_saturated": False,
        "zc_freq_hz": None if mic_zc_above_range else mic_zc,
        "zc_freq_above_range": mic_zc_above_range,
        "time_of_peak_s": idf_report.get("mic_time_of_peak"),
    }

    # ── Peaks ────────────────────────────────────────────────────────────
    vs_block = {
        "ips": idf_report.get("peak_vector_sum"),
        "time_s": _parse_first_number(idf_report.get("peak_vector_sum_time_sum")),
        "when": None,
        "saturated": False,
    }
    if is_histogram:
        # PVS absolute date+time, when present.
        vs_block["when"] = _combine_iso(
            idf_report.get("peak_vector_sum_date"),
            idf_report.get("peak_vector_sum_time"),
        )

    return {
        "available": True,
        "event_type": idf_report.get("event_type"),
        "version": idf_report.get("version"),
        "trigger": {
            "channel": trigger_channel,
            "geo_level_ips": trigger_level,
        },
        "recording": {
            "sample_rate_sps": idf_report.get("sample_rate"),
            "record_time_s": idf_report.get("record_time_sec"),
            "pretrig_s": idf_report.get("pre_trigger_sec"),
            "stop_mode": idf_report.get("record_stop_mode"),
            "geo_range_ips": geo_range_ips,
            "units": idf_report.get("units"),
        },
        "device": {
            "battery_volts": idf_report.get("battery_volts"),
            "calibration_date": cal_iso,
            "calibration_by": cal_by,
        },
        "peaks": {
            "tran": _channel_peaks(idf_report, "tran"),
            "vert": _channel_peaks(idf_report, "vert"),
            "long": _channel_peaks(idf_report, "long"),
            "vector_sum": vs_block,
        },
        "mic": mic_block,
        "sensor_check": {
            "tran": _sensor_check(idf_report, "tran"),
            "vert": _sensor_check(idf_report, "vert"),
            "long": _sensor_check(idf_report, "long"),
            "mic": _sensor_check(idf_report, "mic"),
        },
        "histogram": hist_block,
        "monitor_log": [],
        "pc_sw_version": None,
    }