Files
seismo-relay/scripts/backfill_sidecars.py
T
serversdown e949232875 histogram_codec + backfill: tighter peak ceiling, preserve bw_report
histogram_codec: drop _MAX_PEAK_COUNT 4096 → 2200. The old ceiling
let extension-byte blocks slip through at up to 20.48 in/s per
channel, producing 35× inflated PVS sums when first deployed to
prod. 2200 covers Normal-range full-scale (10 in/s = 2000 counts)
plus 10% headroom for quantization edge cases.

backfill_sidecars: also preserve the bw_report block alongside
review + extensions when regenerating sidecars. event_to_sidecar_dict
takes a BwAsciiReport dataclass, not a dict, so for bw_report we
overlay the existing block after regeneration rather than passing it as a kwarg.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
2026-05-21 02:50:10 +00:00

415 lines
19 KiB
Python

"""
scripts/backfill_sidecars.py — generate .sfm.json sidecars AND .h5
clean-waveform files for existing events already in the waveform store
that predate those features.
Walks `<store_root>/<serial>/<filename>` and for each BW event file:
Sidecar (.sfm.json):
- Skip when an existing sidecar's blastware.sha256 matches the
current BW file's sha256.
- Else regenerate: prefer .a5.pkl (full fidelity); fall back to
parsing the BW binary directly (peaks computed from samples).
Clean waveform (.h5):
- Regenerated whenever the sidecar is regenerated (sha mismatch
OR sidecar.source.tool_version < current TOOL_VERSION OR --force).
The .h5 and the sidecar both come from the same decoder output,
so if the sidecar is stale the .h5 is too.
- Written when missing.
- --skip-hdf5 turns off all .h5 writes.
Typical use after a decoder upgrade:
1. Pull the new seismo-relay code (which bumped TOOL_VERSION).
2. Run this script — every sidecar with an older tool_version
stamp regenerates, and the associated .h5 cascade-regenerates.
3. Operator review state (review.false_trigger, notes, reviewer)
and the sidecar's extensions block are preserved across the
regen.
Usage:
python scripts/backfill_sidecars.py [--store-root PATH]
[--db-path PATH]
[--dry-run]
[--skip-hdf5]
[-v]
"""
from __future__ import annotations

import argparse
import logging
import sys
from pathlib import Path

# Allow running from the repo root without installation.
# Prepends the directory one level above this script's folder (the repo
# root) to sys.path so the project packages imported below resolve.
sys.path.insert(0, str(Path(__file__).resolve().parent.parent))

# Project-local modules (resolvable thanks to the sys.path tweak above).
# _frame_to_dict / _dict_to_frame are not referenced in this script; the
# noqa keeps the linter from flagging them.
from minimateplus import event_file_io
from sfm import event_hdf5
from sfm.waveform_store import WaveformStore, _frame_to_dict, _dict_to_frame # noqa: F401
from sfm.database import SeismoDb

# Script-wide logger; every diagnostic below goes through it.
log = logging.getLogger("backfill_sidecars")
def _looks_like_event_file(path: Path) -> bool:
"""Same heuristic as the importer CLI.
Filters to BW (Series III) event files only — Thor (Series IV)
`.IDFW` / `.IDFH` files share the store but have their own ingest
path (`WaveformStore.save_imported_idf`) and are NOT decodable by
`event_file_io.read_blastware_file`. Their sidecars are populated
at ingest from the paired `.IDFW.txt` ASCII report; nothing the
backfill regenerates would improve on them, so we exclude them
from scope.
"""
if not path.is_file():
return False
if path.name.endswith((".a5.pkl", ".sfm.json", ".h5")):
return False
ext = path.suffix.lstrip(".")
if not (3 <= len(ext) <= 4):
return False
# Thor IDF files share the .{W,H}-suffix shape but aren't BW.
if ext.upper() in ("IDFW", "IDFH"):
return False
if not (ext[-1].upper() in {"W", "H"} or ext.endswith("0")):
return False
try:
return path.stat().st_size >= 70
except OSError:
return False
# Channels whose decoded samples make an event worth writing to .h5.
_SAMPLE_CHANNELS = ("Tran", "Vert", "Long", "MicL")


