"""
sfm/report_pdf.py — generate Instantel-style Event Report PDFs.

Stub layout for v0.20.0 — the exact visual is iterated against actual
Blastware reference PDFs (uploaded to docs/reference/instantel/).  Current
output captures all the data fields a real BW Event Report contains, but
the visual hierarchy / spacing is still approximate.

Architecture
────────────
1. ``gather_report_data(db, store, event_id)`` — assembles a ``ReportData``
   from three sources: the SeismoDb events row, the .sfm.json sidecar
   (``bw_report`` block), and the .h5 waveform samples.  Returns ``None``
   when the event doesn't exist or its row lacks a serial / filename.
   Missing sidecar or .h5 files are tolerated: the corresponding fields
   simply stay empty.

2. ``render_event_report_pdf(rd)`` — takes that ``ReportData`` and produces
   a single-page letter-sized PDF as bytes, using matplotlib's PDF backend
   (vector output, no rasterization, prints cleanly).

3. The HTTP endpoint at ``/db/events/{id}/report.pdf`` wires them together:
   fetch event → gather → render → stream bytes back with
   ``Content-Type: application/pdf``.

What's in the report (every field BW's printout includes):

  Header (left):   Date/Time, Trigger Source, Range, Sample Rate, Notes,
                   Project, Client, User Name, Seis. Loc
  Header (right):  Serial + firmware, Battery, Calibration, File Name,
                   Post Event Notes
  Mic block:       PSPL (dBL + psi), ZC Freq, Channel Test result
  Stats table:     per-channel PPV / ZC Freq / Time of Peak /
                   Peak Acceleration / Peak Displacement / Sensor Check
  Peak Vector Sum
  Waveform plot:   4 channels stacked (MicL/Long/Vert/Tran), trigger
                   marker at t=0
  USBM RI8507/OSMRE compliance chart:  STUBBED — separate work item

Histogram events: the layout differs (Number of Intervals header field, no
trigger marker, per-interval bar chart instead of waveform).  Handled via
the ``is_histogram`` branch in ``render_event_report_pdf``.
"""
from __future__ import annotations

import datetime as _dt
import io
import json
import logging
import math
from dataclasses import dataclass, field
from pathlib import Path
from typing import Optional

import matplotlib

matplotlib.use("Agg")  # headless — no display required

import matplotlib.pyplot as plt
import numpy as np
from matplotlib.backends.backend_pdf import PdfPages

log = logging.getLogger(__name__)

# Reference pressure for dB(L) conversion: 20 µPa expressed in psi.
DBL_REF_PSI = 2.9e-9

# Top-to-bottom channel order of the stacked plots (Instantel printout order).
_PLOT_ORDER = ("MicL", "Long", "Vert", "Tran")

# Geophone channels, in the order the footer probes them for a scale value.
_GEO_CHANNELS = ("Tran", "Vert", "Long")

# Per-channel trace / label colours; unknown channels fall back to grey.
_CHANNEL_COLORS = {
    "MicL": "#cc00cc",
    "Long": "#0066ff",
    "Vert": "#009933",
    "Tran": "#cc0000",
}


# ── Data assembly ────────────────────────────────────────────────────────────


@dataclass
class ReportData:
    """All fields needed to render an Instantel-style Event Report.

    Most fields are Optional — BW's printout shows '—' or just omits
    sections when source data is missing.  The renderer mirrors that.
    """

    # Header — left column
    event_datetime_str: Optional[str] = None
    trigger_source: Optional[str] = None
    geo_range_str: Optional[str] = None
    sample_rate_str: Optional[str] = None
    notes: Optional[str] = None
    project: Optional[str] = None
    client: Optional[str] = None
    operator: Optional[str] = None
    sensor_location: Optional[str] = None

    # Header — right column
    serial: Optional[str] = None
    firmware: Optional[str] = None
    battery_volts: Optional[float] = None
    calibration_date: Optional[str] = None
    calibration_by: Optional[str] = None
    file_name: Optional[str] = None
    post_event_notes: Optional[str] = None

    # Microphone block
    mic_pspl_dbl: Optional[float] = None
    mic_pspl_psi: Optional[float] = None
    mic_pspl_time_s: Optional[float] = None
    mic_pspl_when_str: Optional[str] = None  # histogram absolute date+time, BW-formatted
    mic_zc_freq_hz: Optional[float] = None
    mic_channel_test_result: Optional[str] = None
    mic_channel_test_freq_hz: Optional[float] = None
    mic_channel_test_amp_mv: Optional[float] = None

    # Per-channel stats — list of dicts (one per channel)
    # Keys: name, ppv_ips, zc_freq_hz, time_of_peak_s,
    #       peak_accel_g, peak_disp_in, sensor_check, peak_date, peak_time
    channel_stats: list[dict] = field(default_factory=list)

    # Peak Vector Sum
    peak_vector_sum_ips: Optional[float] = None
    peak_vector_sum_time_s: Optional[float] = None

    # Waveform samples — channels[ch] = list of floats in physical units
    # Time axis derived from sample_rate + pretrig_samples
    channels: dict = field(default_factory=dict)
    sample_rate_sps: Optional[int] = None
    pretrig_samples: Optional[int] = None
    t0_ms: Optional[float] = None
    dt_ms: Optional[float] = None

    # Record-type discriminator
    record_type: Optional[str] = None
    is_histogram: bool = False

    # Histogram-only fields — only populated when record_type starts with 'Hist'.
    # start/stop are passed through _fmt_iso_to_bw by the renderer, so ISO-8601
    # input ends up displayed like "22:30:38 May 16, 2026".
    histogram_start_str: Optional[str] = None
    histogram_stop_str: Optional[str] = None
    histogram_n_intervals: Optional[float] = None  # 4.00
    histogram_interval_size: Optional[str] = None  # "1 minute"
    histogram_interval_size_s: Optional[float] = None  # 60.0 — numeric seconds, used to derive interval_times
    histogram_interval_times: list[str] = field(default_factory=list)  # per-interval timestamps for x-axis

    # Peak Vector Sum metadata (histograms show absolute date+time)
    peak_vector_sum_when_str: Optional[str] = None

    # Bookkeeping
    event_id: Optional[str] = None
    server_received_at: Optional[str] = None
    bw_pc_sw_version: Optional[str] = None


