feat(import): v0.16.0 - Fully implemented and stabilized the series 3 BW-ACH pipeline. #19

Merged
serversdown merged 9 commits from ach-report-ingestion into main 2026-05-11 15:55:24 -04:00
4 changed files with 169 additions and 1 deletions
Showing only changes of commit 082e5946bc - Show all commits
+151
View File
@@ -0,0 +1,151 @@
"""
scripts/repair_unknown_serials.py — re-attribute events stuck under
`serial = 'UNKNOWN'` to their correct serial by decoding the BW filename.
Why this is needed
──────────────────
The /db/import/blastware_file endpoint had a bug (fixed in commit a032fa5+1
on the ach-report-ingestion branch) where every forwarded event was inserted
with serial='UNKNOWN' because the endpoint's `_serial_from_event(ev)` stub
returned None and never consulted the BW-filename serial that
`WaveformStore.save_imported_bw()` had already decoded.
Effect on a server that ran a buggy version: every forwarded event's
SeismoDb row has `serial='UNKNOWN'`, even though the on-disk waveform
store has correctly bucketed the files into `BE<NNNN>/` folders. So
the BW binaries / sidecars / HDF5s are fine, but `/db/units` and
`/db/events?serial=...` queries don't surface the events.
This script
───────────
Walks the events table looking for rows with `serial='UNKNOWN'` and
re-attributes each one to the serial decoded from its
`blastware_filename` column. If the row's serial would collide with
an existing row (already-correct duplicate from a later re-forward),
the UNKNOWN row is deleted. Otherwise the row's `serial` column is
updated in-place.
Idempotent: re-running after a successful repair finds zero matching
rows and exits cleanly.
Usage
─────
# Dry-run (default): print what would change, don't touch the DB
python -m scripts.repair_unknown_serials --db bridges/captures/seismo_relay.db
# Apply the repair
python -m scripts.repair_unknown_serials --db bridges/captures/seismo_relay.db --apply
"""
from __future__ import annotations
import argparse
import sqlite3
import sys
from pathlib import Path
# Reach into sfm.waveform_store for the serial decoder. This script
# is run from the repo root via `python -m scripts.repair_unknown_serials`.
sys.path.insert(0, str(Path(__file__).resolve().parent.parent))
from sfm.waveform_store import _serial_from_bw_filename
def main(argv: list[str] | None = None) -> int:
p = argparse.ArgumentParser(
description="Re-attribute events stuck under serial='UNKNOWN'.",
)
p.add_argument(
"--db", required=True, type=Path,
help="Path to seismo_relay.db (e.g. bridges/captures/seismo_relay.db)",
)
p.add_argument(
"--apply", action="store_true",
help="Apply the repair. Without this flag the script runs in "
"dry-run mode and only reports what would change.",
)
args = p.parse_args(argv)
if not args.db.exists():
print(f"DB not found: {args.db}", file=sys.stderr)
return 2
conn = sqlite3.connect(str(args.db))
conn.row_factory = sqlite3.Row
rows = list(conn.execute(
"SELECT id, serial, timestamp, blastware_filename "
" FROM events "
" WHERE serial = 'UNKNOWN' "
" ORDER BY timestamp",
))
print(f"Found {len(rows)} UNKNOWN-serial rows in events table.")
if not rows:
return 0
updated = 0
deleted = 0
unresolved = 0
by_serial: dict[str, int] = {}
for row in rows:
rid = row["id"]
ts = row["timestamp"]
bw_name = row["blastware_filename"]
new_serial = _serial_from_bw_filename(bw_name) if bw_name else None
if not new_serial:
print(f" ⚠ id={rid[:8]} ts={ts} filename={bw_name!r}"
f"cannot decode serial from filename; skipping")
unresolved += 1
continue
# Check for an existing row at the target (serial, timestamp).
existing = conn.execute(
"SELECT id FROM events WHERE serial = ? AND timestamp = ?",
(new_serial, ts),
).fetchone()
action: str
if existing is None:
# Safe to UPDATE in place.
if args.apply:
conn.execute(
"UPDATE events SET serial = ? WHERE id = ?",
(new_serial, rid),
)
action = "UPDATE"
updated += 1
else:
# A correctly-attributed row already exists. Drop the
# UNKNOWN duplicate.
if args.apply:
conn.execute("DELETE FROM events WHERE id = ?", (rid,))
action = "DELETE (dup)"
deleted += 1
by_serial[new_serial] = by_serial.get(new_serial, 0) + 1
print(f" {action:14s} id={rid[:8]} ts={ts} "
f"filename={bw_name}{new_serial}")
if args.apply:
conn.commit()
conn.close()
print()
print(f"Summary:")
print(f" UNKNOWN rows scanned: {len(rows)}")
print(f" Updated to real serial: {updated}")
print(f" Deleted (duplicate of an ")
print(f" already-correct row): {deleted}")
print(f" Unresolved (bad filename): {unresolved}")
print()
if by_serial:
print(f"Per-serial breakdown of repaired rows:")
for serial, count in sorted(by_serial.items()):
print(f" {serial:12s} {count}")
if not args.apply:
print()
print("(dry-run — re-run with --apply to commit)")
return 0
# Script entry point: run the repair and hand main()'s return value to the
# interpreter as the process exit status.
if __name__ == "__main__":
    raise SystemExit(main())
