Files
seismo-relay/scripts/backfill_thor_events.py
serversdown 25386cab8b fix(backfill): regenerate IDFH .h5 + merge binary mic_pspl_psi onto bridge
Two gaps in backfill_thor_events.py that left old Thor events showing
stale charts after a v0.21.1 backfill pass:
1. IDFH events were skipped from .h5 regeneration (the "have decoded
   samples" gate was IDFW-only).  Histograms kept their pre-v0.21.1
   .h5 — written from raw_samples = None, which the renderer turned
   into a near-empty bar chart, or for older events the dB(L)-as-pseudo-
   psi mic scale that produced "107.7 psi" peaks (atomic-bomb level
   instead of footstep level).  Fix: synthesise the same 1-sample-per-
   interval array save_imported_idf v0.21.1 uses (peak ADC count per
   channel per interval) so the renderer's bar-chart grouping has
   data to work with.
2. The IDFW h5 path didn't merge binary_peaks.mic_pspl_psi onto the
   IdfEvent before to_minimateplus_event().  The live save_imported_idf
   does this merge — without it, IdfEvent.from_report() only sees the
   .txt's dB(L) value, the bridge falls back to the dBL→psi formula
   (instead of the binary-accurate 2.14e-6 psi/count value), and the
   h5 writer's per-count mic factor lands on a less-correct value.
   Fix: same merge the live ingest does (lift res.event.peaks.mic_pspl_psi
   onto idf_event.peaks before the bridge call).
Verified against UM6047_20250804190047.IDFH (250-interval prod
histogram): 250 intervals decode, mic_pspl_psi = 2.78e-5 (was being
treated as dB(L)=107.7 in the old h5).
Operator: re-run after deploy.  `docker compose exec sfm python
scripts/backfill_thor_events.py` is idempotent — the existing version
check still skips events already at the new TOOL_VERSION, and review
state + captured_at are preserved on the second pass.
2026-06-01 20:02:54 +00:00

332 lines
15 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
"""
scripts/backfill_thor_events.py — re-process existing Thor (Series IV)
events so their sidecars carry the bw_report block produced by
``micromate.idf_to_bw_report.build_bw_report_from_idf`` + their .h5
clean-waveform files for IDFW events.
Why this exists
───────────────
Thor events ingested before v0.21.0 (or during the v0.21.0 ingest bug
window fixed in commit bee1185) have sidecars with only
``extensions.idf_report`` — no ``bw_report`` block. Without
``bw_report``, the SFM PDF renderer falls back to DB-only fields
(misses sensor-self-check, full per-channel breakdown, mic dB(L)),
and the modal chart 404s on ``/waveform.json`` for IDFW events
because no .h5 was written when the codec failed at ingest.
Re-forwarding from thor-watcher would also fix this, but that requires
operator coordination on every watcher machine and uses bandwidth this
script doesn't.
What this does
──────────────
Walks ``<store>/<serial>/<filename>`` for ``.IDFW`` / ``.IDFH`` files
and, for each one:
1. Reads the existing sidecar (preserving review state + captured_at).
2. Re-runs ``micromate.idf_file.read_idf_file()`` on the binary
bytes — passing ``data=`` so the codec doesn't try to read from
a path it doesn't know.
3. Pulls ``extensions.idf_report`` (the raw parsed Thor dict the
v0.18.0+ ingest path already stashed) and runs the v0.21.0
``build_bw_report_from_idf`` adapter against it.
4. Writes the refreshed sidecar with the new ``bw_report``,
bumped ``source.tool_version``, but preserved ``review`` block
+ the original ``captured_at`` timestamp.
5. Regenerates the .h5 waveform file via the existing
``event_hdf5`` writer. For IDFW that's the decoded per-sample
stream; for IDFH it's a 1-sample-per-interval synthesised array
(peak ADC count per channel) so the renderer's bar-chart code
has data to group on. Mic peak psi from the binary is merged
onto the IdfEvent before the bridge so the h5 writer's per-count
mic scale factor lands on a sensible value (without this the
mic chart on Thor events plots dB(L)-as-pseudo-psi and shows
bomb-level numbers).
Idempotent. Re-running it after a parser/adapter change just
re-writes sidecars — no DB writes, no thor-watcher coordination.
Usage
─────
python scripts/backfill_thor_events.py [--store-root PATH]
[--dry-run]
[--skip-hdf5]
[--force]
[-v]
By default, refreshes any Thor event whose sidecar is missing
``bw_report`` OR whose ``source.tool_version`` is older than the
current ``TOOL_VERSION``. ``--force`` refreshes every Thor event
regardless.
"""
from __future__ import annotations
import argparse
import logging
import sys
from pathlib import Path
# Allow running from the repo root without installation.
sys.path.insert(0, str(Path(__file__).resolve().parent.parent))
from minimateplus import event_file_io
from sfm.waveform_store import WaveformStore
log = logging.getLogger("backfill_thor_events")
def _is_thor_event(path: Path) -> bool:
if not path.is_file():
return False
if path.name.endswith((".sfm.json", ".h5", "_ASCII.TXT")):
return False
return path.suffix.upper() in (".IDFW", ".IDFH")
def _vtuple(s: str) -> tuple:
try:
return tuple(int(p) for p in str(s).split(".")[:3])
except Exception:
return (0, 0, 0)
def main(argv=None) -> int:
p = argparse.ArgumentParser(description=__doc__)
p.add_argument(
"--db-path",
default=str(Path(__file__).resolve().parent.parent / "bridges" / "captures" / "seismo_relay.db"),
help="Used only to derive the default --store-root.",
)
p.add_argument("--store-root", default=None)
p.add_argument("--dry-run", action="store_true")
p.add_argument("--skip-hdf5", action="store_true",
help="Don't regenerate .h5 files for IDFW events.")
p.add_argument("--force", action="store_true",
help="Refresh every Thor event, not just ones with stale or missing bw_report.")
p.add_argument("-v", "--verbose", action="store_true")
args = p.parse_args(argv)
logging.basicConfig(
level=logging.DEBUG if args.verbose else logging.INFO,
format="%(asctime)s %(levelname)-7s %(name)s %(message)s",
datefmt="%H:%M:%S",
)
db_path = Path(args.db_path).expanduser().resolve()
store_root = (
Path(args.store_root).expanduser().resolve()
if args.store_root else db_path.parent / "waveforms"
)
if not store_root.exists():
log.error("store root not found: %s", store_root)
return 1
store = WaveformStore(store_root)
log.info("store root: %s", store_root)
log.info("current TOOL_VERSION: %s", event_file_io.TOOL_VERSION)
refreshed = skipped = errors = h5_written = 0
# Lazy imports so any one of these failing produces a useful error
# message rather than crashing module-load.
from micromate.idf_file import read_idf_file
from micromate.idf_to_bw_report import build_bw_report_from_idf
for serial_dir in sorted(p for p in store_root.iterdir() if p.is_dir()):
serial = serial_dir.name
for path in sorted(serial_dir.iterdir()):
if not _is_thor_event(path):
continue
sidecar_path = store.sidecar_path_for(serial, path.name)
if not sidecar_path.exists():
log.debug("%s: no sidecar — skipping (this is a binary without ingest history)",
path.name)
skipped += 1
continue
try:
existing = event_file_io.read_sidecar(sidecar_path)
except Exception as exc:
log.warning("%s: failed to read sidecar — %s", path.name, exc)
errors += 1
continue
has_bw_report = bool(existing.get("bw_report"))
existing_version = (existing.get("source") or {}).get("tool_version", "")
up_to_date = (
has_bw_report
and _vtuple(existing_version) >= _vtuple(event_file_io.TOOL_VERSION)
)
if up_to_date and not args.force:
skipped += 1
continue
# Re-decode the binary. Catch + log; continue with .txt-only
# data if it fails (matches the live ingest path's behavior).
idf_samples = None
idf_intervals = None
binary_md = None
is_histogram = path.suffix.upper() == ".IDFH"
try:
binary_bytes = path.read_bytes()
res = read_idf_file(path, data=binary_bytes)
idf_samples = res.samples or None
idf_intervals = res.intervals
binary_md = res.binary_metadata
is_histogram = res.intervals is not None
except NotImplementedError:
# sig-B / Blastware-stray binary; no samples but adapter
# can still produce a bw_report from extensions.idf_report.
log.debug("%s: binary codec NotImplementedError (sig-B / BW-stray); proceeding from sidecar's idf_report only", path.name)
except Exception as exc:
log.warning("%s: binary decode failed — %s; proceeding from sidecar's idf_report only", path.name, exc)
# Run the adapter. Pull report_dict from
# extensions.idf_report (the v0.18.0+ ingest preserved it).
report_dict = (existing.get("extensions") or {}).get("idf_report") or {}
if not report_dict and binary_md is None:
log.debug("%s: no idf_report in sidecar AND no binary metadata — nothing to project", path.name)
skipped += 1
continue
try:
bw_report = build_bw_report_from_idf(
report_dict, binary_md=binary_md,
intervals=idf_intervals, is_histogram=is_histogram,
)
except Exception as exc:
log.warning("%s: adapter failed — %s", path.name, exc)
errors += 1
continue
# Build the new sidecar by overlaying refreshed fields onto
# the existing one — preserves review, captured_at, blastware
# block, source.kind, etc.
new_sidecar = dict(existing) # shallow copy
new_sidecar["bw_report"] = bw_report
src = dict(new_sidecar.get("source") or {})
src["tool_version"] = event_file_io.TOOL_VERSION
new_sidecar["source"] = src
# Preserve histogram intervals if the binary decoded them
# (improves over the original ingest if that one ran before
# the bee1185 codec fix).
if idf_intervals is not None:
ext = dict(new_sidecar.get("extensions") or {})
ext["idf_intervals"] = [
{
"offset": iv.offset,
"tran_peak": iv.peak_count("Tran"),
"tran_halfp": iv.tran_halfp,
"tran_freq": iv.freq_hz("Tran"),
"vert_peak": iv.peak_count("Vert"),
"vert_halfp": iv.vert_halfp,
"vert_freq": iv.freq_hz("Vert"),
"long_peak": iv.peak_count("Long"),
"long_halfp": iv.long_halfp,
"long_freq": iv.freq_hz("Long"),
"mic_peak": iv.peak_count("MicL"),
"mic_halfp": iv.micl_halfp,
"mic_freq": iv.freq_hz("MicL"),
}
for iv in idf_intervals
]
new_sidecar["extensions"] = ext
if args.dry_run:
will_write_h5 = (idf_samples or idf_intervals) and not args.skip_hdf5
log.info("[DRY] %s/%s — would refresh sidecar (bw_report=%s, h5=%s)",
serial, path.name,
"wrote" if not has_bw_report else "refreshed",
"would write" if will_write_h5 else "skipped")
else:
event_file_io.write_sidecar(sidecar_path, new_sidecar)
log.info("%s/%s — sidecar refreshed (bw_report=%s, intervals=%d)",
serial, path.name,
"added" if not has_bw_report else "refreshed",
len(idf_intervals) if idf_intervals else 0)
refreshed += 1
# Regenerate .h5 by replaying the same IdfEvent → Event bridge
# save_imported_idf uses. For IDFW we write the decoded per-
# sample arrays. For IDFH we synthesise a 1-sample-per-interval
# array (peak ADC count per channel per interval) so the
# renderer's bar-chart code has something to group on.
# Pre-condition: either real samples (IDFW) or decoded intervals
# (IDFH). Skip otherwise.
have_data = bool(idf_samples) or bool(idf_intervals)
if have_data and not args.skip_hdf5:
from sfm import event_hdf5
hdf5_path = store.hdf5_path_for(serial, path.name)
if args.dry_run:
log.debug("[DRY] would write %s", hdf5_path.name)
else:
try:
from micromate import IdfEvent
from minimateplus.event_file_io import file_sha256
idf_event = IdfEvent.from_report(report_dict, path.name)
# Merge the binary-derived mic peak psi (only the
# binary path knows the proper psi value; the .txt
# carries dB(L)). Without this, the h5 writer's
# per-count mic factor is computed against the
# dB(L) value-as-pseudo-psi and the mic chart
# scales wildly.
if (binary_md is not None and res is not None
and res.event.peaks.mic_pspl_psi is not None):
idf_event.peaks.mic_pspl_psi = res.event.peaks.mic_pspl_psi
sha256 = file_sha256(path)
waveform_key = bytes.fromhex(sha256)[:16]
ev = idf_event.to_minimateplus_event(waveform_key)
if is_histogram and idf_intervals:
# 1 sample per interval per channel — same
# synthesis save_imported_idf uses. The h5
# writer's count×geo_fs/32768 conversion turns
# each peak-ADC-count into the bar's physical
# value.
ev.raw_samples = {
"Tran": [iv.peak_count("Tran") for iv in idf_intervals],
"Vert": [iv.peak_count("Vert") for iv in idf_intervals],
"Long": [iv.peak_count("Long") for iv in idf_intervals],
"MicL": [iv.peak_count("MicL") for iv in idf_intervals],
}
ev.total_samples = ev.total_samples or len(idf_intervals)
elif idf_samples:
ev.raw_samples = idf_samples
n_samp = max(
(len(idf_samples.get(ch, []))
for ch in ("Tran", "Vert", "Long", "MicL")),
default=0,
)
ev.total_samples = ev.total_samples or n_samp
event_hdf5.write_event_hdf5(
hdf5_path, ev,
serial=serial,
geo_range="normal",
source_kind="idf-import",
tool_version=event_file_io.TOOL_VERSION,
)
h5_written += 1
log.debug("%s/%s — .h5 written (%s)",
serial, path.name,
f"{len(idf_intervals)} intervals" if is_histogram
else f"{sum(len(v) for v in (idf_samples or {}).values())} samples")
except Exception as exc:
log.warning("%s/%s — .h5 write failed: %s",
serial, path.name, exc)
log.info("Done. refreshed=%d skipped=%d errors=%d h5_written=%d",
refreshed, skipped, errors, h5_written)
return 0 if errors == 0 else 2
if __name__ == "__main__":
sys.exit(main())