def gather_report_data(
    db,
    store,
    event_id: str,
) -> Optional[ReportData]:
    """Collect every field needed to render an event report.

    Args:
        db: object exposing ``get_event(event_id)`` that returns a mapping
            (the events row) or ``None``.
        store: object exposing ``sidecar_path_for(serial, filename)`` and
            ``hdf5_path_for(serial, filename)``, both returning path objects.
        event_id: identifier of the event to report on.

    Returns:
        A populated ``ReportData``, or ``None`` if the event is unknown or
        its row has no serial / Blastware filename.

    A missing or unreadable sidecar / .h5 is logged and tolerated; the
    report is still produced with the affected fields left empty.
    NOTE(review): earlier documentation said ``None`` is also returned when
    no waveform data exists on disk — this function has never enforced that;
    confirm which behaviour the endpoint expects.
    """
    row = db.get_event(event_id)
    if row is None:
        return None
    serial = row.get("serial")
    filename = row.get("blastware_filename")
    if not serial or not filename:
        return None

    rd = ReportData(
        event_id=event_id,
        serial=serial,
        file_name=filename,
        record_type=row.get("record_type"),
        is_histogram=str(row.get("record_type", "")).lower().startswith("hist"),
        event_datetime_str=row.get("timestamp"),
        sample_rate_sps=row.get("sample_rate"),
        project=row.get("project"),
        client=row.get("client"),
        operator=row.get("operator"),
        sensor_location=row.get("sensor_location"),
        server_received_at=row.get("created_at"),
    )

    # ── Sidecar bw_report — the rich BW-derived fields ──
    sidecar_path = store.sidecar_path_for(serial, filename)
    if sidecar_path.exists():
        _apply_bw_report(rd, _read_bw_report(sidecar_path))

    # ── Waveform samples — from the .h5 via the existing helper ──
    _load_waveform_samples(rd, store.hdf5_path_for(serial, filename), event_id)

    # ── Histogram aggregation (no-op for waveform events) ──
    _aggregate_histogram_intervals(rd)
    return rd


def _read_bw_report(sidecar_path) -> dict:
    """Return the sidecar's ``bw_report`` mapping, or ``{}`` if unusable.

    Read / decode / parse failures and unexpected JSON shapes are logged and
    treated as "no data" so a damaged sidecar never blocks the report.
    """
    try:
        sc = json.loads(sidecar_path.read_text(encoding="utf-8"))
    except (OSError, ValueError) as exc:
        # ValueError covers both JSONDecodeError and UnicodeDecodeError.
        log.warning("gather_report_data: sidecar read failed: %s", exc)
        return {}
    if not isinstance(sc, dict):
        log.warning(
            "gather_report_data: sidecar root is %s, expected an object",
            type(sc).__name__,
        )
        return {}
    bw = sc.get("bw_report")
    return bw if isinstance(bw, dict) else {}


def _format_trigger_source(channel, level_ips) -> Optional[str]:
    """Build the 'Geo: 0.5 in/s' trigger line from whichever parts exist."""
    parts = []
    if channel:
        parts.append(str(channel))
    if level_ips is not None:
        parts.append(f"{level_ips} in/s")
    return ": ".join(parts) if parts else None


