43f440812a
Refreshes the bw_report sidecar block + .h5 waveform files for Thor
events ingested before the v0.21.0 adapter wiring + the bee1185 codec
fix. Those events landed with extensions.idf_report only (no
bw_report, no .h5 for IDFW) — symptom on the UI side: the modal chart
404'd on /waveform.json and the PDF rendered from DB-only fields
without sensor self-check, full per-channel breakdown, or mic dB(L).
Walks <store>/<serial>/<filename>:
- Reads the existing sidecar (preserves review state + captured_at)
- Re-runs read_idf_file() on the binary bytes (passes data=
kwarg so codec doesn't try the broken bare-path Path.read_bytes)
- Reads extensions.idf_report from the existing sidecar
- Runs build_bw_report_from_idf adapter
- Writes refreshed sidecar with bw_report + bumped tool_version,
preserving review block and original captured_at
- For IDFW: regenerates .h5 by bridging IdfEvent.from_report ->
to_minimateplus_event -> write_event_hdf5 (mirrors save_imported_idf
steps 4-7)
- IDFH events skip .h5 (histograms have no per-sample data)
Skips events already at current TOOL_VERSION with bw_report present.
--force overrides. --skip-hdf5 limits to sidecar-only refresh.
--dry-run for preview.
Validated against the prod-snap waveform store: 3,815 Thor sidecars
refreshed cleanly with 0 errors, 462 IDFW .h5 files written, 2 skipped
(binaries with no sidecar — backfill doesn't conjure events from
nothing). Verified one originally-broken IDFW event now serves
waveform.json (200, 168KB) and a fully populated PDF (119KB vs the
previous 56KB sparse output).
Operator workflow on prod:
docker exec <sfm-container> python3 /app/scripts/backfill_thor_events.py --dry-run
# Inspect counts, then for real:
docker exec <sfm-container> python3 /app/scripts/backfill_thor_events.py
Idempotent — re-running it is a no-op once everything's at the current
TOOL_VERSION.
Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
293 lines
12 KiB
Python
293 lines
12 KiB
Python
"""
|
|
scripts/backfill_thor_events.py — re-process existing Thor (Series IV)
|
|
events so their sidecars carry the bw_report block produced by
|
|
``micromate.idf_to_bw_report.build_bw_report_from_idf`` + their .h5
|
|
clean-waveform files for IDFW events.
|
|
|
|
Why this exists
|
|
───────────────
|
|
|
|
Thor events ingested before v0.21.0 (or during the v0.21.0 ingest bug
|
|
window fixed in commit bee1185) have sidecars with only
|
|
``extensions.idf_report`` — no ``bw_report`` block. Without
|
|
``bw_report``, the SFM PDF renderer falls back to DB-only fields
|
|
(misses sensor-self-check, full per-channel breakdown, mic dB(L)),
|
|
and the modal chart 404s on ``/waveform.json`` for IDFW events
|
|
because no .h5 was written when the codec failed at ingest.
|
|
|
|
Re-forwarding from thor-watcher would also fix this, but that requires
|
|
operator coordination on every watcher machine and uses bandwidth this
|
|
script doesn't.
|
|
|
|
What this does
|
|
──────────────
|
|
|
|
Walks ``<store>/<serial>/<filename>`` for ``.IDFW`` / ``.IDFH`` files
|
|
and, for each one:
|
|
|
|
1. Reads the existing sidecar (preserving review state + captured_at).
|
|
2. Re-runs ``micromate.idf_file.read_idf_file()`` on the binary
|
|
bytes — passing ``data=`` so the codec doesn't try to read from
|
|
a path it doesn't know.
|
|
3. Pulls ``extensions.idf_report`` (the raw parsed Thor dict the
|
|
v0.18.0+ ingest path already stashed) and runs the v0.21.0
|
|
``build_bw_report_from_idf`` adapter against it.
|
|
4. Writes the refreshed sidecar with the new ``bw_report``,
|
|
bumped ``source.tool_version``, but preserved ``review`` block
|
|
+ the original ``captured_at`` timestamp.
|
|
5. For IDFW events with decoded samples, regenerates the .h5
|
|
waveform file via the existing ``event_hdf5`` writer.
|
|
|
|
Idempotent. Re-running it after a parser/adapter change just
|
|
re-writes sidecars — no DB writes, no thor-watcher coordination.
|
|
|
|
Usage
|
|
─────
|
|
|
|
python scripts/backfill_thor_events.py [--db-path PATH] [--store-root PATH]
|
|
[--dry-run]
|
|
[--skip-hdf5]
|
|
[--force]
|
|
[-v]
|
|
|
|
By default, refreshes any Thor event whose sidecar is missing
|
|
``bw_report`` OR whose ``source.tool_version`` is older than the
|
|
current ``TOOL_VERSION``. ``--force`` refreshes every Thor event
|
|
regardless.
|
|
"""
|
|
|
|
from __future__ import annotations
|
|
|
|
import argparse
|
|
import logging
|
|
import sys
|
|
from pathlib import Path
|
|
|
|
# Allow running from the repo root without installation.
|
|
sys.path.insert(0, str(Path(__file__).resolve().parent.parent))
|
|
|
|
from minimateplus import event_file_io
|
|
from sfm.waveform_store import WaveformStore
|
|
|
|
# Script-wide logger. Level and format are set by logging.basicConfig()
# inside main(), so nothing is emitted with custom formatting before then.
log = logging.getLogger("backfill_thor_events")
|
|
|
|
|
|
def _is_thor_event(path: Path) -> bool:
    """Tell whether *path* is a Thor event binary that exists on disk.

    A path qualifies when it is a regular file, its name does not end in
    one of the explicitly rejected endings (``.sfm.json``, ``.h5``,
    ``_ASCII.TXT``), and its suffix is ``.IDFW`` or ``.IDFH`` compared
    case-insensitively.
    """
    rejected_endings = (".sfm.json", ".h5", "_ASCII.TXT")
    if path.is_file() and not path.name.endswith(rejected_endings):
        return path.suffix.upper() in (".IDFW", ".IDFH")
    return False
|
|
|
|
|
|
def _vtuple(s: str) -> tuple:
    """Convert a dotted version string into a comparable 3-tuple of ints.

    Only the first three dot-separated components are considered. The
    result is always exactly three elements long so that tuples compare
    component-wise: ``"0.21"`` becomes ``(0, 21, 0)`` and is therefore
    equal to ``"0.21.0"`` rather than sorting below it.

    A leading ``v``/``V`` is ignored, and each component contributes only
    its leading run of ASCII digits, so ``"0.21.0-rc1"`` yields
    ``(0, 21, 0)`` instead of collapsing to the fallback.

    Args:
        s: Version text. Non-string values are passed through ``str()``.

    Returns:
        ``(major, minor, patch)``; ``(0, 0, 0)`` when any considered
        component has no leading digits (including empty input).
    """
    text = str(s).strip()
    if text[:1] in ("v", "V"):
        text = text[1:]

    numbers = []
    for piece in text.split(".")[:3]:
        piece = piece.strip()
        end = 0
        # Explicit ASCII check: str.isdigit() also accepts characters such
        # as superscripts that int() cannot parse.
        while end < len(piece) and piece[end] in "0123456789":
            end += 1
        if end == 0:
            # Unparseable component: treat the whole version as unknown,
            # which sorts below every real version.
            return (0, 0, 0)
        numbers.append(int(piece[:end]))

    # Pad so a short version never compares lower merely for being shorter.
    numbers.extend([0] * (3 - len(numbers)))
    return tuple(numbers)
|
|
|
|
|
|
def main(argv=None) -> int:
    """Refresh sidecars (and IDFW ``.h5`` files) for stored Thor events.

    Walks ``<store-root>/<serial>/`` for ``.IDFW`` / ``.IDFH`` binaries.
    For each one that has a sidecar which is missing ``bw_report`` or
    carries an older ``source.tool_version`` (or for every one when
    ``--force`` is given), it:

    * re-decodes the binary (best effort),
    * rebuilds ``bw_report`` from ``extensions.idf_report``,
    * writes the sidecar back with the new ``bw_report``, the current
      ``TOOL_VERSION`` and, when decoded, ``extensions.idf_intervals``,
    * and, for non-histogram events with decoded samples, rewrites the
      ``.h5`` waveform file unless ``--skip-hdf5`` is set.

    With ``--dry-run`` nothing is written; the same selection logic runs
    and the counters reflect what would have been refreshed.

    Args:
        argv: Argument list for argparse; ``None`` means ``sys.argv[1:]``.

    Returns:
        ``1`` if the store root does not exist; ``2`` if any per-event
        step failed (sidecar read, adapter, sidecar write, or ``.h5``
        write); ``0`` otherwise.
    """
    parser = argparse.ArgumentParser(description=__doc__)
    parser.add_argument(
        "--db-path",
        default=str(Path(__file__).resolve().parent.parent / "bridges" / "captures" / "seismo_relay.db"),
        help="Used only to derive the default --store-root.",
    )
    parser.add_argument("--store-root", default=None)
    parser.add_argument("--dry-run", action="store_true")
    parser.add_argument("--skip-hdf5", action="store_true",
                        help="Don't regenerate .h5 files for IDFW events.")
    parser.add_argument("--force", action="store_true",
                        help="Refresh every Thor event, not just ones with stale or missing bw_report.")
    parser.add_argument("-v", "--verbose", action="store_true")
    args = parser.parse_args(argv)

    logging.basicConfig(
        level=logging.DEBUG if args.verbose else logging.INFO,
        format="%(asctime)s %(levelname)-7s %(name)s %(message)s",
        datefmt="%H:%M:%S",
    )

    # --db-path is never opened; its directory only anchors the default
    # waveform store location.
    db_path = Path(args.db_path).expanduser().resolve()
    store_root = (
        Path(args.store_root).expanduser().resolve()
        if args.store_root else db_path.parent / "waveforms"
    )
    if not store_root.exists():
        log.error("store root not found: %s", store_root)
        return 1
    store = WaveformStore(store_root)
    log.info("store root: %s", store_root)
    log.info("current TOOL_VERSION: %s", event_file_io.TOOL_VERSION)

    refreshed = skipped = errors = h5_written = 0

    # Imported here rather than at module top so argument parsing
    # (including --help) and the store-root check run before the
    # micromate codec/adapter modules are loaded.
    from micromate.idf_file import read_idf_file
    from micromate.idf_to_bw_report import build_bw_report_from_idf

    for serial_dir in sorted(d for d in store_root.iterdir() if d.is_dir()):
        serial = serial_dir.name
        for path in sorted(serial_dir.iterdir()):
            if not _is_thor_event(path):
                continue

            sidecar_path = store.sidecar_path_for(serial, path.name)
            if not sidecar_path.exists():
                log.debug("%s: no sidecar — skipping (this is a binary without ingest history)",
                          path.name)
                skipped += 1
                continue

            try:
                existing = event_file_io.read_sidecar(sidecar_path)
            except Exception as exc:
                log.warning("%s: failed to read sidecar — %s", path.name, exc)
                errors += 1
                continue

            has_bw_report = bool(existing.get("bw_report"))
            existing_version = (existing.get("source") or {}).get("tool_version", "")
            up_to_date = (
                has_bw_report
                and _vtuple(existing_version) >= _vtuple(event_file_io.TOOL_VERSION)
            )
            if up_to_date and not args.force:
                skipped += 1
                continue

            # Re-decode the binary. Decode failures are logged and the
            # refresh continues from the sidecar's idf_report alone.
            idf_samples = None
            idf_intervals = None
            binary_md = None
            # Extension-based guess; replaced by the decoder's answer
            # below when decoding succeeds.
            is_histogram = path.suffix.upper() == ".IDFH"
            try:
                binary_bytes = path.read_bytes()
                # Pass the bytes explicitly so the codec does not have to
                # read the path itself.
                res = read_idf_file(path, data=binary_bytes)
                idf_samples = res.samples or None
                idf_intervals = res.intervals
                binary_md = res.binary_metadata
                is_histogram = res.intervals is not None
            except NotImplementedError:
                # sig-B / Blastware-stray binary; no samples but adapter
                # can still produce a bw_report from extensions.idf_report.
                log.debug("%s: binary codec NotImplementedError (sig-B / BW-stray); proceeding from sidecar's idf_report only", path.name)
            except Exception as exc:
                log.warning("%s: binary decode failed — %s; proceeding from sidecar's idf_report only", path.name, exc)

            # Run the adapter against the parsed report stored in the
            # sidecar under extensions.idf_report.
            report_dict = (existing.get("extensions") or {}).get("idf_report") or {}
            if not report_dict and binary_md is None:
                log.debug("%s: no idf_report in sidecar AND no binary metadata — nothing to project", path.name)
                skipped += 1
                continue

            try:
                bw_report = build_bw_report_from_idf(
                    report_dict, binary_md=binary_md,
                    intervals=idf_intervals, is_histogram=is_histogram,
                )
            except Exception as exc:
                log.warning("%s: adapter failed — %s", path.name, exc)
                errors += 1
                continue

            # Overlay refreshed fields onto a shallow copy of the existing
            # sidecar so every key not touched here is carried over as-is.
            new_sidecar = dict(existing)
            new_sidecar["bw_report"] = bw_report
            src = dict(new_sidecar.get("source") or {})
            src["tool_version"] = event_file_io.TOOL_VERSION
            new_sidecar["source"] = src

            # Store histogram intervals whenever this decode produced them.
            if idf_intervals is not None:
                ext = dict(new_sidecar.get("extensions") or {})
                ext["idf_intervals"] = [
                    {
                        "offset": iv.offset,
                        "tran_peak": iv.peak_count("Tran"),
                        "tran_halfp": iv.tran_halfp,
                        "tran_freq": iv.freq_hz("Tran"),
                        "vert_peak": iv.peak_count("Vert"),
                        "vert_halfp": iv.vert_halfp,
                        "vert_freq": iv.freq_hz("Vert"),
                        "long_peak": iv.peak_count("Long"),
                        "long_halfp": iv.long_halfp,
                        "long_freq": iv.freq_hz("Long"),
                        "mic_peak": iv.peak_count("MicL"),
                        "mic_halfp": iv.micl_halfp,
                        "mic_freq": iv.freq_hz("MicL"),
                    }
                    for iv in idf_intervals
                ]
                new_sidecar["extensions"] = ext

            # One predicate shared by the dry-run preview and the real
            # write, so the preview cannot promise an .h5 that a real run
            # would skip (e.g. histogram events).
            will_write_h5 = (
                bool(idf_samples) and not args.skip_hdf5 and not is_histogram
            )

            if args.dry_run:
                log.info("[DRY] %s/%s — would refresh sidecar (bw_report=%s, h5=%s)",
                         serial, path.name,
                         "wrote" if not has_bw_report else "refreshed",
                         "would write" if will_write_h5 else "skipped")
            else:
                try:
                    event_file_io.write_sidecar(sidecar_path, new_sidecar)
                except Exception as exc:
                    # Same per-event handling as a sidecar read failure:
                    # one bad file must not abort the rest of the backfill.
                    log.warning("%s: failed to write sidecar — %s", path.name, exc)
                    errors += 1
                    continue
                log.info("%s/%s — sidecar refreshed (bw_report=%s, intervals=%d)",
                         serial, path.name,
                         "added" if not has_bw_report else "refreshed",
                         len(idf_intervals) if idf_intervals else 0)
            # Counted in dry-run too, so the preview totals are meaningful.
            refreshed += 1

            # Regenerate .h5 for events with decoded samples by going
            # parsed report -> IdfEvent -> minimateplus Event -> HDF5.
            # Histogram events are excluded by will_write_h5.
            if will_write_h5:
                from sfm import event_hdf5
                hdf5_path = store.hdf5_path_for(serial, path.name)
                if args.dry_run:
                    log.debug("[DRY] would write %s", hdf5_path.name)
                else:
                    try:
                        from micromate import IdfEvent
                        from minimateplus.event_file_io import file_sha256
                        idf_event = IdfEvent.from_report(report_dict, path.name)
                        sha256 = file_sha256(path)
                        # First 16 bytes of the file hash serve as the
                        # waveform key handed to the Event bridge.
                        waveform_key = bytes.fromhex(sha256)[:16]
                        ev = idf_event.to_minimateplus_event(waveform_key)
                        ev.raw_samples = idf_samples
                        n_samp = max(
                            (len(idf_samples.get(ch, []))
                             for ch in ("Tran", "Vert", "Long", "MicL")),
                            default=0,
                        )
                        # Keep a sample count the event already carries;
                        # otherwise use the longest decoded channel.
                        ev.total_samples = ev.total_samples or n_samp
                        event_hdf5.write_event_hdf5(
                            hdf5_path, ev,
                            serial=serial,
                            geo_range="normal",
                            source_kind="idf-import",
                            tool_version=event_file_io.TOOL_VERSION,
                        )
                        h5_written += 1
                        log.debug("%s/%s — .h5 written (%d samples)",
                                  serial, path.name, n_samp)
                    except Exception as exc:
                        log.warning("%s/%s — .h5 write failed: %s",
                                    serial, path.name, exc)
                        # Count it so the summary and exit code reveal
                        # that some waveform files are still missing.
                        errors += 1

    log.info("Done. refreshed=%d skipped=%d errors=%d h5_written=%d",
             refreshed, skipped, errors, h5_written)
    return 0 if errors == 0 else 2
|
|
|
|
|
|
# Script entry point: run main() and use its return value as the process
# exit status.
if __name__ == "__main__":
    raise SystemExit(main())
|