update to v0.21.1, thor data import successful #29
@@ -0,0 +1,292 @@
|
||||
"""
|
||||
scripts/backfill_thor_events.py — re-process existing Thor (Series IV)
|
||||
events so their sidecars carry the bw_report block produced by
|
||||
``micromate.idf_to_bw_report.build_bw_report_from_idf`` + their .h5
|
||||
clean-waveform files for IDFW events.
|
||||
|
||||
Why this exists
|
||||
───────────────
|
||||
|
||||
Thor events ingested before v0.21.0 (or during the v0.21.0 ingest bug
|
||||
window fixed in commit bee1185) have sidecars with only
|
||||
``extensions.idf_report`` — no ``bw_report`` block. Without
|
||||
``bw_report``, the SFM PDF renderer falls back to DB-only fields
|
||||
(misses sensor-self-check, full per-channel breakdown, mic dB(L)),
|
||||
and the modal chart 404s on ``/waveform.json`` for IDFW events
|
||||
because no .h5 was written when the codec failed at ingest.
|
||||
|
||||
Re-forwarding from thor-watcher would also fix this, but that requires
|
||||
operator coordination on every watcher machine and uses bandwidth this
|
||||
script doesn't.
|
||||
|
||||
What this does
|
||||
──────────────
|
||||
|
||||
Walks ``<store>/<serial>/<filename>`` for ``.IDFW`` / ``.IDFH`` files
|
||||
and, for each one:
|
||||
|
||||
1. Reads the existing sidecar (preserving review state + captured_at).
|
||||
2. Re-runs ``micromate.idf_file.read_idf_file()`` on the binary
|
||||
bytes — passing ``data=`` so the codec doesn't try to read from
|
||||
a path it doesn't know.
|
||||
3. Pulls ``extensions.idf_report`` (the raw parsed Thor dict the
|
||||
v0.18.0+ ingest path already stashed) and runs the v0.21.0
|
||||
``build_bw_report_from_idf`` adapter against it.
|
||||
4. Writes the refreshed sidecar with the new ``bw_report``,
|
||||
bumped ``source.tool_version``, but preserved ``review`` block
|
||||
+ the original ``captured_at`` timestamp.
|
||||
5. For IDFW events with decoded samples, regenerates the .h5
|
||||
waveform file via the existing ``event_hdf5`` writer.
|
||||
|
||||
Idempotent. Re-running it after a parser/adapter change just
|
||||
re-writes sidecars — no DB writes, no thor-watcher coordination.
|
||||
|
||||
Usage
|
||||
─────
|
||||
|
||||
python scripts/backfill_thor_events.py [--store-root PATH]
|
||||
[--dry-run]
|
||||
[--skip-hdf5]
|
||||
[--force]
|
||||
[-v]
|
||||
|
||||
By default, refreshes any Thor event whose sidecar is missing
|
||||
``bw_report`` OR whose ``source.tool_version`` is older than the
|
||||
current ``TOOL_VERSION``. ``--force`` refreshes every Thor event
|
||||
regardless.
|
||||
"""
|
||||
|
||||
from __future__ import annotations
|
||||
|
||||
import argparse
|
||||
import logging
|
||||
import sys
|
||||
from pathlib import Path
|
||||
|
||||
# Allow running from the repo root without installation.
|
||||
sys.path.insert(0, str(Path(__file__).resolve().parent.parent))
|
||||
|
||||
from minimateplus import event_file_io
|
||||
from sfm.waveform_store import WaveformStore
|
||||
|
||||
log = logging.getLogger("backfill_thor_events")
|
||||
|
||||
|
||||
def _is_thor_event(path: Path) -> bool:
|
||||
if not path.is_file():
|
||||
return False
|
||||
if path.name.endswith((".sfm.json", ".h5", "_ASCII.TXT")):
|
||||
return False
|
||||
return path.suffix.upper() in (".IDFW", ".IDFH")
|
||||
|
||||
|
||||
def _vtuple(s: str) -> tuple:
|
||||
try:
|
||||
return tuple(int(p) for p in str(s).split(".")[:3])
|
||||
except Exception:
|
||||
return (0, 0, 0)
|
||||
|
||||
|
||||
def main(argv=None) -> int:
|
||||
p = argparse.ArgumentParser(description=__doc__)
|
||||
p.add_argument(
|
||||
"--db-path",
|
||||
default=str(Path(__file__).resolve().parent.parent / "bridges" / "captures" / "seismo_relay.db"),
|
||||
help="Used only to derive the default --store-root.",
|
||||
)
|
||||
p.add_argument("--store-root", default=None)
|
||||
p.add_argument("--dry-run", action="store_true")
|
||||
p.add_argument("--skip-hdf5", action="store_true",
|
||||
help="Don't regenerate .h5 files for IDFW events.")
|
||||
p.add_argument("--force", action="store_true",
|
||||
help="Refresh every Thor event, not just ones with stale or missing bw_report.")
|
||||
p.add_argument("-v", "--verbose", action="store_true")
|
||||
args = p.parse_args(argv)
|
||||
|
||||
logging.basicConfig(
|
||||
level=logging.DEBUG if args.verbose else logging.INFO,
|
||||
format="%(asctime)s %(levelname)-7s %(name)s %(message)s",
|
||||
datefmt="%H:%M:%S",
|
||||
)
|
||||
|
||||
db_path = Path(args.db_path).expanduser().resolve()
|
||||
store_root = (
|
||||
Path(args.store_root).expanduser().resolve()
|
||||
if args.store_root else db_path.parent / "waveforms"
|
||||
)
|
||||
if not store_root.exists():
|
||||
log.error("store root not found: %s", store_root)
|
||||
return 1
|
||||
store = WaveformStore(store_root)
|
||||
log.info("store root: %s", store_root)
|
||||
log.info("current TOOL_VERSION: %s", event_file_io.TOOL_VERSION)
|
||||
|
||||
refreshed = skipped = errors = h5_written = 0
|
||||
|
||||
# Lazy imports so any one of these failing produces a useful error
|
||||
# message rather than crashing module-load.
|
||||
from micromate.idf_file import read_idf_file
|
||||
from micromate.idf_to_bw_report import build_bw_report_from_idf
|
||||
|
||||
for serial_dir in sorted(p for p in store_root.iterdir() if p.is_dir()):
|
||||
serial = serial_dir.name
|
||||
for path in sorted(serial_dir.iterdir()):
|
||||
if not _is_thor_event(path):
|
||||
continue
|
||||
|
||||
sidecar_path = store.sidecar_path_for(serial, path.name)
|
||||
if not sidecar_path.exists():
|
||||
log.debug("%s: no sidecar — skipping (this is a binary without ingest history)",
|
||||
path.name)
|
||||
skipped += 1
|
||||
continue
|
||||
|
||||
try:
|
||||
existing = event_file_io.read_sidecar(sidecar_path)
|
||||
except Exception as exc:
|
||||
log.warning("%s: failed to read sidecar — %s", path.name, exc)
|
||||
errors += 1
|
||||
continue
|
||||
|
||||
has_bw_report = bool(existing.get("bw_report"))
|
||||
existing_version = (existing.get("source") or {}).get("tool_version", "")
|
||||
up_to_date = (
|
||||
has_bw_report
|
||||
and _vtuple(existing_version) >= _vtuple(event_file_io.TOOL_VERSION)
|
||||
)
|
||||
if up_to_date and not args.force:
|
||||
skipped += 1
|
||||
continue
|
||||
|
||||
# Re-decode the binary. Catch + log; continue with .txt-only
|
||||
# data if it fails (matches the live ingest path's behavior).
|
||||
idf_samples = None
|
||||
idf_intervals = None
|
||||
binary_md = None
|
||||
is_histogram = path.suffix.upper() == ".IDFH"
|
||||
try:
|
||||
binary_bytes = path.read_bytes()
|
||||
res = read_idf_file(path, data=binary_bytes)
|
||||
idf_samples = res.samples or None
|
||||
idf_intervals = res.intervals
|
||||
binary_md = res.binary_metadata
|
||||
is_histogram = res.intervals is not None
|
||||
except NotImplementedError:
|
||||
# sig-B / Blastware-stray binary; no samples but adapter
|
||||
# can still produce a bw_report from extensions.idf_report.
|
||||
log.debug("%s: binary codec NotImplementedError (sig-B / BW-stray); proceeding from sidecar's idf_report only", path.name)
|
||||
except Exception as exc:
|
||||
log.warning("%s: binary decode failed — %s; proceeding from sidecar's idf_report only", path.name, exc)
|
||||
|
||||
# Run the adapter. Pull report_dict from
|
||||
# extensions.idf_report (the v0.18.0+ ingest preserved it).
|
||||
report_dict = (existing.get("extensions") or {}).get("idf_report") or {}
|
||||
if not report_dict and binary_md is None:
|
||||
log.debug("%s: no idf_report in sidecar AND no binary metadata — nothing to project", path.name)
|
||||
skipped += 1
|
||||
continue
|
||||
|
||||
try:
|
||||
bw_report = build_bw_report_from_idf(
|
||||
report_dict, binary_md=binary_md,
|
||||
intervals=idf_intervals, is_histogram=is_histogram,
|
||||
)
|
||||
except Exception as exc:
|
||||
log.warning("%s: adapter failed — %s", path.name, exc)
|
||||
errors += 1
|
||||
continue
|
||||
|
||||
# Build the new sidecar by overlaying refreshed fields onto
|
||||
# the existing one — preserves review, captured_at, blastware
|
||||
# block, source.kind, etc.
|
||||
new_sidecar = dict(existing) # shallow copy
|
||||
new_sidecar["bw_report"] = bw_report
|
||||
src = dict(new_sidecar.get("source") or {})
|
||||
src["tool_version"] = event_file_io.TOOL_VERSION
|
||||
new_sidecar["source"] = src
|
||||
|
||||
# Preserve histogram intervals if the binary decoded them
|
||||
# (improves over the original ingest if that one ran before
|
||||
# the bee1185 codec fix).
|
||||
if idf_intervals is not None:
|
||||
ext = dict(new_sidecar.get("extensions") or {})
|
||||
ext["idf_intervals"] = [
|
||||
{
|
||||
"offset": iv.offset,
|
||||
"tran_peak": iv.peak_count("Tran"),
|
||||
"tran_halfp": iv.tran_halfp,
|
||||
"tran_freq": iv.freq_hz("Tran"),
|
||||
"vert_peak": iv.peak_count("Vert"),
|
||||
"vert_halfp": iv.vert_halfp,
|
||||
"vert_freq": iv.freq_hz("Vert"),
|
||||
"long_peak": iv.peak_count("Long"),
|
||||
"long_halfp": iv.long_halfp,
|
||||
"long_freq": iv.freq_hz("Long"),
|
||||
"mic_peak": iv.peak_count("MicL"),
|
||||
"mic_halfp": iv.micl_halfp,
|
||||
"mic_freq": iv.freq_hz("MicL"),
|
||||
}
|
||||
for iv in idf_intervals
|
||||
]
|
||||
new_sidecar["extensions"] = ext
|
||||
|
||||
if args.dry_run:
|
||||
log.info("[DRY] %s/%s — would refresh sidecar (bw_report=%s, h5=%s)",
|
||||
serial, path.name,
|
||||
"wrote" if not has_bw_report else "refreshed",
|
||||
"would write" if (idf_samples and not args.skip_hdf5) else "skipped")
|
||||
else:
|
||||
event_file_io.write_sidecar(sidecar_path, new_sidecar)
|
||||
log.info("%s/%s — sidecar refreshed (bw_report=%s, intervals=%d)",
|
||||
serial, path.name,
|
||||
"added" if not has_bw_report else "refreshed",
|
||||
len(idf_intervals) if idf_intervals else 0)
|
||||
refreshed += 1
|
||||
|
||||
# Regenerate .h5 for IDFW events with decoded samples by
|
||||
# replaying the same IdfEvent → Event bridge save_imported_idf
|
||||
# uses. IDFH events have no per-sample data; skip.
|
||||
if idf_samples and not args.skip_hdf5 and not is_histogram:
|
||||
from sfm import event_hdf5
|
||||
hdf5_path = store.hdf5_path_for(serial, path.name)
|
||||
if args.dry_run:
|
||||
log.debug("[DRY] would write %s", hdf5_path.name)
|
||||
else:
|
||||
try:
|
||||
from micromate import IdfEvent
|
||||
from minimateplus.event_file_io import file_sha256
|
||||
# Bridge: parsed idf_report dict → IdfEvent →
|
||||
# minimateplus Event → write_event_hdf5. Mirrors
|
||||
# save_imported_idf steps 4-7.
|
||||
idf_event = IdfEvent.from_report(report_dict, path.name)
|
||||
sha256 = file_sha256(path)
|
||||
waveform_key = bytes.fromhex(sha256)[:16]
|
||||
ev = idf_event.to_minimateplus_event(waveform_key)
|
||||
ev.raw_samples = idf_samples
|
||||
n_samp = max(
|
||||
(len(idf_samples.get(ch, []))
|
||||
for ch in ("Tran", "Vert", "Long", "MicL")),
|
||||
default=0,
|
||||
)
|
||||
ev.total_samples = ev.total_samples or n_samp
|
||||
event_hdf5.write_event_hdf5(
|
||||
hdf5_path, ev,
|
||||
serial=serial,
|
||||
geo_range="normal",
|
||||
source_kind="idf-import",
|
||||
tool_version=event_file_io.TOOL_VERSION,
|
||||
)
|
||||
h5_written += 1
|
||||
log.debug("%s/%s — .h5 written (%d samples)",
|
||||
serial, path.name, n_samp)
|
||||
except Exception as exc:
|
||||
log.warning("%s/%s — .h5 write failed: %s",
|
||||
serial, path.name, exc)
|
||||
|
||||
log.info("Done. refreshed=%d skipped=%d errors=%d h5_written=%d",
|
||||
refreshed, skipped, errors, h5_written)
|
||||
return 0 if errors == 0 else 2
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
sys.exit(main())
|
||||
Reference in New Issue
Block a user