def _apply_bw_report(rd: ReportData, bw: dict) -> None:
    """Copy the BW-derived sidecar fields from ``bw`` onto ``rd`` in place."""
    # Trigger / range / sample-rate display
    trig = bw.get("trigger") or {}
    rd.trigger_source = _format_trigger_source(
        trig.get("channel"), trig.get("geo_level_ips")
    )
    rec = bw.get("recording") or {}
    geo_range = rec.get("geo_range_ips")
    rd.geo_range_str = f"Geo: {geo_range} in/s" if geo_range is not None else None
    rt = rec.get("record_time_s")
    if rt is not None and rd.sample_rate_sps:
        rd.sample_rate_str = f"{rt:.1f} sec At {rd.sample_rate_sps} Sps"

    # Device block
    dev = bw.get("device") or {}
    rd.battery_volts = dev.get("battery_volts")
    rd.calibration_date = dev.get("calibration_date")
    rd.calibration_by = dev.get("calibration_by")
    rd.firmware = bw.get("version")
    rd.bw_pc_sw_version = bw.get("pc_sw_version")

    # Microphone block
    mic = bw.get("mic") or {}
    rd.mic_pspl_dbl = mic.get("pspl_dbl")
    if rd.mic_pspl_dbl is not None and rd.mic_pspl_dbl > 0:
        # Inverse of the dBL formula → psi.  Mirrors waveform_codec convention.
        rd.mic_pspl_psi = DBL_REF_PSI * (10 ** (rd.mic_pspl_dbl / 20))
    rd.mic_pspl_time_s = mic.get("time_of_peak_s")
    rd.mic_zc_freq_hz = mic.get("zc_freq_hz")

    sc_block = bw.get("sensor_check") or {}
    sc_mic = sc_block.get("mic") or {}
    rd.mic_channel_test_result = sc_mic.get("result")
    rd.mic_channel_test_freq_hz = sc_mic.get("freq_hz")
    rd.mic_channel_test_amp_mv = sc_mic.get("amplitude_mv")

    # Per-channel stats (Tran / Vert / Long).  Per-channel peak date+time
    # for histograms comes from bw_report.histogram.channel_peak_when
    # (populated when the parser captured it; see the bw_ascii_report
    # parser's histogram-fields handler).
    peaks = bw.get("peaks") or {}
    hist_block = bw.get("histogram") or {}
    peak_when = hist_block.get("channel_peak_when") or {}
    for ch_lc, ch_label in (("tran", "Tran"), ("vert", "Vert"), ("long", "Long")):
        ch = peaks.get(ch_lc) or {}
        sc_ch = sc_block.get(ch_lc) or {}
        # NB: peak_when is keyed by the display label, peaks by lowercase.
        peak_date, peak_time = _split_iso_to_date_time(peak_when.get(ch_label))
        rd.channel_stats.append({
            "name": ch_label,
            "ppv_ips": ch.get("ppv_ips"),
            "zc_freq_hz": ch.get("zc_freq_hz"),
            "time_of_peak_s": ch.get("time_of_peak_s"),
            "peak_accel_g": ch.get("peak_accel_g"),
            "peak_disp_in": ch.get("peak_disp_in"),
            "sensor_check": sc_ch.get("result"),
            "peak_date": peak_date,
            "peak_time": peak_time,
        })

    # MicL peak time (used in the mic block — "PSPL ... on DATE at TIME")
    mic_when_iso = peak_when.get("MicL")
    rd.mic_pspl_when_str = _fmt_iso_to_bw(mic_when_iso) if mic_when_iso else None

    # Peak Vector Sum
    vs = peaks.get("vector_sum") or {}
    rd.peak_vector_sum_ips = vs.get("ips")
    rd.peak_vector_sum_time_s = vs.get("time_s")
    # PVS absolute date+time (histograms).  Same formatting as Mic.
    pvs_when_iso = vs.get("when")
    rd.peak_vector_sum_when_str = _fmt_iso_to_bw(pvs_when_iso) if pvs_when_iso else None

    # Histogram-specific header fields — keys match the projection in
    # _bw_report_to_dict ("start" / "stop", not "_str" suffixed).
    if rd.is_histogram:
        rd.histogram_start_str = hist_block.get("start") or rd.event_datetime_str
        rd.histogram_stop_str = hist_block.get("stop")
        rd.histogram_n_intervals = hist_block.get("n_intervals")
        rd.histogram_interval_size = hist_block.get("interval_size")
        rd.histogram_interval_size_s = hist_block.get("interval_size_s")
        rd.histogram_interval_times = hist_block.get("interval_times") or []


def _load_waveform_samples(rd: ReportData, h5_path, event_id: str) -> None:
    """Fill ``rd.channels`` and the time-axis fields from the event's .h5.

    Does nothing when the file is absent.  Read failures are logged and
    leave whatever was populated so far in place.
    """
    if not h5_path.exists():
        return
    # Imported lazily so the HDF5 stack is only required when there is
    # actually a file to read.
    from sfm import event_hdf5

    try:
        wf = event_hdf5.plot_json_from_hdf5(h5_path, event_id=event_id)
        rd.channels = {
            ch: (chd.get("values") or [])
            for ch, chd in (wf.get("channels") or {}).items()
        }
        ta = wf.get("time_axis") or {}
        rd.sample_rate_sps = rd.sample_rate_sps or ta.get("sample_rate")
        rd.pretrig_samples = ta.get("pretrig_samples")
        rd.t0_ms = ta.get("t0_ms")
        rd.dt_ms = ta.get("dt_ms")
    except Exception as exc:
        # Deliberately broad: the helper's failure modes are not visible
        # from here and a bad .h5 must not take the whole report down.
        log.warning("gather_report_data: hdf5 read failed: %s", exc)


def _max_abs_per_group(vals, n: int) -> list:
    """Split ``vals`` into ``n`` near-equal consecutive groups; max |v| of each.

    The first ``len(vals) % n`` groups get one extra sample.  ``None``
    samples are ignored; an empty group contributes 0.
    """
    per_group, remainder = divmod(len(vals), n)
    agg: list = []
    offset = 0
    for i in range(n):
        size = per_group + (1 if i < remainder else 0)
        grp = vals[offset:offset + size]
        agg.append(max((abs(v) for v in grp if v is not None), default=0))
        offset += size
    return agg


