Files
seismo-relay/scripts/check_bw_report_preservation.py
serversdown ed6982c512 scripts: bw_report preservation check for backfill safety
Two-step tool to verify that backfill_sidecars doesn't wipe the
bw_report block from existing sidecars.  Workflow:

  1. snapshot --out before.json    (canonical-JSON hash per sidecar)
  2. run backfill
  3. diff --baseline before.json   (classifies every sidecar:
       PRESERVED / CHANGED / WIPED / STILL_MISSING / NEW / ADDED / REMOVED)

Exit code 1 if any WIPED or CHANGED entries found, 0 otherwise — so
it can gate a CI step or a deploy script.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
2026-05-21 06:13:52 +00:00

186 lines
6.6 KiB
Python

"""
scripts/check_bw_report_preservation.py — verify that running backfill_sidecars
doesn't wipe the `bw_report` block from sidecars that already had one.
Two-step workflow:

    # Before running backfill — capture a baseline snapshot:
    python scripts/check_bw_report_preservation.py snapshot \
        --store-root /path/to/waveforms \
        --out before.json

    # Run backfill:
    python scripts/backfill_sidecars.py --store-root /path/to/waveforms --force

    # After backfill — diff against the baseline:
    python scripts/check_bw_report_preservation.py diff \
        --store-root /path/to/waveforms \
        --baseline before.json

The diff classifies every sidecar into one of:

    PRESERVED      had bw_report before, has same hash now       ← GOOD
    CHANGED        had bw_report before, has different hash now  ← suspicious
                   (backfill should only ever copy the block verbatim)
    WIPED          had bw_report before, doesn't now             ← BUG — data loss
    STILL_MISSING  didn't have bw_report before, still doesn't   ← expected
    NEW            didn't have bw_report before, has one now
                   (only possible if a re-ingest happened between snapshots;
                   shouldn't happen during backfill)
    REMOVED        sidecar existed in baseline, file is gone now
    ADDED          sidecar didn't exist in baseline, exists now

Exit code is 0 if no WIPED or CHANGED entries are found, 1 otherwise
(2 if the store root or the baseline file is missing).
"""
from __future__ import annotations
import argparse
import hashlib
import json
import sys
from pathlib import Path
from typing import Optional
# Allow running from the repo root without installation.
sys.path.insert(0, str(Path(__file__).resolve().parent.parent))
from minimateplus import event_file_io
def _bw_report_hash(sidecar_data: dict) -> Optional[str]:
    """Return the SHA-256 hex digest of the sidecar's ``bw_report`` block.

    The block is serialised as compact JSON with sorted keys before hashing,
    so two blocks with the same content produce the same digest regardless of
    the order in which their keys were stored.

    Returns None when the ``bw_report`` key is missing or its value is falsy
    (for example ``None`` or an empty dict).
    """
    report = sidecar_data.get("bw_report")
    if report:
        # Canonical form: sorted keys, no insignificant whitespace.
        canonical = json.dumps(report, separators=(",", ":"), sort_keys=True)
        digest = hashlib.sha256()
        digest.update(canonical.encode())
        return digest.hexdigest()
    return None
def _scan_store(store_root: Path) -> dict:
    """Map every ``<serial>/<file>.sfm.json`` sidecar to its bw_report hash.

    Each immediate subdirectory of ``store_root`` is visited in sorted order,
    and within it every file matching ``*.sfm.json``, also sorted.  Keys are
    ``"<serial>/<filename>"`` so they do not depend on where the store is
    mounted; values are the result of ``_bw_report_hash`` (a hex digest, or
    None when the sidecar has no bw_report block).

    A sidecar that ``event_file_io.read_sidecar`` fails to load is reported on
    stderr and left out of the result rather than aborting the scan.
    """
    hashes: dict[str, Optional[str]] = {}
    serial_dirs = [entry for entry in store_root.iterdir() if entry.is_dir()]
    serial_dirs.sort()
    for directory in serial_dirs:
        for path in sorted(directory.glob("*.sfm.json")):
            key = f"{directory.name}/{path.name}"
            try:
                payload = event_file_io.read_sidecar(path)
            except Exception as exc:
                # Best effort: one unreadable file must not stop the scan.
                print(f" WARN: failed to read {key}: {exc}", file=sys.stderr)
            else:
                hashes[key] = _bw_report_hash(payload)
    return hashes
def cmd_snapshot(args) -> int:
    """Scan the store and write a baseline of bw_report hashes to ``args.out``.

    Args:
        args: parsed CLI namespace providing ``store_root`` (directory to
            scan) and ``out`` (path of the JSON baseline to write).

    Returns:
        0 on success, 2 if the store root does not exist or is not a
        directory.

    The baseline JSON holds the resolved store root, the total sidecar count,
    the count of sidecars that have a bw_report, and the per-sidecar hash map
    under the ``"sidecars"`` key.  Parent directories of ``out`` are created
    as needed.
    """
    store_root = Path(args.store_root).expanduser().resolve()
    if not store_root.exists():
        print(f"error: store root does not exist: {store_root}", file=sys.stderr)
        return 2
    if not store_root.is_dir():
        # exists() is also true for a regular file; without this guard the
        # scan's iterdir() would fail with a NotADirectoryError traceback.
        print(f"error: store root is not a directory: {store_root}", file=sys.stderr)
        return 2
    out_path = Path(args.out).expanduser().resolve()

    print(f"Scanning {store_root}")
    snapshot = _scan_store(store_root)
    with_bw = sum(1 for v in snapshot.values() if v is not None)
    # Every entry is either a digest or None, so the remainder lacks bw_report.
    without_bw = len(snapshot) - with_bw
    print(f" total sidecars: {len(snapshot)}")
    print(f" with bw_report: {with_bw}")
    print(f" without bw_report: {without_bw}")

    out_path.parent.mkdir(parents=True, exist_ok=True)
    with open(out_path, "w", encoding="utf-8") as f:
        json.dump(
            {
                "store_root": str(store_root),
                "total": len(snapshot),
                "with_bw": with_bw,
                "sidecars": snapshot,
            },
            f,
            indent=2,
            sort_keys=True,
        )
    print(f"Wrote baseline → {out_path}")
    return 0
