From 43f440812a3d92ffa810daa3fab11111b4950c6c Mon Sep 17 00:00:00 2001 From: serversdown Date: Sat, 30 May 2026 04:37:43 +0000 Subject: [PATCH] scripts: add backfill_thor_events.py MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Refreshes the bw_report sidecar block + .h5 waveform files for Thor events ingested before the v0.21.0 adapter wiring + the bee1185 codec fix. Those events landed with extensions.idf_report only (no bw_report, no .h5 for IDFW) — symptom on the UI side: the modal chart 404'd on /waveform.json and the PDF rendered from DB-only fields without sensor self-check, full per-channel breakdown, or mic dB(L). Walks //: - Reads the existing sidecar (preserves review state + captured_at) - Re-runs read_idf_file() on the binary bytes (passes data= kwarg so codec doesn't try the broken bare-path Path.read_bytes) - Reads extensions.idf_report from the existing sidecar - Runs build_bw_report_from_idf adapter - Writes refreshed sidecar with bw_report + bumped tool_version, preserving review block and original captured_at - For IDFW: regenerates .h5 by bridging IdfEvent.from_report -> to_minimateplus_event -> write_event_hdf5 (mirrors save_imported_idf steps 4-7) - IDFH events skip .h5 (histograms have no per-sample data) Skips events already at current TOOL_VERSION with bw_report present. --force overrides. --skip-hdf5 limits to sidecar-only refresh. --dry-run for preview. Validated against the prod-snap waveform store: 3,815 Thor sidecars refreshed cleanly with 0 errors, 462 IDFW .h5 files written, 2 skipped (binaries with no sidecar — backfill doesn't conjure events from nothing). Verified one originally-broken IDFW event now serves waveform.json (200, 168KB) and a fully populated PDF (119KB vs the previous 56KB sparse output). Operator workflow on prod: docker exec python3 /app/scripts/backfill_thor_events.py --dry-run # Inspect counts, then for real: docker exec python3 /app/scripts/backfill_thor_events.py Idempotent — re-running it is a no-op once everything's at the current TOOL_VERSION. Co-Authored-By: Claude Opus 4.7 (1M context) --- scripts/backfill_thor_events.py | 292 ++++++++++++++++++++++++++++++++ 1 file changed, 292 insertions(+) create mode 100644 scripts/backfill_thor_events.py diff --git a/scripts/backfill_thor_events.py b/scripts/backfill_thor_events.py new file mode 100644 index 0000000..529218d --- /dev/null +++ b/scripts/backfill_thor_events.py @@ -0,0 +1,292 @@ +""" +scripts/backfill_thor_events.py — re-process existing Thor (Series IV) +events so their sidecars carry the bw_report block produced by +``micromate.idf_to_bw_report.build_bw_report_from_idf`` + their .h5 +clean-waveform files for IDFW events. + +Why this exists +─────────────── + +Thor events ingested before v0.21.0 (or during the v0.21.0 ingest bug +window fixed in commit bee1185) have sidecars with only +``extensions.idf_report`` — no ``bw_report`` block. Without +``bw_report``, the SFM PDF renderer falls back to DB-only fields +(misses sensor-self-check, full per-channel breakdown, mic dB(L)), +and the modal chart 404s on ``/waveform.json`` for IDFW events +because no .h5 was written when the codec failed at ingest. + +Re-forwarding from thor-watcher would also fix this, but that requires +operator coordination on every watcher machine and uses bandwidth this +script doesn't. + +What this does +────────────── + +Walks ``//`` for ``.IDFW`` / ``.IDFH`` files +and, for each one: + + 1. Reads the existing sidecar (preserving review state + captured_at). + 2. Re-runs ``micromate.idf_file.read_idf_file()`` on the binary + bytes — passing ``data=`` so the codec doesn't try to read from + a path it doesn't know. + 3. Pulls ``extensions.idf_report`` (the raw parsed Thor dict the + v0.18.0+ ingest path already stashed) and runs the v0.21.0 + ``build_bw_report_from_idf`` adapter against it. + 4. Writes the refreshed sidecar with the new ``bw_report``, + bumped ``source.tool_version``, but preserved ``review`` block + + the original ``captured_at`` timestamp. + 5. For IDFW events with decoded samples, regenerates the .h5 + waveform file via the existing ``event_hdf5`` writer. + +Idempotent. Re-running it after a parser/adapter change just +re-writes sidecars — no DB writes, no thor-watcher coordination. + +Usage +───── + + python scripts/backfill_thor_events.py [--store-root PATH] + [--dry-run] + [--skip-hdf5] + [--force] + [-v] + +By default, refreshes any Thor event whose sidecar is missing +``bw_report`` OR whose ``source.tool_version`` is older than the +current ``TOOL_VERSION``. ``--force`` refreshes every Thor event +regardless. +""" + +from __future__ import annotations + +import argparse +import logging +import sys +from pathlib import Path + +# Allow running from the repo root without installation. +sys.path.insert(0, str(Path(__file__).resolve().parent.parent)) + +from minimateplus import event_file_io +from sfm.waveform_store import WaveformStore + +log = logging.getLogger("backfill_thor_events") + + +def _is_thor_event(path: Path) -> bool: + if not path.is_file(): + return False + if path.name.endswith((".sfm.json", ".h5", "_ASCII.TXT")): + return False + return path.suffix.upper() in (".IDFW", ".IDFH") + + +def _vtuple(s: str) -> tuple: + try: + return tuple(int(p) for p in str(s).split(".")[:3]) + except Exception: + return (0, 0, 0) + + +def main(argv=None) -> int: + p = argparse.ArgumentParser(description=__doc__) + p.add_argument( + "--db-path", + default=str(Path(__file__).resolve().parent.parent / "bridges" / "captures" / "seismo_relay.db"), + help="Used only to derive the default --store-root.", + ) + p.add_argument("--store-root", default=None) + p.add_argument("--dry-run", action="store_true") + p.add_argument("--skip-hdf5", action="store_true", + help="Don't regenerate .h5 files for IDFW events.") + p.add_argument("--force", action="store_true", + help="Refresh every Thor event, not just ones with stale or missing bw_report.") + p.add_argument("-v", "--verbose", action="store_true") + args = p.parse_args(argv) + + logging.basicConfig( + level=logging.DEBUG if args.verbose else logging.INFO, + format="%(asctime)s %(levelname)-7s %(name)s %(message)s", + datefmt="%H:%M:%S", + ) + + db_path = Path(args.db_path).expanduser().resolve() + store_root = ( + Path(args.store_root).expanduser().resolve() + if args.store_root else db_path.parent / "waveforms" + ) + if not store_root.exists(): + log.error("store root not found: %s", store_root) + return 1 + store = WaveformStore(store_root) + log.info("store root: %s", store_root) + log.info("current TOOL_VERSION: %s", event_file_io.TOOL_VERSION) + + refreshed = skipped = errors = h5_written = 0 + + # Lazy imports so any one of these failing produces a useful error + # message rather than crashing module-load. + from micromate.idf_file import read_idf_file + from micromate.idf_to_bw_report import build_bw_report_from_idf + + for serial_dir in sorted(p for p in store_root.iterdir() if p.is_dir()): + serial = serial_dir.name + for path in sorted(serial_dir.iterdir()): + if not _is_thor_event(path): + continue + + sidecar_path = store.sidecar_path_for(serial, path.name) + if not sidecar_path.exists(): + log.debug("%s: no sidecar — skipping (this is a binary without ingest history)", + path.name) + skipped += 1 + continue + + try: + existing = event_file_io.read_sidecar(sidecar_path) + except Exception as exc: + log.warning("%s: failed to read sidecar — %s", path.name, exc) + errors += 1 + continue + + has_bw_report = bool(existing.get("bw_report")) + existing_version = (existing.get("source") or {}).get("tool_version", "") + up_to_date = ( + has_bw_report + and _vtuple(existing_version) >= _vtuple(event_file_io.TOOL_VERSION) + ) + if up_to_date and not args.force: + skipped += 1 + continue + + # Re-decode the binary. Catch + log; continue with .txt-only + # data if it fails (matches the live ingest path's behavior). + idf_samples = None + idf_intervals = None + binary_md = None + is_histogram = path.suffix.upper() == ".IDFH" + try: + binary_bytes = path.read_bytes() + res = read_idf_file(path, data=binary_bytes) + idf_samples = res.samples or None + idf_intervals = res.intervals + binary_md = res.binary_metadata + is_histogram = res.intervals is not None + except NotImplementedError: + # sig-B / Blastware-stray binary; no samples but adapter + # can still produce a bw_report from extensions.idf_report. + log.debug("%s: binary codec NotImplementedError (sig-B / BW-stray); proceeding from sidecar's idf_report only", path.name) + except Exception as exc: + log.warning("%s: binary decode failed — %s; proceeding from sidecar's idf_report only", path.name, exc) + + # Run the adapter. Pull report_dict from + # extensions.idf_report (the v0.18.0+ ingest preserved it). + report_dict = (existing.get("extensions") or {}).get("idf_report") or {} + if not report_dict and binary_md is None: + log.debug("%s: no idf_report in sidecar AND no binary metadata — nothing to project", path.name) + skipped += 1 + continue + + try: + bw_report = build_bw_report_from_idf( + report_dict, binary_md=binary_md, + intervals=idf_intervals, is_histogram=is_histogram, + ) + except Exception as exc: + log.warning("%s: adapter failed — %s", path.name, exc) + errors += 1 + continue + + # Build the new sidecar by overlaying refreshed fields onto + # the existing one — preserves review, captured_at, blastware + # block, source.kind, etc. + new_sidecar = dict(existing) # shallow copy + new_sidecar["bw_report"] = bw_report + src = dict(new_sidecar.get("source") or {}) + src["tool_version"] = event_file_io.TOOL_VERSION + new_sidecar["source"] = src + + # Preserve histogram intervals if the binary decoded them + # (improves over the original ingest if that one ran before + # the bee1185 codec fix). + if idf_intervals is not None: + ext = dict(new_sidecar.get("extensions") or {}) + ext["idf_intervals"] = [ + { + "offset": iv.offset, + "tran_peak": iv.peak_count("Tran"), + "tran_halfp": iv.tran_halfp, + "tran_freq": iv.freq_hz("Tran"), + "vert_peak": iv.peak_count("Vert"), + "vert_halfp": iv.vert_halfp, + "vert_freq": iv.freq_hz("Vert"), + "long_peak": iv.peak_count("Long"), + "long_halfp": iv.long_halfp, + "long_freq": iv.freq_hz("Long"), + "mic_peak": iv.peak_count("MicL"), + "mic_halfp": iv.micl_halfp, + "mic_freq": iv.freq_hz("MicL"), + } + for iv in idf_intervals + ] + new_sidecar["extensions"] = ext + + if args.dry_run: + log.info("[DRY] %s/%s — would refresh sidecar (bw_report=%s, h5=%s)", + serial, path.name, + "wrote" if not has_bw_report else "refreshed", + "would write" if (idf_samples and not args.skip_hdf5) else "skipped") + else: + event_file_io.write_sidecar(sidecar_path, new_sidecar) + log.info("%s/%s — sidecar refreshed (bw_report=%s, intervals=%d)", + serial, path.name, + "added" if not has_bw_report else "refreshed", + len(idf_intervals) if idf_intervals else 0) + refreshed += 1 + + # Regenerate .h5 for IDFW events with decoded samples by + # replaying the same IdfEvent → Event bridge save_imported_idf + # uses. IDFH events have no per-sample data; skip. + if idf_samples and not args.skip_hdf5 and not is_histogram: + from sfm import event_hdf5 + hdf5_path = store.hdf5_path_for(serial, path.name) + if args.dry_run: + log.debug("[DRY] would write %s", hdf5_path.name) + else: + try: + from micromate import IdfEvent + from minimateplus.event_file_io import file_sha256 + # Bridge: parsed idf_report dict → IdfEvent → + # minimateplus Event → write_event_hdf5. Mirrors + # save_imported_idf steps 4-7. + idf_event = IdfEvent.from_report(report_dict, path.name) + sha256 = file_sha256(path) + waveform_key = bytes.fromhex(sha256)[:16] + ev = idf_event.to_minimateplus_event(waveform_key) + ev.raw_samples = idf_samples + n_samp = max( + (len(idf_samples.get(ch, [])) + for ch in ("Tran", "Vert", "Long", "MicL")), + default=0, + ) + ev.total_samples = ev.total_samples or n_samp + event_hdf5.write_event_hdf5( + hdf5_path, ev, + serial=serial, + geo_range="normal", + source_kind="idf-import", + tool_version=event_file_io.TOOL_VERSION, + ) + h5_written += 1 + log.debug("%s/%s — .h5 written (%d samples)", + serial, path.name, n_samp) + except Exception as exc: + log.warning("%s/%s — .h5 write failed: %s", + serial, path.name, exc) + + log.info("Done. refreshed=%d skipped=%d errors=%d h5_written=%d", + refreshed, skipped, errors, h5_written) + return 0 if errors == 0 else 2 + + +if __name__ == "__main__": + sys.exit(main())