def _aggregate_histogram_intervals(rd: ReportData) -> None:
    """Collapse histogram samples to one value per configured interval.

    Codec emits ~N per-block samples (typically 1/sec); BW reports one bar
    per configured interval (1 min / 5 min / etc.).  When
    ``histogram_n_intervals`` is populated (events ingested with the parser
    extension), group max-per-group to match.  Also derives per-interval
    timestamps for the x-axis.  No-op for waveform events or when
    ``histogram_n_intervals`` is missing.
    """
    if not (rd.is_histogram and rd.histogram_n_intervals and rd.histogram_n_intervals >= 1):
        return
    n = int(rd.histogram_n_intervals)
    for ch, vals in list(rd.channels.items()):
        if vals:
            rd.channels[ch] = _max_abs_per_group(vals, n)

    # Derive per-interval HH:MM:SS labels if we have the start time + size
    if rd.histogram_start_str and rd.histogram_interval_size_s and not rd.histogram_interval_times:
        try:
            # Same parser as the header's Start field, so "Z"-suffixed
            # timestamps work and the labels agree with the header.
            start = _to_display_local(rd.histogram_start_str)
            step = _dt.timedelta(seconds=rd.histogram_interval_size_s)
            rd.histogram_interval_times = [
                (start + (i + 1) * step).strftime("%H:%M:%S") for i in range(n)
            ]
        except Exception as exc:
            # Best effort: the plot falls back to interval indices.
            log.warning("gather_report_data: interval label derivation failed: %s", exc)


# ── PDF rendering ────────────────────────────────────────────────────────────


def render_event_report_pdf(rd: ReportData) -> bytes:
    """Render a ``ReportData`` to a single-page letter PDF and return its bytes.

    Branches on ``rd.is_histogram`` — waveform and histogram layouts differ
    in their header fields, stats-table rows, and bottom plot.  Layout
    modeled on Blastware's Event Report PDFs (samples in
    docs/reference/instantel/).

    The figure is always closed, including when a drawing step raises, so
    failed renders don't accumulate in pyplot's figure registry.
    """
    # Letter portrait — 8.5"×11"
    fig = plt.figure(figsize=(8.5, 11), dpi=100)
    try:
        fig.patch.set_facecolor("white")

        if rd.is_histogram:
            _render_histogram_layout(fig, rd)
        else:
            _render_waveform_layout(fig, rd)

        # Page footer (common to both layouts) — Created date + event id.
        # Pushed to the very page bottom so it doesn't collide with the
        # waveform footer scale / trigger legend lines just above.
        # Convert UTC server_received_at to local for display.
        created_local = _fmt_iso_to_bw(rd.server_received_at) if rd.server_received_at else "—"
        fig.text(
            0.07, 0.005,
            f"Created: {created_local} • seismo-relay",
            fontsize=6, color="#888", ha="left",
        )
        fig.text(
            0.93, 0.005,
            f"Event {str(rd.event_id)[:8] if rd.event_id else '—'}",
            fontsize=6, color="#888", ha="right",
        )

        buf = io.BytesIO()
        fig.savefig(buf, format="pdf")
        return buf.getvalue()
    finally:
        plt.close(fig)


def _render_waveform_layout(fig, rd: ReportData) -> None:
    """Waveform layout: header / mic+USBM / per-channel stats / waveform plot.

    Stats table includes Time (Rel. to Trig), Peak Accel, Peak Disp.

    Left margin sized to fit the channel labels (MicL/Long/Vert/Tran).
    Extra bottom margin reserves space for x-axis tick labels +
    "Amplitude Geo: X in/s/div Mic: Y psi(L)/div" footer + trigger legend
    without overlap.
    """
    gs = fig.add_gridspec(
        nrows=4, ncols=1,
        left=0.11, right=0.94, top=0.97, bottom=0.12,
        height_ratios=[1.7, 2.0, 1.8, 5.5],
        hspace=0.35,
    )
    ax_header = fig.add_subplot(gs[0])
    ax_header.axis("off")
    _draw_header_waveform(ax_header, rd)

    ax_mid = fig.add_subplot(gs[1])
    ax_mid.axis("off")
    _draw_mic_and_usbm(ax_mid, rd)

    ax_stats = fig.add_subplot(gs[2])
    ax_stats.axis("off")
    _draw_channel_stats_waveform(ax_stats, rd)

    _draw_waveform_subplot(fig, gs[3], rd)


def _render_histogram_layout(fig, rd: ReportData) -> None:
    """Histogram layout: header / mic-only / per-channel stats / bar plot.

    No USBM compliance chart (it's a waveform-only concept).  Stats table
    uses Date + Time-of-peak instead of relative-time + accel + disp.

    Left margin sized to fit the channel labels.  Extra bottom margin
    leaves room for the x-axis time labels + footer scale legend without
    overlap.
    """
    gs = fig.add_gridspec(
        nrows=4, ncols=1,
        left=0.11, right=0.94, top=0.97, bottom=0.12,
        height_ratios=[1.8, 0.9, 1.7, 5.6],
        hspace=0.35,
    )
    ax_header = fig.add_subplot(gs[0])
    ax_header.axis("off")
    _draw_header_histogram(ax_header, rd)

    ax_mic = fig.add_subplot(gs[1])
    ax_mic.axis("off")
    _draw_mic_only(ax_mic, rd)

    ax_stats = fig.add_subplot(gs[2])
    ax_stats.axis("off")
    _draw_channel_stats_histogram(ax_stats, rd)

    _draw_histogram_subplot(fig, gs[3], rd)