def cmd_diff(args) -> int:
    """Compare the current store against a baseline written by ``snapshot``.

    Args:
        args: parsed CLI namespace providing ``store_root`` (directory to
            scan) and ``baseline`` (path of the JSON baseline file).

    Returns:
        0 when no sidecar is WIPED or CHANGED, 1 when at least one is, and 2
        when the inputs cannot be used: store root missing or not a
        directory, baseline missing, unreadable, not valid JSON, or lacking a
        ``"sidecars"`` mapping.

    Every sidecar seen in either the baseline or the current scan is placed
    in exactly one class (PRESERVED, CHANGED, WIPED, STILL_MISSING, NEW,
    REMOVED, ADDED).  A count table is printed, followed by up to ten sample
    paths for each of WIPED and CHANGED.

    NOTE: the scan leaves out sidecars it fails to read, so a sidecar that
    was in the baseline but can no longer be read is reported as REMOVED.
    """
    store_root = Path(args.store_root).expanduser().resolve()
    if not store_root.exists():
        print(f"error: store root does not exist: {store_root}", file=sys.stderr)
        return 2
    if not store_root.is_dir():
        # exists() is also true for a regular file; without this guard the
        # scan's iterdir() would fail with a NotADirectoryError traceback.
        print(f"error: store root is not a directory: {store_root}", file=sys.stderr)
        return 2

    baseline_path = Path(args.baseline).expanduser().resolve()
    if not baseline_path.exists():
        print(f"error: baseline file not found: {baseline_path}", file=sys.stderr)
        return 2
    # An uncaught exception would exit with status 1, which is
    # indistinguishable from "preservation broken"; report bad input as 2.
    try:
        with open(baseline_path, encoding="utf-8") as f:
            baseline = json.load(f)
    except (OSError, ValueError) as exc:  # JSONDecodeError is a ValueError
        print(f"error: cannot read baseline {baseline_path}: {exc}", file=sys.stderr)
        return 2
    before = baseline.get("sidecars") if isinstance(baseline, dict) else None
    if not isinstance(before, dict):
        print(
            f"error: baseline has no 'sidecars' mapping: {baseline_path}",
            file=sys.stderr,
        )
        return 2

    print(f"Scanning {store_root} for comparison against {baseline_path.name}")
    after = _scan_store(store_root)

    classes: dict[str, list[str]] = {
        k: []
        for k in (
            "PRESERVED", "CHANGED", "WIPED", "STILL_MISSING", "NEW", "REMOVED", "ADDED",
        )
    }
    # Private sentinel: None is a legitimate value ("no bw_report"), so it
    # cannot double as the "key not present" marker.
    missing = object()
    for key in sorted(set(before) | set(after)):
        b = before.get(key, missing)
        a = after.get(key, missing)
        if b is missing:
            label = "ADDED"
        elif a is missing:
            label = "REMOVED"
        elif b is None:
            label = "STILL_MISSING" if a is None else "NEW"
        elif a is None:
            label = "WIPED"
        elif a == b:
            label = "PRESERVED"
        else:
            label = "CHANGED"
        classes[label].append(key)

    print()
    print(f"{'class':16s} {'count':>7s}")
    print("-" * 24)
    for k in ("PRESERVED", "STILL_MISSING", "CHANGED", "WIPED",
              "NEW", "ADDED", "REMOVED"):
        print(f"{k:16s} {len(classes[k]):>7d}")

    # Show samples of the concerning classes only.
    for k in ("WIPED", "CHANGED"):
        if classes[k]:
            print(f"\n=== {k} samples (up to 10) ===")
            for key in classes[k][:10]:
                print(f" {key}")

    if classes["WIPED"] or classes["CHANGED"]:
        print("\n*** Preservation broken: WIPED or CHANGED entries present ***")
        return 1
    print("\nbw_report preservation looks intact.")
    return 0
def main(argv=None) -> int:
    """Parse the command line and dispatch to the selected subcommand.

    Args:
        argv: argument list to parse; passed straight to
            ``ArgumentParser.parse_args`` (None means argparse uses the
            process arguments).

    Returns:
        The integer exit code returned by the subcommand handler
        (``cmd_snapshot`` or ``cmd_diff``).
    """
    parser = argparse.ArgumentParser(
        description=__doc__,
        # The module docstring contains a hand-formatted workflow and a
        # classification table; the default formatter would re-wrap them
        # into a single paragraph in --help output.
        formatter_class=argparse.RawDescriptionHelpFormatter,
    )
    sub = parser.add_subparsers(dest="cmd", required=True)

    p_snap = sub.add_parser("snapshot", help="capture baseline bw_report hashes")
    p_snap.add_argument("--store-root", required=True)
    p_snap.add_argument("--out", required=True, help="output JSON path")
    p_snap.set_defaults(func=cmd_snapshot)

    p_diff = sub.add_parser("diff", help="diff current store against a baseline")
    p_diff.add_argument("--store-root", required=True)
    p_diff.add_argument("--baseline", required=True, help="JSON from `snapshot`")
    p_diff.set_defaults(func=cmd_diff)

    args = parser.parse_args(argv)
    # Each subparser stored its handler under ``func`` via set_defaults.
    return args.func(args)
# Script entry point: main()'s return value becomes the process exit status.
if __name__ == "__main__":
    raise SystemExit(main())