""" scripts/backfill_thor_events.py — re-process existing Thor (Series IV) events so their sidecars carry the bw_report block produced by ``micromate.idf_to_bw_report.build_bw_report_from_idf`` + their .h5 clean-waveform files for IDFW events. Why this exists ─────────────── Thor events ingested before v0.21.0 (or during the v0.21.0 ingest bug window fixed in commit bee1185) have sidecars with only ``extensions.idf_report`` — no ``bw_report`` block. Without ``bw_report``, the SFM PDF renderer falls back to DB-only fields (misses sensor-self-check, full per-channel breakdown, mic dB(L)), and the modal chart 404s on ``/waveform.json`` for IDFW events because no .h5 was written when the codec failed at ingest. Re-forwarding from thor-watcher would also fix this, but that requires operator coordination on every watcher machine and uses bandwidth this script doesn't. What this does ────────────── Walks ``//`` for ``.IDFW`` / ``.IDFH`` files and, for each one: 1. Reads the existing sidecar (preserving review state + captured_at). 2. Re-runs ``micromate.idf_file.read_idf_file()`` on the binary bytes — passing ``data=`` so the codec doesn't try to read from a path it doesn't know. 3. Pulls ``extensions.idf_report`` (the raw parsed Thor dict the v0.18.0+ ingest path already stashed) and runs the v0.21.0 ``build_bw_report_from_idf`` adapter against it. 4. Writes the refreshed sidecar with the new ``bw_report``, bumped ``source.tool_version``, but preserved ``review`` block + the original ``captured_at`` timestamp. 5. Regenerates the .h5 waveform file via the existing ``event_hdf5`` writer. For IDFW that's the decoded per-sample stream; for IDFH it's a 1-sample-per-interval synthesised array (peak ADC count per channel) so the renderer's bar-chart code has data to group on. Mic peak psi from the binary is merged onto the IdfEvent before the bridge so the h5 writer's per-count mic scale factor lands on a sensible value (without this the mic chart on Thor events plots dB(L)-as-pseudo-psi and shows bomb-level numbers). Idempotent. Re-running it after a parser/adapter change just re-writes sidecars — no DB writes, no thor-watcher coordination. Usage ───── python scripts/backfill_thor_events.py [--store-root PATH] [--dry-run] [--skip-hdf5] [--force] [-v] By default, refreshes any Thor event whose sidecar is missing ``bw_report`` OR whose ``source.tool_version`` is older than the current ``TOOL_VERSION``. ``--force`` refreshes every Thor event regardless. """ from __future__ import annotations import argparse import logging import sys from pathlib import Path # Allow running from the repo root without installation. sys.path.insert(0, str(Path(__file__).resolve().parent.parent)) from minimateplus import event_file_io from sfm.waveform_store import WaveformStore log = logging.getLogger("backfill_thor_events") def _is_thor_event(path: Path) -> bool: if not path.is_file(): return False if path.name.endswith((".sfm.json", ".h5", "_ASCII.TXT")): return False return path.suffix.upper() in (".IDFW", ".IDFH") def _vtuple(s: str) -> tuple: try: return tuple(int(p) for p in str(s).split(".")[:3]) except Exception: return (0, 0, 0) def main(argv=None) -> int: p = argparse.ArgumentParser(description=__doc__) p.add_argument( "--db-path", default=str(Path(__file__).resolve().parent.parent / "bridges" / "captures" / "seismo_relay.db"), help="Used only to derive the default --store-root.", ) p.add_argument("--store-root", default=None) p.add_argument("--dry-run", action="store_true") p.add_argument("--skip-hdf5", action="store_true", help="Don't regenerate .h5 files for IDFW events.") p.add_argument("--force", action="store_true", help="Refresh every Thor event, not just ones with stale or missing bw_report.") p.add_argument("-v", "--verbose", action="store_true") args = p.parse_args(argv) logging.basicConfig( level=logging.DEBUG if args.verbose else logging.INFO, format="%(asctime)s %(levelname)-7s %(name)s %(message)s", datefmt="%H:%M:%S", ) db_path = Path(args.db_path).expanduser().resolve() store_root = ( Path(args.store_root).expanduser().resolve() if args.store_root else db_path.parent / "waveforms" ) if not store_root.exists(): log.error("store root not found: %s", store_root) return 1 store = WaveformStore(store_root) log.info("store root: %s", store_root) log.info("current TOOL_VERSION: %s", event_file_io.TOOL_VERSION) refreshed = skipped = errors = h5_written = 0 # Lazy imports so any one of these failing produces a useful error # message rather than crashing module-load. from micromate.idf_file import read_idf_file from micromate.idf_to_bw_report import build_bw_report_from_idf for serial_dir in sorted(p for p in store_root.iterdir() if p.is_dir()): serial = serial_dir.name for path in sorted(serial_dir.iterdir()): if not _is_thor_event(path): continue sidecar_path = store.sidecar_path_for(serial, path.name) if not sidecar_path.exists(): log.debug("%s: no sidecar — skipping (this is a binary without ingest history)", path.name) skipped += 1 continue try: existing = event_file_io.read_sidecar(sidecar_path) except Exception as exc: log.warning("%s: failed to read sidecar — %s", path.name, exc) errors += 1 continue has_bw_report = bool(existing.get("bw_report")) existing_version = (existing.get("source") or {}).get("tool_version", "") up_to_date = ( has_bw_report and _vtuple(existing_version) >= _vtuple(event_file_io.TOOL_VERSION) ) if up_to_date and not args.force: skipped += 1 continue # Re-decode the binary. Catch + log; continue with .txt-only # data if it fails (matches the live ingest path's behavior). idf_samples = None idf_intervals = None binary_md = None is_histogram = path.suffix.upper() == ".IDFH" try: binary_bytes = path.read_bytes() res = read_idf_file(path, data=binary_bytes) idf_samples = res.samples or None idf_intervals = res.intervals binary_md = res.binary_metadata is_histogram = res.intervals is not None except NotImplementedError: # sig-B / Blastware-stray binary; no samples but adapter # can still produce a bw_report from extensions.idf_report. log.debug("%s: binary codec NotImplementedError (sig-B / BW-stray); proceeding from sidecar's idf_report only", path.name) except Exception as exc: log.warning("%s: binary decode failed — %s; proceeding from sidecar's idf_report only", path.name, exc) # Run the adapter. Pull report_dict from # extensions.idf_report (the v0.18.0+ ingest preserved it). report_dict = (existing.get("extensions") or {}).get("idf_report") or {} if not report_dict and binary_md is None: log.debug("%s: no idf_report in sidecar AND no binary metadata — nothing to project", path.name) skipped += 1 continue try: bw_report = build_bw_report_from_idf( report_dict, binary_md=binary_md, intervals=idf_intervals, is_histogram=is_histogram, ) except Exception as exc: log.warning("%s: adapter failed — %s", path.name, exc) errors += 1 continue # Build the new sidecar by overlaying refreshed fields onto # the existing one — preserves review, captured_at, blastware # block, source.kind, etc. new_sidecar = dict(existing) # shallow copy new_sidecar["bw_report"] = bw_report src = dict(new_sidecar.get("source") or {}) src["tool_version"] = event_file_io.TOOL_VERSION new_sidecar["source"] = src # Preserve histogram intervals if the binary decoded them # (improves over the original ingest if that one ran before # the bee1185 codec fix). if idf_intervals is not None: ext = dict(new_sidecar.get("extensions") or {}) ext["idf_intervals"] = [ { "offset": iv.offset, "tran_peak": iv.peak_count("Tran"), "tran_halfp": iv.tran_halfp, "tran_freq": iv.freq_hz("Tran"), "vert_peak": iv.peak_count("Vert"), "vert_halfp": iv.vert_halfp, "vert_freq": iv.freq_hz("Vert"), "long_peak": iv.peak_count("Long"), "long_halfp": iv.long_halfp, "long_freq": iv.freq_hz("Long"), "mic_peak": iv.peak_count("MicL"), "mic_halfp": iv.micl_halfp, "mic_freq": iv.freq_hz("MicL"), } for iv in idf_intervals ] new_sidecar["extensions"] = ext if args.dry_run: will_write_h5 = (idf_samples or idf_intervals) and not args.skip_hdf5 log.info("[DRY] %s/%s — would refresh sidecar (bw_report=%s, h5=%s)", serial, path.name, "wrote" if not has_bw_report else "refreshed", "would write" if will_write_h5 else "skipped") else: event_file_io.write_sidecar(sidecar_path, new_sidecar) log.info("%s/%s — sidecar refreshed (bw_report=%s, intervals=%d)", serial, path.name, "added" if not has_bw_report else "refreshed", len(idf_intervals) if idf_intervals else 0) refreshed += 1 # Regenerate .h5 by replaying the same IdfEvent → Event bridge # save_imported_idf uses. For IDFW we write the decoded per- # sample arrays. For IDFH we synthesise a 1-sample-per-interval # array (peak ADC count per channel per interval) so the # renderer's bar-chart code has something to group on. # Pre-condition: either real samples (IDFW) or decoded intervals # (IDFH). Skip otherwise. have_data = bool(idf_samples) or bool(idf_intervals) if have_data and not args.skip_hdf5: from sfm import event_hdf5 hdf5_path = store.hdf5_path_for(serial, path.name) if args.dry_run: log.debug("[DRY] would write %s", hdf5_path.name) else: try: from micromate import IdfEvent from minimateplus.event_file_io import file_sha256 idf_event = IdfEvent.from_report(report_dict, path.name) # Merge the binary-derived mic peak psi (only the # binary path knows the proper psi value; the .txt # carries dB(L)). Without this, the h5 writer's # per-count mic factor is computed against the # dB(L) value-as-pseudo-psi and the mic chart # scales wildly. if (binary_md is not None and res is not None and res.event.peaks.mic_pspl_psi is not None): idf_event.peaks.mic_pspl_psi = res.event.peaks.mic_pspl_psi sha256 = file_sha256(path) waveform_key = bytes.fromhex(sha256)[:16] ev = idf_event.to_minimateplus_event(waveform_key) if is_histogram and idf_intervals: # 1 sample per interval per channel — same # synthesis save_imported_idf uses. The h5 # writer's count×geo_fs/32768 conversion turns # each peak-ADC-count into the bar's physical # value. ev.raw_samples = { "Tran": [iv.peak_count("Tran") for iv in idf_intervals], "Vert": [iv.peak_count("Vert") for iv in idf_intervals], "Long": [iv.peak_count("Long") for iv in idf_intervals], "MicL": [iv.peak_count("MicL") for iv in idf_intervals], } ev.total_samples = ev.total_samples or len(idf_intervals) elif idf_samples: ev.raw_samples = idf_samples n_samp = max( (len(idf_samples.get(ch, [])) for ch in ("Tran", "Vert", "Long", "MicL")), default=0, ) ev.total_samples = ev.total_samples or n_samp event_hdf5.write_event_hdf5( hdf5_path, ev, serial=serial, geo_range="normal", source_kind="idf-import", tool_version=event_file_io.TOOL_VERSION, ) h5_written += 1 log.debug("%s/%s — .h5 written (%s)", serial, path.name, f"{len(idf_intervals)} intervals" if is_histogram else f"{sum(len(v) for v in (idf_samples or {}).values())} samples") except Exception as exc: log.warning("%s/%s — .h5 write failed: %s", serial, path.name, exc) log.info("Done. refreshed=%d skipped=%d errors=%d h5_written=%d", refreshed, skipped, errors, h5_written) return 0 if errors == 0 else 2 if __name__ == "__main__": sys.exit(main())