def _to_display_local(iso: str):
    """Parse an ISO timestamp and return a datetime for display.

    Behaviour:
      - "...Z" or "...+HH:MM" suffix → tz-aware → converted to the system's
        local timezone (per the TZ env var; the Dockerfile is said to
        default it to America/New_York).
      - Naïve "YYYY-MM-DDTHH:MM:SS" (no tz) → returned as-is.  This matches
        the convention used elsewhere in seismo-relay: BW's recorded-at
        timestamps are naïve and ALREADY in the unit's local clock; we
        don't second-guess them.

    Raises whatever ``datetime.fromisoformat`` raises on malformed input.
    """
    # fromisoformat only accepts a literal "Z" from Python 3.11 on.
    dt = _dt.datetime.fromisoformat(iso.replace("Z", "+00:00"))
    if dt.tzinfo is not None:
        # astimezone() without an argument uses the system timezone.
        dt = dt.astimezone()
    return dt


def _fmt_iso_to_bw(iso: Optional[str]) -> Optional[str]:
    """Convert an ISO-8601 timestamp to BW's display format.

    Example output: '22:30:37 May 16, 2026' (day of month without a
    leading zero).  Tz-aware inputs are converted to the system's local
    timezone first; naïve inputs are formatted as-is.  Anything that isn't
    a string containing 'T', or that fails to parse, is returned unchanged.
    """
    if not iso or not isinstance(iso, str) or "T" not in iso:
        return iso
    try:
        dt = _to_display_local(iso)
        return f"{dt:%H:%M:%S %B} {dt.day}, {dt:%Y}"
    except (AttributeError, TypeError, ValueError, OverflowError, OSError):
        return iso


def _split_iso_to_date_time(iso: Optional[str]) -> tuple[Optional[str], Optional[str]]:
    """Split an ISO timestamp into BW-formatted date and time strings.

    Returns e.g. ('May 27 /26', '06:06:14').  Used for the histogram stats
    table where the Date and Time rows are presented separately.  Tz-aware
    inputs are converted to local time first.  Returns (None, None) for
    empty input or on parse failure.
    """
    if not iso:
        return (None, None)
    try:
        dt = _to_display_local(iso)
    except (AttributeError, TypeError, ValueError, OverflowError, OSError):
        return (None, None)
    # BW format: 'May 27 /26' (3-letter month, unpadded day, 2-digit year)
    return (f"{dt:%b} {dt.day} /{dt:%y}", f"{dt:%H:%M:%S}")


def _kv(ax, x, y, label, value, *, label_w=0.18):
    """Render a 'Label Value' row at axes-coordinates (x, y)."""
    ax.text(x, y, label, fontsize=8, color="#555", ha="left", va="top",
            transform=ax.transAxes)
    ax.text(x + label_w, y, _fmt(value), fontsize=8, ha="left", va="top",
            transform=ax.transAxes, family="monospace")


def _fmt(v):
    """Format any field for display — '—' for None, str otherwise.

    Floats are shown with up to 4 decimals, trailing zeros stripped.
    """
    if v is None:
        return "—"
    if isinstance(v, float):
        return f"{v:.4f}".rstrip("0").rstrip(".")
    return str(v)


def _when_suffix(when_str: str, at_word: str) -> str:
    """Turn 'HH:MM:SS Month DD, YYYY' into ' on Month DD, YYYY <at> HH:MM:SS'.

    ``at_word`` is the joiner BW uses in that spot ('at' / 'At').  A string
    without a space is appended whole after ' on '.
    """
    time_part, sep, date_part = when_str.partition(" ")
    if sep:
        return f" on {date_part} {at_word} {time_part}"
    return f" on {when_str}"


def _draw_header_waveform(ax, rd: ReportData) -> None:
    """Two-column metadata header — waveform variant."""
    rows_left = [
        ("Date/Time", _fmt_iso_to_bw(rd.event_datetime_str)),
        ("Trigger Source", rd.trigger_source),
        ("Range", rd.geo_range_str),
        ("Sample Rate", rd.sample_rate_str),
        ("Notes", rd.notes),
        ("Project:", rd.project),
        ("Client:", rd.client),
        ("User Name:", rd.operator),
        ("Seis. Loc:", rd.sensor_location),
    ]
    _draw_header_columns(ax, rows_left, rd)


def _draw_header_histogram(ax, rd: ReportData) -> None:
    """Two-column metadata header — histogram variant.

    Histograms have Start / Finish / Intervals fields instead of Trigger
    Source (there's no trigger event for a histogram capture).
    """
    intervals_str = None
    if rd.histogram_n_intervals is not None and rd.histogram_interval_size:
        intervals_str = f"{rd.histogram_n_intervals} At {rd.histogram_interval_size}"
    rows_left = [
        ("Start", _fmt_iso_to_bw(rd.histogram_start_str or rd.event_datetime_str)),
        ("Finish", _fmt_iso_to_bw(rd.histogram_stop_str)),
        ("Intervals", intervals_str),
        ("Range", rd.geo_range_str),
        ("Sample Rate", (f"{rd.sample_rate_sps} Sps" if rd.sample_rate_sps else None)),
        ("Notes", rd.notes),
        ("Project:", rd.project),
        ("Client:", rd.client),
        ("User Name:", rd.operator),
        ("Seis. Loc:", rd.sensor_location),
    ]
    _draw_header_columns(ax, rows_left, rd)