+13 -1
View File
@@ -1673,9 +1673,20 @@ async def db_import_blastware_file(
serial_hint=serial, serial_hint=serial,
bw_report_text=report_bytes, bw_report_text=report_bytes,
) )
# WaveformStore decoded the serial from the BW filename
# (e.g. T104… → BE18104) and surfaces it on `rec`. Use that
# rather than the placeholder `_serial_from_event(ev)` stub,
# which always returned None and was silently bucketing every
# forwarded event into serial="UNKNOWN" in the DB.
resolved_serial = (
serial
or rec.get("serial")
or _serial_from_event(ev)
or "UNKNOWN"
)
inserted, skipped = db.insert_events( inserted, skipped = db.insert_events(
[ev], [ev],
serial=(serial or _serial_from_event(ev) or "UNKNOWN"), serial=resolved_serial,
waveform_records={ waveform_records={
ev._waveform_key.hex(): rec ev._waveform_key.hex(): rec
if ev._waveform_key else None if ev._waveform_key else None
@@ -1687,6 +1698,7 @@ async def db_import_blastware_file(
"stored_filename": rec["filename"], "stored_filename": rec["filename"],
"filesize": rec["filesize"], "filesize": rec["filesize"],
"sha256": rec["sha256"], "sha256": rec["sha256"],
"serial": resolved_serial,
"report_attached": report_bytes is not None, "report_attached": report_bytes is not None,
"inserted": inserted, "inserted": inserted,
"skipped": skipped, "skipped": skipped,
+1
View File
@@ -383,6 +383,7 @@ class WaveformStore:
"a5_pickle_filename": None, "a5_pickle_filename": None,
"hdf5_filename": hdf5_filename, "hdf5_filename": hdf5_filename,
"sidecar_filename": sidecar_path.name, "sidecar_filename": sidecar_path.name,
"serial": serial,
} }
def load_a5(self, serial: str, filename: str) -> Optional[list[S3Frame]]: def load_a5(self, serial: str, filename: str) -> Optional[list[S3Frame]]:
+4
View File
@@ -418,6 +418,10 @@ def test_save_imported_bw_round_trip(tmp_path: Path):
assert rec["filename"] == fname assert rec["filename"] == fname
assert rec["a5_pickle_filename"] is None # no A5 source for BW imports assert rec["a5_pickle_filename"] is None # no A5 source for BW imports
# The serial decoded from the BW filename surfaces on the record so
# the import endpoint can use it when calling SeismoDb.insert_events()
# (otherwise forwarded events would all bucket into serial="UNKNOWN").
assert rec["serial"] == "BE11529"
sc = store.load_sidecar("BE11529", fname) sc = store.load_sidecar("BE11529", fname)
assert sc is not None assert sc is not None
assert sc["source"]["kind"] == "bw-import" assert sc["source"]["kind"] == "bw-import"