def _parse_args(argv):
    """Parse the command line; the module docstring doubles as --help text."""
    parser = argparse.ArgumentParser(description=__doc__)
    parser.add_argument(
        "--db-path",
        default=str(Path(__file__).resolve().parent.parent / "bridges" / "captures" / "seismo_relay.db"),
    )
    parser.add_argument("--store-root", default=None)
    parser.add_argument("--dry-run", action="store_true")
    parser.add_argument(
        "--skip-hdf5", action="store_true",
        help="Don't generate .h5 clean-waveform files (only sidecars).",
    )
    parser.add_argument(
        "--force", action="store_true",
        help=(
            "Regenerate sidecars + .h5 even when an existing sidecar's "
            "blastware.sha256 matches the current BW file. Use this after "
            "upgrading seismo-relay to pull in decoder bug fixes (e.g. the "
            "STRT-rectime byte-offset fix in v0.15.x)."
        ),
    )
    parser.add_argument("-v", "--verbose", action="store_true")
    return parser.parse_args(argv)


def _version_tuple(version):
    """Turn "X.Y.Z" into (X, Y, Z) for ordering; unparseable input → (0, 0, 0)."""
    try:
        return tuple(int(part) for part in str(version).split(".")[:3])
    except ValueError:
        return (0, 0, 0)


def _read_existing_sidecar(sidecar_path):
    """Load the sidecar at *sidecar_path*.

    Returns None when it is absent, unreadable, or not a mapping; every
    such case is treated as "no usable sidecar" and leads to a regen.
    """
    if not sidecar_path.exists():
        return None
    try:
        existing = event_file_io.read_sidecar(sidecar_path)
    except Exception as exc:  # corrupt / partial file → regenerate it
        log.debug("could not read existing sidecar %s: %s", sidecar_path.name, exc)
        return None
    if not isinstance(existing, dict):
        log.debug("ignoring malformed sidecar %s (not a mapping)", sidecar_path.name)
        return None
    return existing


def _sidecar_freshness(existing, bw_sha):
    """Return (sha_ok, ver_ok, tool_version) for an existing sidecar dict.

    sha_ok: blastware.sha256 matches the current BW file (the sidecar
            describes THIS file).
    ver_ok: source.tool_version >= event_file_io.TOOL_VERSION (the sidecar
            was written by a build that includes every decoder fix since).
    A structurally malformed sidecar counts as stale on both counts.
    """
    try:
        sha_ok = existing.get("blastware", {}).get("sha256") == bw_sha
        src_ver = existing.get("source", {}).get("tool_version", "")
    except AttributeError:
        return False, False, ""
    ver_ok = _version_tuple(src_ver) >= _version_tuple(event_file_io.TOOL_VERSION)
    return sha_ok, ver_ok, src_ver


def _decode_event(store, serial, serial_dir, path):
    """Rebuild an Event for *path*: replay the .a5.pkl when present, else parse the BW file.

    Returns (event, source_kind, a5_pickle_filename_or_None).

    Note: the .a5.pkl alone CANNOT recover timestamp / record_type /
    waveform_key / per-channel peaks — those live in the 0C record, which
    isn't saved separately. The caller seeds them from the DB row.
    """
    a5_path = serial_dir / f"{path.name}.a5.pkl"
    if not a5_path.exists():
        return event_file_io.read_blastware_file(path), "bw-import", None

    frames = store.load_a5(serial, path.name)
    if not frames:
        raise RuntimeError("a5_pickle present but unreadable")
    from minimateplus.client import _decode_a5_metadata_into, _decode_a5_waveform
    from minimateplus.models import Event

    ev = Event(index=-1)
    _decode_a5_metadata_into(frames, ev)
    _decode_a5_waveform(frames, ev)
    return ev, "sfm-live", a5_path.name