def _draw_header_columns(ax, rows_left, rd: ReportData) -> None:
    """Shared 2-column header rendering used by both layouts.

    ``rows_left`` is a list of (label, value) pairs for the left column;
    the right column (device / file info) is built here from ``rd``.
    """
    rows_right = [
        ("Serial Number",
         f"{rd.serial or '—'}" + (f" {rd.firmware}" if rd.firmware else "")),
        ("Battery Level",
         f"{rd.battery_volts:.1f} Volts" if rd.battery_volts is not None else None),
        ("Unit Calibration",
         (f"{rd.calibration_date}"
          + (f" by {rd.calibration_by}" if rd.calibration_by else ""))
         if rd.calibration_date else None),
        ("File Name", rd.file_name),
        ("Post Event Notes", rd.post_event_notes),
    ]
    dy = 0.095
    y = 0.95
    for label, value in rows_left:
        _kv(ax, 0.0, y, label, value, label_w=0.18)
        y -= dy
    y = 0.95
    for label, value in rows_right:
        _kv(ax, 0.55, y, label, value, label_w=0.20)
        y -= dy


def _draw_mic_only(ax, rd: ReportData) -> None:
    """Mic block (histogram variant — no USBM chart)."""
    ax.text(0.0, 0.95, "Microphone Linear Weighting",
            fontsize=8, color="#555", transform=ax.transAxes, va="top")
    y = 0.70
    for label, value in _mic_rows(rd):
        _kv(ax, 0.0, y, label, value, label_w=0.18)
        y -= 0.22


def _draw_mic_and_usbm(ax, rd: ReportData) -> None:
    """Mic block on the left + USBM compliance chart placeholder on right.

    (Waveform variant — USBM is a velocity-vs-frequency compliance plot
    that doesn't apply to histograms.)
    """
    ax.text(0.0, 0.95, "Microphone Linear Weighting",
            fontsize=8, color="#555", transform=ax.transAxes, va="top")
    y = 0.80
    for label, value in _mic_rows(rd):
        _kv(ax, 0.0, y, label, value, label_w=0.18)
        y -= 0.15

    # USBM chart placeholder — upper-right.  Real piecewise compliance
    # curves are a separate work item; for now this just shows the title
    # + a "coming soon" message so the layout is correct.
    ax.text(0.72, 0.97, "USBM RI8507 And OSMRE",
            fontsize=9, weight="bold", color="#333",
            ha="center", va="top", transform=ax.transAxes)
    ax.text(0.72, 0.50, "[compliance chart\ncoming soon]",
            fontsize=8, color="#bbb", ha="center", va="center",
            transform=ax.transAxes, style="italic")


def _mic_rows(rd: ReportData) -> list[tuple[str, Optional[str]]]:
    """Build the mic-section value rows (shared by both layouts).

    For histograms, BW formats the PSPL line as
    "125.7 dB(L) on May 27, 2026 at 06:19:14" (absolute date+time of
    peak).  Waveform events show the relative "at 0.012 sec." instead.
    The absolute form wins whenever ``mic_pspl_when_str`` is populated.
    Rows whose source field is missing are omitted.
    """
    rows: list[tuple[str, Optional[str]]] = []
    if rd.mic_pspl_dbl is not None:
        line = f"{rd.mic_pspl_dbl:.1f} dB(L)"
        if rd.mic_pspl_when_str:
            # Histogram-style; mic_pspl_when_str is "HH:MM:SS Month DD, YYYY".
            line += _when_suffix(rd.mic_pspl_when_str, "at")
        elif rd.mic_pspl_time_s is not None:
            # Waveform-style: relative-to-trigger seconds.
            line += f" at {rd.mic_pspl_time_s:.3f} sec."
        rows.append(("PSPL", line))
    if rd.mic_zc_freq_hz is not None:
        rows.append(("ZC Freq", f"{rd.mic_zc_freq_hz:.0f} Hz"))
    if rd.mic_channel_test_result:
        line = rd.mic_channel_test_result
        if rd.mic_channel_test_freq_hz is not None and rd.mic_channel_test_amp_mv is not None:
            line += (f" (Freq = {rd.mic_channel_test_freq_hz:.1f} Hz, "
                     f"Amp = {rd.mic_channel_test_amp_mv:.0f} mv)")
        rows.append(("Channel Test", line))
    return rows


def _draw_stats_footer(ax, pvs_line: Optional[str]) -> None:
    """Draw the Peak Vector Sum line (if any) and the 'NA' legend below the table."""
    if pvs_line is not None:
        ax.text(0.0, -0.08, pvs_line, fontsize=9, weight="bold",
                ha="left", va="top", transform=ax.transAxes)
    ax.text(0.0, -0.18, "NA: Not Applicable", fontsize=7, color="#888",
            ha="left", va="top", transform=ax.transAxes)


