Files
seismo-relay/scripts/repair_unknown_serials.py
serversdown 082e5946bc fix(import): resolve real serial from BW filename instead of bucketing to UNKNOWN
The /db/import/blastware_file endpoint was bucketing every
forwarded event into serial='UNKNOWN' in the DB.  WaveformStore
correctly decoded the serial from the BW filename and saved
files to <store>/<serial>/<filename> (e.g.
.../BE17353/S353L5KC.DR0H.h5), but the endpoint code called
db.insert_events(serial=_serial_from_event(ev)) — and
_serial_from_event was a stub that always returned None,
falling back to "UNKNOWN".

Effect on the user's prod server: 3,039 events forwarded across
24 distinct units, ALL inserted under serial='UNKNOWN'.  The
on-disk waveform store + sidecars + HDF5s were fine, but the
SFM webapp's /db/units only showed the two original manually-
uploaded serials because every forwarded row had its serial
column zeroed to UNKNOWN.

Fix:
  - WaveformStore.save_imported_bw() now surfaces the decoded
    serial on the returned `rec` dict (rec["serial"]).
  - The import endpoint uses rec["serial"] as the authoritative
    fallback when the operator hasn't supplied a serial_hint query
    parameter.  Order of precedence:
      query string `serial` → rec["serial"] → _serial_from_event(ev) → "UNKNOWN"
  - Response payload now includes `serial` per file so the watcher
    log lines (or any future caller) can see which unit each event
    was attributed to.

Recovery for existing DB rows:
  scripts/repair_unknown_serials.py walks the events table looking
  for rows with serial='UNKNOWN' and re-attributes each one to the
  serial decoded from blastware_filename.  Updates the row in place
  unless the target (serial, timestamp) already has a row, in which
  case the UNKNOWN duplicate is deleted.  Idempotent.  Default
  dry-run; pass --apply to commit.

  Verified on the user's actual DB (dry-run):
    UNKNOWN rows scanned:       3039
    Updated to real serial:     2602
    Deleted (duplicate of an
     already-correct row):      437
    Unresolved (bad filename):  0

After running the repair, /db/units will show all 24 units
correctly populated.
2026-05-11 02:25:08 +00:00

152 lines
5.2 KiB
Python

"""
scripts/repair_unknown_serials.py — re-attribute events stuck under
`serial = 'UNKNOWN'` to their correct serial by decoding the BW filename.
Why this is needed
──────────────────
The /db/import/blastware_file endpoint had a bug (fixed in commit a032fa5+1
on the ach-report-ingestion branch) where every forwarded event was inserted
with serial='UNKNOWN' because the endpoint's `_serial_from_event(ev)` stub
returned None and never consulted the BW-filename serial that
`WaveformStore.save_imported_bw()` had already decoded.
Effect on a server that ran a buggy version: every forwarded event's
SeismoDb row has `serial='UNKNOWN'`, even though the on-disk waveform
store has correctly bucketed the files into `BE<NNNN>/` folders. So
the BW binaries / sidecars / HDF5s are fine, but `/db/units` and
`/db/events?serial=...` queries don't surface the events.
This script
───────────
Walks the events table looking for rows with `serial='UNKNOWN'` and
re-attributes each one to the serial decoded from its
`blastware_filename` column. If the row's serial would collide with
an existing row (already-correct duplicate from a later re-forward),
the UNKNOWN row is deleted. Otherwise the row's `serial` column is
updated in-place.
Idempotent: re-running after a successful repair finds zero matching
rows and exits cleanly.
Usage
─────
# Dry-run (default): print what would change, don't touch the DB
python -m scripts.repair_unknown_serials --db bridges/captures/seismo_relay.db
# Apply the repair
python -m scripts.repair_unknown_serials --db bridges/captures/seismo_relay.db --apply
"""
from __future__ import annotations
import argparse
import sqlite3
import sys
from pathlib import Path
# Reach into sfm.waveform_store for the serial decoder. This script
# is run from the repo root via `python -m scripts.repair_unknown_serials`.
sys.path.insert(0, str(Path(__file__).resolve().parent.parent))
from sfm.waveform_store import _serial_from_bw_filename
def main(argv: list[str] | None = None) -> int:
p = argparse.ArgumentParser(
description="Re-attribute events stuck under serial='UNKNOWN'.",
)
p.add_argument(
"--db", required=True, type=Path,
help="Path to seismo_relay.db (e.g. bridges/captures/seismo_relay.db)",
)
p.add_argument(
"--apply", action="store_true",
help="Apply the repair. Without this flag the script runs in "
"dry-run mode and only reports what would change.",
)
args = p.parse_args(argv)
if not args.db.exists():
print(f"DB not found: {args.db}", file=sys.stderr)
return 2
conn = sqlite3.connect(str(args.db))
conn.row_factory = sqlite3.Row
rows = list(conn.execute(
"SELECT id, serial, timestamp, blastware_filename "
" FROM events "
" WHERE serial = 'UNKNOWN' "
" ORDER BY timestamp",
))
print(f"Found {len(rows)} UNKNOWN-serial rows in events table.")
if not rows:
return 0
updated = 0
deleted = 0
unresolved = 0
by_serial: dict[str, int] = {}
for row in rows:
rid = row["id"]
ts = row["timestamp"]
bw_name = row["blastware_filename"]
new_serial = _serial_from_bw_filename(bw_name) if bw_name else None
if not new_serial:
print(f" ⚠ id={rid[:8]} ts={ts} filename={bw_name!r}"
f"cannot decode serial from filename; skipping")
unresolved += 1
continue
# Check for an existing row at the target (serial, timestamp).
existing = conn.execute(
"SELECT id FROM events WHERE serial = ? AND timestamp = ?",
(new_serial, ts),
).fetchone()
action: str
if existing is None:
# Safe to UPDATE in place.
if args.apply:
conn.execute(
"UPDATE events SET serial = ? WHERE id = ?",
(new_serial, rid),
)
action = "UPDATE"
updated += 1
else:
# A correctly-attributed row already exists. Drop the
# UNKNOWN duplicate.
if args.apply:
conn.execute("DELETE FROM events WHERE id = ?", (rid,))
action = "DELETE (dup)"
deleted += 1
by_serial[new_serial] = by_serial.get(new_serial, 0) + 1
print(f" {action:14s} id={rid[:8]} ts={ts} "
f"filename={bw_name}{new_serial}")
if args.apply:
conn.commit()
conn.close()
print()
print(f"Summary:")
print(f" UNKNOWN rows scanned: {len(rows)}")
print(f" Updated to real serial: {updated}")
print(f" Deleted (duplicate of an ")
print(f" already-correct row): {deleted}")
print(f" Unresolved (bad filename): {unresolved}")
print()
if by_serial:
print(f"Per-serial breakdown of repaired rows:")
for serial, count in sorted(by_serial.items()):
print(f" {serial:12s} {count}")
if not args.apply:
print()
print("(dry-run — re-run with --apply to commit)")
return 0
if __name__ == "__main__":
sys.exit(main())