def _fetch_db_row(db, serial, filename):
    """Return the SeismoDb events row for (serial, filename), or None.

    The connection is closed explicitly: sqlite3's own context manager only
    commits/rolls back and would leak one connection per event.
    """
    import sqlite3
    from contextlib import closing

    try:
        with closing(sqlite3.connect(str(db.db_path))) as conn:
            conn.row_factory = sqlite3.Row
            return conn.execute(
                "SELECT * FROM events "
                "WHERE serial=? AND blastware_filename=? "
                "LIMIT 1",
                (serial, filename),
            ).fetchone()
    except Exception as exc:
        log.debug("DB lookup failed for %s: %s", filename, exc)
        return None


def _seed_from_db_row(ev, db_row):
    """Fill fields the decoder left empty from the SeismoDb events row.

    The row was populated at original save time (peaks, project info,
    timestamp, record_type, sample_rate, ...), so seeding from it keeps
    the regenerated sidecar matching what was there before the backfill
    ran. Each field is only overwritten when the decoder left it empty.
    """
    import datetime

    from minimateplus.models import PeakValues, ProjectInfo, Timestamp

    if ev.sample_rate is None and db_row["sample_rate"]:
        ev.sample_rate = int(db_row["sample_rate"])
    if not ev.record_type and db_row["record_type"]:
        ev.record_type = db_row["record_type"]
    if ev._waveform_key is None and db_row["waveform_key"]:
        try:
            ev._waveform_key = bytes.fromhex(db_row["waveform_key"])
        except (TypeError, ValueError):
            pass  # malformed hex in the DB — leave the key unset

    # Timestamp from the ISO-8601 string in the DB row.
    if ev.timestamp is None and db_row["timestamp"]:
        try:
            when = datetime.datetime.fromisoformat(db_row["timestamp"])
            ev.timestamp = Timestamp(
                raw=b"", flag=0x10,
                year=when.year, unknown_byte=0,
                month=when.month, day=when.day,
                hour=when.hour, minute=when.minute, second=when.second,
            )
        except Exception as exc:
            log.debug("unusable DB timestamp %r: %s", db_row["timestamp"], exc)

    # Peaks from the DB row when the decode didn't supply them.
    if ev.peak_values is None:
        ev.peak_values = PeakValues(
            tran=db_row["tran_ppv"],
            vert=db_row["vert_ppv"],
            long=db_row["long_ppv"],
            peak_vector_sum=db_row["peak_vector_sum"],
            micl=db_row["mic_ppv"],
        )

    # Project info from the DB row when the metadata-page decode didn't
    # pick any of it up.
    info = ev.project_info
    if info is None or all(
        value in (None, "")
        for value in (info.project, info.client, info.operator, info.sensor_location)
    ):
        ev.project_info = ProjectInfo(
            project=db_row["project"],
            client=db_row["client"],
            operator=db_row["operator"],
            sensor_location=db_row["sensor_location"],
        )