def _draw_channel_stats_waveform(ax, rd: ReportData) -> None:
    """Waveform stats table — has Time (Rel. to Trig), Peak Accel, Peak Disp.

    Followed by the Peak Vector Sum line.
    """
    rows_spec = [
        ("PPV", "ppv_ips", "in/s"),
        ("ZC Freq", "zc_freq_hz", "Hz"),
        ("Time (Rel. to Trig)", "time_of_peak_s", "sec"),
        ("Peak Acceleration", "peak_accel_g", "g"),
        ("Peak Displacement", "peak_disp_in", "in"),
        ("Sensor Check", "sensor_check", ""),
    ]
    _draw_stats_table(ax, rd, rows_spec)

    line = None
    if rd.peak_vector_sum_ips is not None:
        line = f"Peak Vector Sum {rd.peak_vector_sum_ips:.3f} in/s"
        if rd.peak_vector_sum_time_s is not None:
            line += f" At {rd.peak_vector_sum_time_s:.3f} sec."
    _draw_stats_footer(ax, line)


def _draw_channel_stats_histogram(ax, rd: ReportData) -> None:
    """Histogram stats table — PPV, ZC Freq, Date, Time of peak, Sensor Check.

    Followed by the Peak Vector Sum line.  Date / Time come from the
    per-channel ``peak_date`` / ``peak_time`` entries in
    ``rd.channel_stats``; channels without them show '—'.
    """
    rows_spec = [
        ("PPV", "ppv_ips", "in/s"),
        ("ZC Freq", "zc_freq_hz", "Hz"),
        ("Date", "peak_date", ""),
        ("Time", "peak_time", ""),
        ("Sensor Check", "sensor_check", ""),
    ]
    _draw_stats_table(ax, rd, rows_spec)

    line = None
    if rd.peak_vector_sum_ips is not None:
        line = f"Peak Vector Sum {rd.peak_vector_sum_ips:.3f} in/s"
        # Histograms: "0.091 in/s on May 27, 2026 At 06:06:14".
        # The when_str is "HH:MM:SS Month DD, YYYY" — reformat for BW match.
        if rd.peak_vector_sum_when_str:
            line += _when_suffix(rd.peak_vector_sum_when_str, "At")
    _draw_stats_footer(ax, line)


def _draw_stats_table(ax, rd: ReportData, rows_spec: list[tuple[str, str, str]]) -> None:
    """Render a per-channel stats table (Tran/Vert/Long).

    rows_spec: list of (label, field_name_in_channel_stats, unit_string)
    """
    headers = ["", "Tran", "Vert", "Long", ""]
    ch_lookup = {c["name"]: c for c in rd.channel_stats}

    def _cell(key, ch_name):
        val = ch_lookup.get(ch_name, {}).get(key)
        if val is None:
            return "—"
        if isinstance(val, float):
            # ZC Freq is integer-formatted in BW; everything else with 3 decimals
            return f"{val:.0f}" if key == "zc_freq_hz" else f"{val:.3f}"
        return str(val)

    table_data = [headers]
    for label, field_name, unit in rows_spec:
        table_data.append(
            [label] + [_cell(field_name, ch) for ch in ("Tran", "Vert", "Long")] + [unit]
        )

    tbl = ax.table(
        cellText=table_data,
        loc="upper left",
        colWidths=[0.28, 0.14, 0.14, 0.14, 0.10],
        cellLoc="left",
        edges="open",
    )
    tbl.auto_set_font_size(False)
    tbl.set_fontsize(8)
    tbl.scale(1, 1.4)
    for j in range(len(headers)):
        tbl[(0, j)].set_text_props(weight="bold", color="#555")


def _channel_axis_color(ch: str) -> str:
    """Return the plot colour for channel ``ch`` (grey for unknown names)."""
    return _CHANNEL_COLORS.get(ch, "#444")


def _peak_abs(values) -> float:
    """Largest |v| in ``values``, ignoring None; 0 when nothing is usable."""
    return max((abs(v) for v in values if v is not None), default=0)


