From 082e5946bc21818a9dd016852657ce16f79de66f Mon Sep 17 00:00:00 2001 From: serversdown Date: Mon, 11 May 2026 02:25:08 +0000 Subject: [PATCH] fix(import): resolve real serial from BW filename instead of bucketing to UNKNOWN MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit The /db/import/blastware_file endpoint was bucketing every forwarded event into serial='UNKNOWN' in the DB. WaveformStore correctly decoded the serial from the BW filename and saved files to <root>/<serial>/<filename> (e.g. .../BE17353/S353L5KC.DR0H.h5), but the endpoint code called db.insert_events(serial=_serial_from_event(ev)) — and _serial_from_event was a stub that always returned None, falling back to "UNKNOWN". Effect on the user's prod server: 3,039 events forwarded across 24 distinct units, ALL inserted under serial='UNKNOWN'. The on-disk waveform store + sidecars + HDF5s were fine, but the SFM webapp's /db/units only showed the two original manually-uploaded serials because every forwarded row had its serial column zeroed to UNKNOWN. Fix: - WaveformStore.save_imported_bw() now surfaces the decoded serial on the returned `rec` dict (rec["serial"]). - The import endpoint uses rec["serial"] as the authoritative fallback when the operator hasn't supplied a serial_hint query parameter. Order of precedence: query string `serial` → rec["serial"] → _serial_from_event(ev) → "UNKNOWN" - Response payload now includes `serial` per file so the watcher log lines (or any future caller) can see which unit each event was attributed to. Recovery for existing DB rows: scripts/repair_unknown_serials.py walks the events table looking for rows with serial='UNKNOWN' and re-attributes each one to the serial decoded from blastware_filename. Updates the row in place unless the target (serial, timestamp) already has a row, in which case the UNKNOWN duplicate is deleted. Idempotent. Default dry-run; pass --apply to commit. 
Verified on the user's actual DB (dry-run): UNKNOWN rows scanned: 3039 Updated to real serial: 2602 Deleted (duplicate of an already-correct row): 437 Unresolved (bad filename): 0 After running the repair, /db/units will show all 24 units correctly populated. --- scripts/repair_unknown_serials.py | 151 ++++++++++++++++++++++++++++++ sfm/server.py | 14 ++- sfm/waveform_store.py | 1 + tests/test_event_file_io.py | 4 + 4 files changed, 169 insertions(+), 1 deletion(-) create mode 100644 scripts/repair_unknown_serials.py diff --git a/scripts/repair_unknown_serials.py b/scripts/repair_unknown_serials.py new file mode 100644 index 0000000..132bab9 --- /dev/null +++ b/scripts/repair_unknown_serials.py @@ -0,0 +1,151 @@ +""" +scripts/repair_unknown_serials.py — re-attribute events stuck under +`serial = 'UNKNOWN'` to their correct serial by decoding the BW filename. + +Why this is needed +────────────────── +The /db/import/blastware_file endpoint had a bug (fixed in commit a032fa5+1 +on the ach-report-ingestion branch) where every forwarded event was inserted +with serial='UNKNOWN' because the endpoint's `_serial_from_event(ev)` stub +returned None and never consulted the BW-filename serial that +`WaveformStore.save_imported_bw()` had already decoded. + +Effect on a server that ran a buggy version: every forwarded event's +SeismoDb row has `serial='UNKNOWN'`, even though the on-disk waveform +store has correctly bucketed the files into `BE<NNNNN>/` folders. So +the BW binaries / sidecars / HDF5s are fine, but `/db/units` and +`/db/events?serial=...` queries don't surface the events. + +This script +─────────── +Walks the events table looking for rows with `serial='UNKNOWN'` and +re-attributes each one to the serial decoded from its +`blastware_filename` column. If the row's serial would collide with +an existing row (already-correct duplicate from a later re-forward), +the UNKNOWN row is deleted. Otherwise the row's `serial` column is +updated in-place. 
+ +Idempotent: re-running after a successful repair finds zero matching +rows and exits cleanly. + +Usage +───── + # Dry-run (default): print what would change, don't touch the DB + python -m scripts.repair_unknown_serials --db bridges/captures/seismo_relay.db + + # Apply the repair + python -m scripts.repair_unknown_serials --db bridges/captures/seismo_relay.db --apply +""" + +from __future__ import annotations + +import argparse +import sqlite3 +import sys +from pathlib import Path + +# Reach into sfm.waveform_store for the serial decoder. This script +# is run from the repo root via `python -m scripts.repair_unknown_serials`. +sys.path.insert(0, str(Path(__file__).resolve().parent.parent)) +from sfm.waveform_store import _serial_from_bw_filename + + +def main(argv: list[str] | None = None) -> int: + p = argparse.ArgumentParser( + description="Re-attribute events stuck under serial='UNKNOWN'.", + ) + p.add_argument( + "--db", required=True, type=Path, + help="Path to seismo_relay.db (e.g. bridges/captures/seismo_relay.db)", + ) + p.add_argument( + "--apply", action="store_true", + help="Apply the repair. 
Without this flag the script runs in " + "dry-run mode and only reports what would change.", + ) + args = p.parse_args(argv) + + if not args.db.exists(): + print(f"DB not found: {args.db}", file=sys.stderr) + return 2 + + conn = sqlite3.connect(str(args.db)) + conn.row_factory = sqlite3.Row + + rows = list(conn.execute( + "SELECT id, serial, timestamp, blastware_filename " + " FROM events " + " WHERE serial = 'UNKNOWN' " + " ORDER BY timestamp", + )) + print(f"Found {len(rows)} UNKNOWN-serial rows in events table.") + if not rows: + return 0 + + updated = 0 + deleted = 0 + unresolved = 0 + by_serial: dict[str, int] = {} + + for row in rows: + rid = row["id"] + ts = row["timestamp"] + bw_name = row["blastware_filename"] + new_serial = _serial_from_bw_filename(bw_name) if bw_name else None + if not new_serial: + print(f" ⚠ id={rid[:8]} ts={ts} filename={bw_name!r} — " + f"cannot decode serial from filename; skipping") + unresolved += 1 + continue + + # Check for an existing row at the target (serial, timestamp). + existing = conn.execute( + "SELECT id FROM events WHERE serial = ? AND timestamp = ?", + (new_serial, ts), + ).fetchone() + action: str + if existing is None: + # Safe to UPDATE in place. + if args.apply: + conn.execute( + "UPDATE events SET serial = ? WHERE id = ?", + (new_serial, rid), + ) + action = "UPDATE" + updated += 1 + else: + # A correctly-attributed row already exists. Drop the + # UNKNOWN duplicate. 
+ if args.apply: + conn.execute("DELETE FROM events WHERE id = ?", (rid,)) + action = "DELETE (dup)" + deleted += 1 + + by_serial[new_serial] = by_serial.get(new_serial, 0) + 1 + print(f" {action:14s} id={rid[:8]} ts={ts} " + f"filename={bw_name} → {new_serial}") + + if args.apply: + conn.commit() + conn.close() + + print() + print(f"Summary:") + print(f" UNKNOWN rows scanned: {len(rows)}") + print(f" Updated to real serial: {updated}") + print(f" Deleted (duplicate of an ") + print(f" already-correct row): {deleted}") + print(f" Unresolved (bad filename): {unresolved}") + print() + if by_serial: + print(f"Per-serial breakdown of repaired rows:") + for serial, count in sorted(by_serial.items()): + print(f" {serial:12s} {count}") + if not args.apply: + print() + print("(dry-run — re-run with --apply to commit)") + return 0 + + +if __name__ == "__main__": + sys.exit(main()) diff --git a/sfm/server.py b/sfm/server.py index 08c4b4b..cd2cf80 100644 --- a/sfm/server.py +++ b/sfm/server.py @@ -1673,9 +1673,20 @@ async def db_import_blastware_file( serial_hint=serial, bw_report_text=report_bytes, ) + # WaveformStore decoded the serial from the BW filename + # (e.g. T104… → BE18104) and surfaces it on `rec`. Use that + # rather than the placeholder `_serial_from_event(ev)` stub, + # which always returned None and was silently bucketing every + # forwarded event into serial="UNKNOWN" in the DB. 
+ resolved_serial = ( + serial + or rec.get("serial") + or _serial_from_event(ev) + or "UNKNOWN" + ) inserted, skipped = db.insert_events( [ev], - serial=(serial or _serial_from_event(ev) or "UNKNOWN"), + serial=resolved_serial, waveform_records={ ev._waveform_key.hex(): rec if ev._waveform_key else None @@ -1687,6 +1698,7 @@ async def db_import_blastware_file( "stored_filename": rec["filename"], "filesize": rec["filesize"], "sha256": rec["sha256"], + "serial": resolved_serial, "report_attached": report_bytes is not None, "inserted": inserted, "skipped": skipped, diff --git a/sfm/waveform_store.py b/sfm/waveform_store.py index 92b1572..10b3d23 100644 --- a/sfm/waveform_store.py +++ b/sfm/waveform_store.py @@ -383,6 +383,7 @@ class WaveformStore: "a5_pickle_filename": None, "hdf5_filename": hdf5_filename, "sidecar_filename": sidecar_path.name, + "serial": serial, } def load_a5(self, serial: str, filename: str) -> Optional[list[S3Frame]]: diff --git a/tests/test_event_file_io.py b/tests/test_event_file_io.py index fbd2eca..a1990f0 100644 --- a/tests/test_event_file_io.py +++ b/tests/test_event_file_io.py @@ -418,6 +418,10 @@ def test_save_imported_bw_round_trip(tmp_path: Path): assert rec["filename"] == fname assert rec["a5_pickle_filename"] is None # no A5 source for BW imports + # The serial decoded from the BW filename surfaces on the record so + # the import endpoint can use it when calling SeismoDb.insert_events() + # (otherwise forwarded events would all bucket into serial="UNKNOWN"). + assert rec["serial"] == "BE11529" sc = store.load_sidecar("BE11529", fname) assert sc is not None assert sc["source"]["kind"] == "bw-import"