feat: add thor report generation, pdf generation.
This commit is contained in:
@@ -0,0 +1,102 @@
|
||||
"""End-to-end Thor report PDF rendering.
|
||||
|
||||
Ingests an IDFW + .txt via save_imported_idf, runs gather_report_data
|
||||
(faking a minimal DB row), and renders the PDF to disk.
|
||||
"""
|
||||
from __future__ import annotations
|
||||
import sys
|
||||
import tempfile
|
||||
import json
|
||||
from pathlib import Path
|
||||
|
||||
REPO = Path(__file__).resolve().parents[1]
|
||||
sys.path.insert(0, str(REPO))
|
||||
|
||||
from sfm.waveform_store import WaveformStore
|
||||
from sfm import report_pdf
|
||||
|
||||
|
||||
class FakeDb:
|
||||
"""Stand-in for SeismoDb.get_event(); the renderer only needs a few cols."""
|
||||
def __init__(self, event):
|
||||
self.event = event
|
||||
|
||||
def get_event(self, _id):
|
||||
return self.event
|
||||
|
||||
|
||||
def main():
|
||||
base = REPO / "tests/fixtures/THORDATA_example/THORDATA_example/UPMC Presby/UM11719"
|
||||
idfw = base / "UM11719_20231219162723.IDFW"
|
||||
txt = base / "TXT" / f"{idfw.name}.txt"
|
||||
|
||||
with tempfile.TemporaryDirectory() as td:
|
||||
store = WaveformStore(Path(td))
|
||||
ev, rec = store.save_imported_idf(
|
||||
idfw.read_bytes(),
|
||||
idfw,
|
||||
idf_report_text=txt.read_text(errors="replace"),
|
||||
)
|
||||
print(f"save_imported_idf: h5={rec['hdf5_filename']}, sidecar={rec['sidecar_filename']}")
|
||||
|
||||
# Verify sidecar has bw_report block
|
||||
sc_path = Path(td) / "UM11719" / f"{idfw.name}.sfm.json"
|
||||
sc = json.loads(sc_path.read_text())
|
||||
bw = sc.get("bw_report", {})
|
||||
print(f" bw_report.available: {bw.get('available')}")
|
||||
print(f" bw_report.peaks.tran.ppv_ips: {bw.get('peaks', {}).get('tran', {}).get('ppv_ips')}")
|
||||
print(f" bw_report.mic.pspl_dbl: {bw.get('mic', {}).get('pspl_dbl')}")
|
||||
print(f" bw_report.histogram.n_intervals: {bw.get('histogram', {}).get('n_intervals')}")
|
||||
|
||||
# Build a DB-row-shaped dict from the Event for gather_report_data
|
||||
import datetime
|
||||
ts = ev.timestamp
|
||||
ts_iso = None
|
||||
if ts is not None:
|
||||
try:
|
||||
ts_iso = datetime.datetime(ts.year, ts.month, ts.day, ts.hour, ts.minute, ts.second).isoformat()
|
||||
except Exception:
|
||||
pass
|
||||
fake_row = {
|
||||
"serial": "UM11719",
|
||||
"blastware_filename": rec["filename"],
|
||||
"record_type": "Waveform",
|
||||
"timestamp": ts_iso,
|
||||
"sample_rate": ev.sample_rate,
|
||||
"project": ev.project_info.project if ev.project_info else None,
|
||||
"client": ev.project_info.client if ev.project_info else None,
|
||||
"operator": ev.project_info.operator if ev.project_info else None,
|
||||
"sensor_location": ev.project_info.sensor_location if ev.project_info else None,
|
||||
"created_at": None,
|
||||
}
|
||||
|
||||
rd = report_pdf.gather_report_data(FakeDb(fake_row), store, event_id="test-1")
|
||||
print()
|
||||
print(f"=== ReportData ===")
|
||||
print(f" event_id: {rd.event_id}")
|
||||
print(f" serial: {rd.serial}")
|
||||
print(f" record_type: {rd.record_type}")
|
||||
print(f" event_datetime: {rd.event_datetime_str}")
|
||||
print(f" trigger: {rd.trigger_source}")
|
||||
print(f" geo_range: {rd.geo_range_str}")
|
||||
print(f" sample_rate: {rd.sample_rate_str}")
|
||||
print(f" firmware: {rd.firmware}")
|
||||
print(f" calibration: {rd.calibration_date} by {rd.calibration_by}")
|
||||
print(f" battery: {rd.battery_volts}")
|
||||
print(f" PVS: {rd.peak_vector_sum_ips} in/s at {rd.peak_vector_sum_time_s} sec")
|
||||
print(f" mic_pspl_dbl: {rd.mic_pspl_dbl}")
|
||||
print(f" mic_zc_freq_hz: {rd.mic_zc_freq_hz}")
|
||||
print(f" channel_stats: {len(rd.channel_stats)} rows")
|
||||
for cs in rd.channel_stats:
|
||||
print(f" {cs['name']}: PPV={cs['ppv_ips']} ZC={cs['zc_freq_hz']} ToP={cs['time_of_peak_s']} Acc={cs['peak_accel_g']} Disp={cs['peak_disp_in']} Test={cs['sensor_check']}")
|
||||
|
||||
# Render the PDF
|
||||
out_path = REPO / "analysis_idf" / "thor_report.pdf"
|
||||
pdf_bytes = report_pdf.render_event_report_pdf(rd)
|
||||
out_path.write_bytes(pdf_bytes)
|
||||
print()
|
||||
print(f" PDF written: {out_path} ({len(pdf_bytes)} bytes)")
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
main()
|
||||
@@ -0,0 +1,91 @@
|
||||
"""End-to-end Thor IDFH histogram report PDF rendering."""
|
||||
from __future__ import annotations
|
||||
import sys
|
||||
import tempfile
|
||||
import json
|
||||
import datetime
|
||||
from pathlib import Path
|
||||
|
||||
REPO = Path(__file__).resolve().parents[1]
|
||||
sys.path.insert(0, str(REPO))
|
||||
|
||||
from sfm.waveform_store import WaveformStore
|
||||
from sfm import report_pdf
|
||||
|
||||
|
||||
class FakeDb:
|
||||
def __init__(self, event):
|
||||
self.event = event
|
||||
|
||||
def get_event(self, _id):
|
||||
return self.event
|
||||
|
||||
|
||||
def main():
|
||||
# Use the multi-interval IDFH (81 + trigger row)
|
||||
idfh = REPO / "tests/fixtures/THORDATA_example/THORDATA_example/UPMC Presby/UM13981/UM13981_20220805075441.IDFH"
|
||||
txt = idfh.parent / "TXT" / f"{idfh.name}.txt"
|
||||
|
||||
with tempfile.TemporaryDirectory() as td:
|
||||
store = WaveformStore(Path(td))
|
||||
ev, rec = store.save_imported_idf(
|
||||
idfh.read_bytes(),
|
||||
idfh,
|
||||
idf_report_text=txt.read_text(errors="replace"),
|
||||
)
|
||||
print(f"save_imported_idf: h5={rec['hdf5_filename']}, sidecar={rec['sidecar_filename']}")
|
||||
|
||||
sc_path = Path(td) / "UM13981" / f"{idfh.name}.sfm.json"
|
||||
sc = json.loads(sc_path.read_text())
|
||||
bw = sc.get("bw_report", {})
|
||||
hist = bw.get("histogram", {})
|
||||
print(f" bw_report.histogram.start: {hist.get('start')}")
|
||||
print(f" bw_report.histogram.stop: {hist.get('stop')}")
|
||||
print(f" bw_report.histogram.n_intervals: {hist.get('n_intervals')}")
|
||||
print(f" bw_report.histogram.interval_size: {hist.get('interval_size')}")
|
||||
print(f" bw_report.histogram.interval_size_s: {hist.get('interval_size_s')}")
|
||||
print(f" bw_report.peaks.tran.ppv_ips: {bw.get('peaks', {}).get('tran', {}).get('ppv_ips')}")
|
||||
|
||||
ts = ev.timestamp
|
||||
ts_iso = None
|
||||
if ts is not None:
|
||||
try:
|
||||
ts_iso = datetime.datetime(ts.year, ts.month, ts.day, ts.hour, ts.minute, ts.second).isoformat()
|
||||
except Exception:
|
||||
pass
|
||||
fake_row = {
|
||||
"serial": "UM13981",
|
||||
"blastware_filename": rec["filename"],
|
||||
"record_type": "Histogram",
|
||||
"timestamp": ts_iso,
|
||||
"sample_rate": ev.sample_rate,
|
||||
"project": ev.project_info.project if ev.project_info else None,
|
||||
"client": ev.project_info.client if ev.project_info else None,
|
||||
"operator": ev.project_info.operator if ev.project_info else None,
|
||||
"sensor_location": ev.project_info.sensor_location if ev.project_info else None,
|
||||
"created_at": None,
|
||||
}
|
||||
rd = report_pdf.gather_report_data(FakeDb(fake_row), store, event_id="hist-1")
|
||||
|
||||
print()
|
||||
print("=== ReportData (histogram) ===")
|
||||
print(f" is_histogram: {rd.is_histogram}")
|
||||
print(f" histogram_start: {rd.histogram_start_str}")
|
||||
print(f" histogram_stop: {rd.histogram_stop_str}")
|
||||
print(f" histogram_n_intervals: {rd.histogram_n_intervals}")
|
||||
print(f" histogram_interval_size:{rd.histogram_interval_size}")
|
||||
print(f" histogram_interval_times[:3]: {rd.histogram_interval_times[:3]}")
|
||||
print(f" histogram_interval_times[-2:]: {rd.histogram_interval_times[-2:]}")
|
||||
print(f" channel_stats: {len(rd.channel_stats)} rows")
|
||||
for cs in rd.channel_stats:
|
||||
print(f" {cs['name']}: PPV={cs['ppv_ips']} ZC={cs['zc_freq_hz']} peak_date={cs['peak_date']} peak_time={cs['peak_time']}")
|
||||
|
||||
pdf_bytes = report_pdf.render_event_report_pdf(rd)
|
||||
out_path = REPO / "analysis_idf" / "thor_report_idfh.pdf"
|
||||
out_path.write_bytes(pdf_bytes)
|
||||
print()
|
||||
print(f" PDF written: {out_path} ({len(pdf_bytes)} bytes)")
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
main()
|
||||
@@ -0,0 +1,47 @@
|
||||
"""Verify build_bw_report_from_idf against a known sidecar."""
|
||||
from __future__ import annotations
|
||||
import json
|
||||
import sys
|
||||
from pathlib import Path
|
||||
|
||||
REPO = Path(__file__).resolve().parents[1]
|
||||
sys.path.insert(0, str(REPO))
|
||||
|
||||
from micromate.idf_ascii_report import parse_idf_report
|
||||
from micromate.idf_to_bw_report import build_bw_report_from_idf
|
||||
from micromate.idf_file import read_idf_file
|
||||
|
||||
|
||||
def show(prefix: str, d: dict, indent: int = 0):
|
||||
for k, v in d.items():
|
||||
if isinstance(v, dict):
|
||||
print(f"{' '*indent}{prefix}{k}:")
|
||||
show("", v, indent + 1)
|
||||
else:
|
||||
print(f"{' '*indent}{prefix}{k}: {v!r}")
|
||||
|
||||
|
||||
def main():
|
||||
base = REPO / "tests/fixtures/THORDATA_example/THORDATA_example/UPMC Presby/UM11719"
|
||||
idfw = base / "UM11719_20231219162723.IDFW"
|
||||
txt = base / "TXT" / f"{idfw.name}.txt"
|
||||
|
||||
report_dict = parse_idf_report(txt.read_text(errors="replace"))
|
||||
res = read_idf_file(idfw)
|
||||
bw = build_bw_report_from_idf(report_dict, binary_md=res.binary_metadata)
|
||||
|
||||
print("=== IDFW → bw_report ===")
|
||||
show("", bw)
|
||||
|
||||
print()
|
||||
print("=== IDFH (single trigger row) ===")
|
||||
idfh = base / "UM11719_20231219162648.IDFH"
|
||||
txt_h = base / "TXT" / f"{idfh.name}.txt"
|
||||
rh = parse_idf_report(txt_h.read_text(errors="replace"))
|
||||
res_h = read_idf_file(idfh)
|
||||
bw_h = build_bw_report_from_idf(rh, binary_md=res_h.binary_metadata, intervals=res_h.intervals)
|
||||
show("", bw_h)
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
main()
|
||||
Binary file not shown.
Binary file not shown.
@@ -210,8 +210,7 @@ def parse_idf_report(text: Union[str, bytes]) -> Dict[str, Any]:
|
||||
"long_peak_acceleration",
|
||||
"tran_peak_displacement", "vert_peak_displacement",
|
||||
"long_peak_displacement",
|
||||
"tran_time_of_peak", "vert_time_of_peak", "long_time_of_peak",
|
||||
"mic_time_of_peak", "mic_zc_freq",
|
||||
"mic_zc_freq",
|
||||
)
|
||||
for key in float_fields:
|
||||
v = raw.get(key)
|
||||
@@ -223,6 +222,22 @@ def parse_idf_report(text: Union[str, bytes]) -> Dict[str, Any]:
|
||||
else:
|
||||
out.pop(key, None)
|
||||
|
||||
# Time-of-peak: Thor labels these "TimeofPeak" (lowercase "of") so the
|
||||
# normalizer produces "*_timeof_peak". Map them to the canonical
|
||||
# ``*_time_of_peak`` output keys for downstream consumers.
|
||||
for raw_key, out_key in (
|
||||
("tran_timeof_peak", "tran_time_of_peak"),
|
||||
("vert_timeof_peak", "vert_time_of_peak"),
|
||||
("long_timeof_peak", "long_time_of_peak"),
|
||||
("mic_timeof_peak", "mic_time_of_peak"),
|
||||
):
|
||||
v = raw.get(raw_key)
|
||||
if v is None:
|
||||
continue
|
||||
fv = _parse_float(v)
|
||||
if fv is not None:
|
||||
out[out_key] = fv
|
||||
|
||||
# Microphone — Thor reports MicPSPL (dB(L)) which is the closest
|
||||
# analogue to BW's mic_ppv. The raw "99.4 dB(L)" string stays in
|
||||
# `out` under the original `mic_pspl` key for display; the parsed
|
||||
|
||||
@@ -0,0 +1,323 @@
|
||||
"""
|
||||
micromate/idf_to_bw_report.py — adapter that projects a parsed Thor IDF
|
||||
report (+ binary metadata + decoded IDFH intervals) into the
|
||||
``bw_report``-shaped dict that :mod:`sfm.report_pdf.gather_report_data`
|
||||
consumes.
|
||||
|
||||
Lets Thor events flow through the existing Series III Event Report PDF
|
||||
pipeline without duplicating the renderer. Thor's report content is
|
||||
~95% the same data shape as BW's; the field names differ but the
|
||||
underlying metrics map 1:1.
|
||||
|
||||
Caveats
|
||||
───────
|
||||
|
||||
- **Mic units** — Thor records ``MicPSPL`` natively in dB(L). This
|
||||
adapter sets ``bw_report.mic.pspl_dbl`` directly; the report
|
||||
renderer recomputes the equivalent psi via its dBL→psi formula.
|
||||
- **Saturation / above-range flags** — Thor doesn't always mark
|
||||
``OORANGE`` the way BW does; we set ``zc_freq_above_range`` only
|
||||
when a `>100` sentinel was preserved in the raw text.
|
||||
- **Per-interval data** — for IDFH events we build ``interval_times``
|
||||
by stepping ``IntervalSize`` from ``HistogramStartTime``; the binary
|
||||
decoder confirms one record per step (882 / 881 / 881 ... across
|
||||
the corpus).
|
||||
- **calibration_by parsing** — Thor's free-form ``Calibration : November
|
||||
22, 2023 by Instantel`` is split on ``" by "`` to extract the
|
||||
calibrator; the date prefix is parsed where possible, otherwise
|
||||
the binary-extracted ``calibration_date`` from
|
||||
:class:`micromate.idf_file.IdfBinaryMetadata` wins.
|
||||
"""
|
||||
|
||||
from __future__ import annotations
|
||||
|
||||
import datetime
|
||||
import re
|
||||
from typing import Any, Dict, List, Optional
|
||||
|
||||
|
||||
# ─── Helpers ────────────────────────────────────────────────────────────────
|
||||
|
||||
|
||||
_NUM_RE = re.compile(r"-?\d+(?:\.\d+)?")
|
||||
|
||||
|
||||
def _parse_first_number(s: Optional[str]) -> Optional[float]:
|
||||
"""Pull the first numeric token from a string like ``"0.1500 in/s"``."""
|
||||
if s is None:
|
||||
return None
|
||||
m = _NUM_RE.search(str(s))
|
||||
if not m:
|
||||
return None
|
||||
try:
|
||||
return float(m.group(0))
|
||||
except ValueError:
|
||||
return None
|
||||
|
||||
|
||||
def _parse_interval_size_s(s: Optional[str]) -> Optional[float]:
|
||||
"""``"60 sec"`` → 60.0, ``"5 min"`` → 300.0, ``"1 hour"`` → 3600."""
|
||||
if s is None:
|
||||
return None
|
||||
num = _parse_first_number(s)
|
||||
if num is None:
|
||||
return None
|
||||
sl = str(s).lower()
|
||||
if "hour" in sl or "hr" in sl:
|
||||
return num * 3600.0
|
||||
if "min" in sl:
|
||||
return num * 60.0
|
||||
return num # default to seconds
|
||||
|
||||
|
||||
def _parse_calibration(text: Optional[str]) -> tuple[Optional[str], Optional[str]]:
|
||||
"""Split ``"November 22, 2023 by Instantel"`` → (ISO date, calibrator).
|
||||
|
||||
Returns ``(None, None)`` if neither half parses.
|
||||
"""
|
||||
if not text:
|
||||
return None, None
|
||||
parts = str(text).split(" by ", 1)
|
||||
date_part = parts[0].strip() if parts else None
|
||||
by_part = parts[1].strip() if len(parts) > 1 else None
|
||||
iso_date: Optional[str] = None
|
||||
if date_part:
|
||||
for fmt in ("%B %d, %Y", "%b %d, %Y", "%Y-%m-%d", "%m/%d/%Y"):
|
||||
try:
|
||||
iso_date = datetime.datetime.strptime(date_part, fmt).date().isoformat()
|
||||
break
|
||||
except ValueError:
|
||||
continue
|
||||
return iso_date, by_part
|
||||
|
||||
|
||||
def _channel_peaks(idf: Dict[str, Any], ch_lc: str) -> Dict[str, Any]:
|
||||
"""Map ``tran_ppv`` / ``tran_zc_freq`` / ... → bw_report.peaks.tran shape."""
|
||||
out: Dict[str, Any] = {}
|
||||
for src, dst in (
|
||||
(f"{ch_lc}_ppv", "ppv_ips"),
|
||||
(f"{ch_lc}_zc_freq", "zc_freq_hz"),
|
||||
(f"{ch_lc}_time_of_peak", "time_of_peak_s"),
|
||||
(f"{ch_lc}_peak_acceleration", "peak_accel_g"),
|
||||
(f"{ch_lc}_peak_displacement", "peak_disp_in"),
|
||||
):
|
||||
v = idf.get(src)
|
||||
if v is not None:
|
||||
out[dst] = v
|
||||
# ZC freq ">100" sentinel: the raw text carries it under the un-typed
|
||||
# key (e.g. ``raw["tran_zc_freq"]`` would be ``">100"``), and our parser
|
||||
# dropped the typed entry. Detect that case and flag.
|
||||
raw_zc = idf.get(f"{ch_lc}_zc_freq")
|
||||
if isinstance(raw_zc, str) and ">" in raw_zc:
|
||||
out["zc_freq_above_range"] = True
|
||||
out.pop("zc_freq_hz", None)
|
||||
return out
|
||||
|
||||
|
||||
def _sensor_check(idf: Dict[str, Any], ch_lc: str) -> Dict[str, Any]:
|
||||
out: Dict[str, Any] = {}
|
||||
fr = idf.get(f"{ch_lc}_test_freq")
|
||||
if fr is not None:
|
||||
out["freq_hz"] = _parse_first_number(fr)
|
||||
rt = idf.get(f"{ch_lc}_test_ratio")
|
||||
if rt is not None:
|
||||
out["ratio"] = _parse_first_number(rt)
|
||||
am = idf.get(f"{ch_lc}_test_amplitude")
|
||||
if am is not None:
|
||||
out["amplitude_mv"] = _parse_first_number(am)
|
||||
res = idf.get(f"{ch_lc}_test_results")
|
||||
if res is not None:
|
||||
out["result"] = str(res).strip()
|
||||
return {k: v for k, v in out.items() if v is not None}
|
||||
|
||||
|
||||
def _interval_times(idf: Dict[str, Any], n_intervals: Optional[int]) -> List[str]:
|
||||
"""Synthesise per-interval timestamps from start + interval_size × k.
|
||||
|
||||
Returns ``[]`` when start time or interval size is unknown.
|
||||
"""
|
||||
if not n_intervals:
|
||||
return []
|
||||
start_date = idf.get("histogram_start_date") or idf.get("event_date")
|
||||
start_time = idf.get("histogram_start_time") or idf.get("event_time")
|
||||
iv_str = idf.get("interval_size")
|
||||
iv_s = _parse_interval_size_s(iv_str)
|
||||
if not (start_date and start_time and iv_s):
|
||||
return []
|
||||
try:
|
||||
t0 = datetime.datetime.strptime(f"{start_date} {start_time}", "%Y-%m-%d %H:%M:%S")
|
||||
except ValueError:
|
||||
return []
|
||||
out = []
|
||||
for k in range(int(n_intervals)):
|
||||
t = t0 + datetime.timedelta(seconds=iv_s * (k + 1))
|
||||
out.append(t.isoformat())
|
||||
return out
|
||||
|
||||
|
||||
# ─── Top-level adapter ──────────────────────────────────────────────────────
|
||||
|
||||
|
||||
def build_bw_report_from_idf(
|
||||
idf_report: Dict[str, Any],
|
||||
*,
|
||||
binary_md=None,
|
||||
intervals: Optional[list] = None,
|
||||
is_histogram: Optional[bool] = None,
|
||||
) -> Dict[str, Any]:
|
||||
"""Project a parsed IDF report dict (and optional binary metadata +
|
||||
decoded IDFH intervals) into the BW report sidecar shape.
|
||||
|
||||
The returned dict is structurally identical to what
|
||||
``minimateplus.event_file_io._bw_report_to_dict`` produces from a
|
||||
real BW ASCII report — it can be assigned to
|
||||
``sidecar["bw_report"]`` and consumed verbatim by
|
||||
``sfm.report_pdf.gather_report_data``.
|
||||
|
||||
``intervals`` is the list of :class:`micromate.idf_file.IdfhInterval`
|
||||
objects from :func:`micromate.idf_file.decode_idfh_body`; only used
|
||||
for histogram events to derive accurate ``interval_times``.
|
||||
"""
|
||||
if is_histogram is None:
|
||||
et = str(idf_report.get("event_type", ""))
|
||||
is_histogram = et.lower().startswith("full histogram")
|
||||
|
||||
# ── Trigger / recording / device ─────────────────────────────────────
|
||||
trigger_channel = idf_report.get("trigger")
|
||||
trigger_level = _parse_first_number(idf_report.get("geo_trigger_level"))
|
||||
geo_range_ips = _parse_first_number(idf_report.get("geo_range"))
|
||||
|
||||
cal_iso, cal_by = _parse_calibration(idf_report.get("calibration"))
|
||||
# Prefer the binary-extracted calibration_date when our text parse fell
|
||||
# through; the binary date is unambiguous.
|
||||
if cal_iso is None and binary_md is not None and binary_md.calibration_date:
|
||||
cal_iso = binary_md.calibration_date.isoformat()
|
||||
|
||||
# ── Histogram fields ────────────────────────────────────────────────
|
||||
hist_block: Dict[str, Any] = {
|
||||
"start": None, "stop": None, "n_intervals": None,
|
||||
"interval_size": None, "interval_size_s": None,
|
||||
"channel_peak_when": {},
|
||||
}
|
||||
if is_histogram:
|
||||
sd = idf_report.get("histogram_start_date")
|
||||
st = idf_report.get("histogram_start_time")
|
||||
if sd and st:
|
||||
try:
|
||||
hist_block["start"] = datetime.datetime.strptime(
|
||||
f"{sd} {st}", "%Y-%m-%d %H:%M:%S"
|
||||
).isoformat()
|
||||
except ValueError:
|
||||
pass
|
||||
ed = idf_report.get("histogram_stop_date")
|
||||
et_ = idf_report.get("histogram_stop_time")
|
||||
if ed and et_:
|
||||
try:
|
||||
hist_block["stop"] = datetime.datetime.strptime(
|
||||
f"{ed} {et_}", "%Y-%m-%d %H:%M:%S"
|
||||
).isoformat()
|
||||
except ValueError:
|
||||
pass
|
||||
n_raw = idf_report.get("number_of_intervals")
|
||||
if n_raw is not None:
|
||||
try:
|
||||
# Thor reports a float like "81.04"; round to int (the BW
|
||||
# report uses an int for the column).
|
||||
hist_block["n_intervals"] = int(float(str(n_raw)))
|
||||
except ValueError:
|
||||
pass
|
||||
# When the binary decoder gave us the actual interval count, prefer it.
|
||||
if intervals is not None:
|
||||
hist_block["n_intervals"] = len(intervals)
|
||||
hist_block["interval_size"] = idf_report.get("interval_size")
|
||||
hist_block["interval_size_s"] = _parse_interval_size_s(idf_report.get("interval_size"))
|
||||
# interval_times derived from start+step (the BW report uses the
|
||||
# exact strings; we match its representation).
|
||||
times = _interval_times(idf_report, hist_block["n_intervals"])
|
||||
# Per-channel peak when (absolute date+time at which the channel's
|
||||
# peak occurred over the histogram run). Thor splits this into
|
||||
# ``TranPeakDate`` / ``TranPeakTime`` etc.
|
||||
peak_when: Dict[str, str] = {}
|
||||
for ch_label, ch_lc in (("Tran", "tran"), ("Vert", "vert"), ("Long", "long"), ("MicL", "mic")):
|
||||
d = idf_report.get(f"{ch_lc}_peak_date")
|
||||
t = idf_report.get(f"{ch_lc}_peak_time")
|
||||
if d and t:
|
||||
try:
|
||||
peak_when[ch_label] = datetime.datetime.strptime(
|
||||
f"{d} {t}", "%Y-%m-%d %H:%M:%S"
|
||||
).isoformat()
|
||||
except ValueError:
|
||||
continue
|
||||
if peak_when:
|
||||
hist_block["channel_peak_when"] = peak_when
|
||||
|
||||
# ── Mic block ────────────────────────────────────────────────────────
|
||||
mic_block = {
|
||||
"weighting": "L", # Thor mic is ISEE Linear
|
||||
"pspl_dbl": idf_report.get("mic_ppv"), # the dB(L) float
|
||||
"pspl_saturated": False,
|
||||
"zc_freq_hz": idf_report.get("mic_zc_freq"),
|
||||
"zc_freq_above_range": isinstance(idf_report.get("mic_zc_freq"), str)
|
||||
and ">" in str(idf_report.get("mic_zc_freq")),
|
||||
"time_of_peak_s": idf_report.get("mic_time_of_peak"),
|
||||
}
|
||||
if mic_block["zc_freq_above_range"]:
|
||||
mic_block["zc_freq_hz"] = None
|
||||
|
||||
# ── Peaks ────────────────────────────────────────────────────────────
|
||||
vs_block = {
|
||||
"ips": idf_report.get("peak_vector_sum"),
|
||||
"time_s": _parse_first_number(idf_report.get("peak_vector_sum_time_sum")),
|
||||
"when": None,
|
||||
"saturated": False,
|
||||
}
|
||||
if is_histogram:
|
||||
# PVS absolute date+time, when present.
|
||||
vs_d = idf_report.get("peak_vector_sum_date")
|
||||
vs_t = idf_report.get("peak_vector_sum_time")
|
||||
if vs_d and vs_t:
|
||||
try:
|
||||
vs_block["when"] = datetime.datetime.strptime(
|
||||
f"{vs_d} {vs_t}", "%Y-%m-%d %H:%M:%S"
|
||||
).isoformat()
|
||||
except ValueError:
|
||||
pass
|
||||
|
||||
return {
|
||||
"available": True,
|
||||
"event_type": idf_report.get("event_type"),
|
||||
"version": idf_report.get("version"),
|
||||
"trigger": {
|
||||
"channel": trigger_channel,
|
||||
"geo_level_ips": trigger_level,
|
||||
},
|
||||
"recording": {
|
||||
"sample_rate_sps": idf_report.get("sample_rate"),
|
||||
"record_time_s": idf_report.get("record_time_sec"),
|
||||
"pretrig_s": idf_report.get("pre_trigger_sec"),
|
||||
"stop_mode": idf_report.get("record_stop_mode"),
|
||||
"geo_range_ips": geo_range_ips,
|
||||
"units": idf_report.get("units"),
|
||||
},
|
||||
"device": {
|
||||
"battery_volts": idf_report.get("battery_volts"),
|
||||
"calibration_date": cal_iso,
|
||||
"calibration_by": cal_by,
|
||||
},
|
||||
"peaks": {
|
||||
"tran": _channel_peaks(idf_report, "tran"),
|
||||
"vert": _channel_peaks(idf_report, "vert"),
|
||||
"long": _channel_peaks(idf_report, "long"),
|
||||
"vector_sum": vs_block,
|
||||
},
|
||||
"mic": mic_block,
|
||||
"sensor_check": {
|
||||
"tran": _sensor_check(idf_report, "tran"),
|
||||
"vert": _sensor_check(idf_report, "vert"),
|
||||
"long": _sensor_check(idf_report, "long"),
|
||||
"mic": _sensor_check(idf_report, "mic"),
|
||||
},
|
||||
"histogram": hist_block,
|
||||
"monitor_log": [],
|
||||
"pc_sw_version": None,
|
||||
}
|
||||
@@ -639,6 +639,27 @@ class WaveformStore:
|
||||
# Time of Peak, sensor self-check, calibration, firmware).
|
||||
if report_dict:
|
||||
sidecar["extensions"]["idf_report"] = report_dict
|
||||
|
||||
# Project the IDF report into the BW report sidecar shape so the
|
||||
# existing Event Report PDF pipeline (sfm/report_pdf.py) can
|
||||
# render Thor events without needing a separate code path. Thor
|
||||
# data is 95% the same metric set as BW — the adapter handles
|
||||
# the field-name mapping.
|
||||
if report_dict or binary_md is not None:
|
||||
try:
|
||||
from micromate.idf_to_bw_report import build_bw_report_from_idf
|
||||
sidecar["bw_report"] = build_bw_report_from_idf(
|
||||
report_dict or {},
|
||||
binary_md=binary_md,
|
||||
intervals=idf_intervals,
|
||||
is_histogram=is_histogram,
|
||||
)
|
||||
except Exception as exc:
|
||||
log.warning(
|
||||
"save_imported_idf: idf→bw_report adapter failed for %s: %s — "
|
||||
"report PDF will fall back to DB-only fields",
|
||||
filename, exc,
|
||||
)
|
||||
# For histograms, also stash the binary-decoded per-interval
|
||||
# records so the UI / report layer doesn't need to re-walk the
|
||||
# IDFH file at render time.
|
||||
|
||||
Reference in New Issue
Block a user