def _draw_waveform_subplot(fig, gridspec_cell, rd: ReportData) -> None:
    """4-channel stacked waveform plot in Instantel printout order.

    MicL on top, Tran on bottom, x-axis in SECONDS, a dashed trigger line
    plus a triangle marker at t=0, and a '0.0' baseline label on the right
    of each trace.  Also writes the time / amplitude scale footer and the
    trigger legend at the bottom of the page.
    """
    inner = gridspec_cell.subgridspec(len(_PLOT_ORDER), 1, hspace=0.0)

    sr = rd.sample_rate_sps or 1024
    # Convert ms-based time axis to seconds for the x-axis
    dt_s = (rd.dt_ms or (1000.0 / sr)) / 1000.0
    t0_s = (rd.t0_ms if rd.t0_ms is not None else 0.0) / 1000.0

    last_idx = len(_PLOT_ORDER) - 1
    top_ax = None
    longest = 0  # sample count of the longest channel, for the footer
    for i, ch in enumerate(_PLOT_ORDER):
        ax = fig.add_subplot(inner[i])
        if top_ax is None:
            top_ax = ax
        color = _channel_axis_color(ch)
        values = rd.channels.get(ch) or []
        longest = max(longest, len(values))
        if values:
            times = [t0_s + j * dt_s for j in range(len(values))]
            ax.plot(times, values, color=color, linewidth=0.5)
            # Symmetric y-axis around zero for every channel.  A flat
            # (all-zero) trace gets a small nominal span so the limits
            # are never identical.
            amax = _peak_abs(values) or 0.001
            ax.set_ylim(-amax * 1.10, amax * 1.10)
        # Channel label on the LEFT (matches BW)
        ax.set_ylabel(ch, fontsize=8, rotation=0, ha="right", va="center",
                      color=color, weight="bold", labelpad=14)
        # "0.0" on the RIGHT (BW convention)
        ax.text(1.005, 0.5, "0.0", transform=ax.transAxes,
                fontsize=7, color="#555", va="center", ha="left")
        ax.grid(True, linestyle="--", linewidth=0.3, color="#bbb", alpha=0.6)
        # Vertical dashed trigger line at t=0
        ax.axvline(0.0, color="#cc0000", linestyle="--", linewidth=0.6, alpha=0.7)
        # Zero baseline horizontal
        ax.axhline(0.0, color=color, linestyle="-", linewidth=0.4, alpha=0.5)
        if i != last_idx:
            ax.set_xticklabels([])
            ax.tick_params(axis="x", length=0)
        else:
            ax.tick_params(axis="x", labelsize=7)
        ax.tick_params(axis="y", labelsize=6)

    # Trigger triangle marker ▼ above the top channel (MicL) at t=0
    top_ax.plot([0], [top_ax.get_ylim()[1]], marker="v", color="black",
                markersize=8, clip_on=False, zorder=10)

    # Compute scale-per-division for the footer (10 divs across the chart),
    # based on the longest channel so an empty Tran doesn't zero it out.
    total_s = (longest - 1) * dt_s if longest else 0
    div_s = total_s / 10 if total_s > 0 else 0
    # Geo amp/div comes from the first geophone channel that has samples.
    geo_amp_div = "—"
    for ch in _GEO_CHANNELS:
        v = rd.channels.get(ch) or []
        if v:
            geo_amp_div = f"{(_peak_abs(v) * 1.1 * 2) / 10:.3f}"
            break

    fig.text(
        0.11, 0.030,
        f"Time(Seconds) {div_s:.2f} sec/div Amplitude Geo: {geo_amp_div} in/s/div Mic: 0.001 psi(L)/div",
        fontsize=7, color="#444", ha="left",
    )
    fig.text(
        0.11, 0.018,
        "Trigger = ▶━━━━━ ━━━━━━◀",
        fontsize=7, color="#444", ha="left",
    )


def _draw_histogram_subplot(fig, gridspec_cell, rd: ReportData) -> None:
    """4-channel stacked histogram bar chart — per-interval peaks.

    X-axis labeled with the actual times from
    ``rd.histogram_interval_times`` when their count matches the bottom
    channel's bar count; otherwise labeled by interval index.  Also writes
    the scale footer at the bottom of the page.
    """
    inner = gridspec_cell.subgridspec(len(_PLOT_ORDER), 1, hspace=0.0)
    last_idx = len(_PLOT_ORDER) - 1

    # X-axis: use absolute time labels if we have them, else interval index
    have_times = bool(rd.histogram_interval_times)

    for i, ch in enumerate(_PLOT_ORDER):
        ax = fig.add_subplot(inner[i])
        color = _channel_axis_color(ch)
        values = rd.channels.get(ch) or []
        if values:
            # Histograms record per-interval PEAK magnitudes — always
            # non-negative.  Codec output occasionally includes signed
            # values when the underlying .h5 was scaled like a waveform;
            # take the absolute value so the bars rise from zero.
            abs_vals = [abs(v) if v is not None else 0 for v in values]
            xs = np.arange(len(abs_vals))
            ax.bar(xs, abs_vals, color=color, width=0.85, linewidth=0)
            amax = max(abs_vals, default=0)
            if amax > 0:
                ax.set_ylim(0, amax * 1.10)
        ax.set_ylabel(ch, fontsize=8, rotation=0, ha="right", va="center",
                      color=color, weight="bold", labelpad=14)
        ax.text(1.005, 0.02, "0.0", transform=ax.transAxes,
                fontsize=7, color="#555", va="bottom", ha="left")
        ax.grid(True, axis="y", linestyle="--", linewidth=0.3, color="#bbb", alpha=0.6)
        if i != last_idx:
            ax.set_xticklabels([])
            ax.tick_params(axis="x", length=0)
        else:
            if have_times and len(rd.histogram_interval_times) == len(values):
                # Show roughly four labels, evenly spaced
                n = len(values)
                step = max(1, n // 4)
                tick_positions = list(range(0, n, step))
                ax.set_xticks(tick_positions)
                ax.set_xticklabels(
                    [rd.histogram_interval_times[t] for t in tick_positions],
                    rotation=0, fontsize=6,
                )
            else:
                ax.set_xlabel("Interval", fontsize=8)
            ax.tick_params(axis="x", labelsize=7)
        ax.tick_params(axis="y", labelsize=6)

    # Footer scale info — histograms use the configured interval per div.
    interval_str = rd.histogram_interval_size or "—"
    geo_amp_div = "—"
    for ch in _GEO_CHANNELS:
        v = rd.channels.get(ch) or []
        if v:
            # None-safe, like the bar drawing above.
            geo_amp_div = f"{_peak_abs(v) / 5:.3f}"
            break
    fig.text(
        0.11, 0.030,
        f"Time {interval_str} /div Amplitude Geo: {geo_amp_div} in/s/div Mic: 0.001 psi(L)/div",
        fontsize=7, color="#444", ha="left",
    )