def _reconcile_total_samples(ev):
    """Derive total_samples from rectime x sample_rate when the decoded value is implausible.

    The decoder's STRT-derived value can be a buffer offset rather than a
    sample count; anything outside [derived // 4, derived * 2] is replaced.
    """
    if not (ev.sample_rate and ev.rectime_seconds):
        return
    derived = int(round(ev.sample_rate * ev.rectime_seconds))
    if ev.total_samples is None or not (derived // 4 <= ev.total_samples <= derived * 2):
        ev.total_samples = derived


def _has_samples(ev):
    """True when at least one known channel decoded to non-empty samples."""
    raw = ev.raw_samples
    return bool(raw and any(raw.get(ch) for ch in _SAMPLE_CHANNELS))


def _build_sidecar(ev, existing, *, serial, path, bw_sha, source_kind, a5_filename):
    """Render the sidecar dict for *ev*, carrying over blocks a regen can't re-derive.

    review + extensions are user-edited; bw_report originates from the
    paired .TXT ASCII report parsed at ORIGINAL import time (ach forward /
    direct upload), and the .TXT is not in the waveform store. All three
    are copied from *existing*. event_to_sidecar_dict takes a BwAsciiReport
    dataclass (not a dict), so bw_report is overlaid after rendering.
    """
    existing = existing or {}
    sidecar = event_file_io.event_to_sidecar_dict(
        ev,
        serial=serial,
        blastware_filename=path.name,
        blastware_filesize=path.stat().st_size,
        blastware_sha256=bw_sha,
        source_kind=source_kind,
        a5_pickle_filename=a5_filename,
        review=existing.get("review"),
        extensions=existing.get("extensions"),
    )
    bw_report = existing.get("bw_report")
    if bw_report is not None:
        sidecar["bw_report"] = bw_report
    return sidecar


def _write_hdf5(hdf5_path, ev, *, serial, source_kind, label, dry_run):
    """Write (or, under --dry-run, pretend to write) the .h5; return the action label."""
    if dry_run:
        return "would (re)write"
    # Capture existence BEFORE writing — afterwards the file always exists.
    existed = hdf5_path.exists()
    try:
        event_hdf5.write_event_hdf5(
            hdf5_path, ev,
            serial=serial,
            geo_range="normal",
            source_kind=source_kind,
        )
    except Exception as exc:
        log.warning("HDF5 write failed for %s: %s", label, exc)
        return "FAILED"
    return "rewrote" if existed else "wrote"


def _upsert_db_row(db, ev, *, serial, path, a5_filename, sidecar_path):
    """Best-effort: keep the SQL row's sidecar_filename in sync.

    Goes through insert_events (which dedups on serial+ts). Failures are
    logged, not raised, because the sidecar on disk is already written.
    """
    try:
        waveform_records = None
        if ev._waveform_key:
            waveform_records = {
                ev._waveform_key.hex(): {
                    "filename": path.name,
                    "filesize": path.stat().st_size,
                    "a5_pickle_filename": a5_filename,
                    "sidecar_filename": sidecar_path.name,
                }
            }
        db.insert_events(
            [ev], serial=serial,
            waveform_records=waveform_records,
            device_family="series3",
        )
    except Exception as exc:
        log.warning("DB upsert failed for %s: %s", path.name, exc)


def _backfill_one(args, store, db, serial, serial_dir, path):
    """Bring one BW event file's sidecar (and .h5) up to date.

    Returns "written", "skipped" or "error" so main() can tally outcomes.
    """
    sidecar_path = store.sidecar_path_for(serial, path.name)
    try:
        bw_sha = event_file_io.file_sha256(path)
    except Exception as exc:
        log.error("sha256 failed for %s: %s", path, exc)
        return "error"

    # Two-part freshness check (sha match AND tool_version not older than
    # the current build); either failing → regenerate. --force bypasses it.
    existing = _read_existing_sidecar(sidecar_path)
    sidecar_fresh = False
    if existing is not None and not args.force:
        sha_ok, ver_ok, src_ver = _sidecar_freshness(existing, bw_sha)
        sidecar_fresh = sha_ok and ver_ok
        if sha_ok and not ver_ok:
            log.info(
                "regenerating %s (sidecar tool_version=%s < current %s)",
                sidecar_path.name, src_ver or "(none)",
                event_file_io.TOOL_VERSION,
            )

    try:
        hdf5_path = store.hdf5_path_for(serial, path.name)
        # A fresh sidecar is never rewritten. The only remaining reason to
        # touch the event is a missing .h5 (e.g. an earlier --skip-hdf5 run
        # or a failed write); in that case only the .h5 is produced.
        if sidecar_fresh and (args.skip_hdf5 or hdf5_path.exists()):
            return "skipped"
        h5_only = sidecar_fresh

        ev, source_kind, a5_filename = _decode_event(store, serial, serial_dir, path)
        db_row = _fetch_db_row(db, serial, path.name)
        if db_row is not None:
            _seed_from_db_row(ev, db_row)
        _reconcile_total_samples(ev)

        sidecar = None
        if not h5_only:
            sidecar = _build_sidecar(
                ev, existing,
                serial=serial, path=path, bw_sha=bw_sha,
                source_kind=source_kind, a5_filename=a5_filename,
            )

        # The .h5 and the sidecar come from the same decoder output, so the
        # .h5 is (re)written whenever we get this far. Empty samples mean
        # neither waveform_codec nor histogram_codec could decode the file
        # (truncated, malformed, unknown mode) — skip the write so a good
        # .h5 is never replaced with an empty placeholder.
        if args.skip_hdf5:
            hdf5_action = "kept"
        elif not _has_samples(ev):
            hdf5_action = "skipped-undecodable"
        else:
            hdf5_action = _write_hdf5(
                hdf5_path, ev,
                serial=serial, source_kind=source_kind,
                label=path.name, dry_run=args.dry_run,
            )

        if h5_only:
            # The .h5 was the only job here, so its outcome decides the tally.
            if hdf5_action == "skipped-undecodable":
                log.debug("no decodable samples for %s; .h5 not written", path.name)
                return "skipped"
            if hdf5_action == "FAILED":
                return "error"
            tag = "[DRY ]" if args.dry_run else "[OK ]"
            print(f" {tag} {path.name} -> {hdf5_path.name} "
                  f"(h5 {hdf5_action}; sidecar up to date) source={source_kind}")
            return "written"

        if args.dry_run:
            print(f" [DRY ] would write {sidecar_path.name} "
                  f"+ .h5 ({hdf5_action}) source={source_kind}")
            return "written"

        event_file_io.write_sidecar(sidecar_path, sidecar)
        _upsert_db_row(
            db, ev,
            serial=serial, path=path,
            a5_filename=a5_filename, sidecar_path=sidecar_path,
        )
        print(f" [OK ] {path.name} -> {sidecar_path.name} "
              f"+ h5 ({hdf5_action}) source={source_kind}")
        return "written"
    except Exception as exc:
        log.error("backfill failed for %s: %s", path, exc, exc_info=args.verbose)
        return "error"


def main(argv=None) -> int:
    """Backfill sidecars / .h5 files for every BW event in the waveform store.

    Returns the process exit code: 0 when every file succeeded, 1 when any
    file failed, 2 when the store root does not exist.
    """
    args = _parse_args(argv)
    logging.basicConfig(
        level=logging.DEBUG if args.verbose else logging.INFO,
        format="%(asctime)s %(levelname)-7s %(name)s %(message)s",
        datefmt="%H:%M:%S",
    )

    db_path = Path(args.db_path).expanduser().resolve()
    store_root = (
        Path(args.store_root).expanduser().resolve()
        if args.store_root else db_path.parent / "waveforms"
    )
    if not store_root.exists():
        print(f"error: store root does not exist: {store_root}", file=sys.stderr)
        return 2

    store = WaveformStore(store_root)
    db = SeismoDb(db_path)
    tally = {"written": 0, "skipped": 0, "error": 0}
    for serial_dir in sorted(d for d in store_root.iterdir() if d.is_dir()):
        for path in sorted(serial_dir.iterdir()):
            if _looks_like_event_file(path):
                outcome = _backfill_one(args, store, db, serial_dir.name, serial_dir, path)
                tally[outcome] += 1

    print(f"\nDone. written={tally['written']} skipped(uptodate)={tally['skipped']} "
          f"errors={tally['error']}")
    return 0 if tally["error"] == 0 else 1
if __name__ == "__main__":
sys.exit(main())