""" sfm/import_bw.py — CLI for ingesting Blastware-format event files. Walks a path (file or directory), parses each recognised event-file binary, copies it into the canonical waveform store, writes the .sfm.json sidecar, and upserts a row in seismo_relay.db. Use cases: - Migrating a Blastware ACH inbox into SFM - One-off imports of files emailed in by field crews - Bulk-loading historical archives Usage: python -m sfm.import_bw [--serial BE11529] [--db-path bridges/captures/seismo_relay.db] [--store-root bridges/captures/waveforms] [--dry-run] [-v] Examples: python -m sfm.import_bw ~/Downloads/M529LKIQ.7M0W python -m sfm.import_bw /path/to/blastware_archive --serial BE11529 """ from __future__ import annotations import argparse import logging import sys from pathlib import Path from typing import Iterator # Allow running from the repo root without installation. sys.path.insert(0, str(Path(__file__).resolve().parent.parent)) from sfm.database import SeismoDb from sfm.waveform_store import WaveformStore log = logging.getLogger("sfm.import_bw") # Blastware event-file extensions: 4-char `AB0T` (T = W or H) for ACH # downloads, 3-char `AB0` for direct downloads. We discover candidates # by length + last-char rather than enumerating every (A, B) pair. def _looks_like_bw_event(path: Path) -> bool: """Heuristic: 3-char or 4-char extension, ends with W/H/0, and the file is at least 70 bytes (header + STRT + footer minimum).""" if not path.is_file(): return False ext = path.suffix.lstrip(".") if not (3 <= len(ext) <= 4): return False if not (ext[-1].upper() in {"W", "H"} or ext.endswith("0")): return False try: return path.stat().st_size >= 70 except OSError: return False def _walk(path: Path) -> Iterator[Path]: """Yield candidate BW event-file paths under `path` (file or dir).""" if path.is_file(): if _looks_like_bw_event(path): yield path return if path.is_dir(): for p in sorted(path.rglob("*")): if _looks_like_bw_event(p): yield p def main(argv: list[str] | None = None) -> int: p = argparse.ArgumentParser( description="Import Blastware-format event files into the SFM store + DB.", ) p.add_argument("path", help="File or directory to import.") p.add_argument( "--serial", default=None, metavar="SERIAL", help="Override the serial-number hint (e.g. BE11529). Defaults to " "the value decoded from each BW filename's prefix.", ) p.add_argument( "--db-path", default=str(Path(__file__).resolve().parent.parent / "bridges" / "captures" / "seismo_relay.db"), help="Path to seismo_relay.db (default: bridges/captures/seismo_relay.db).", ) p.add_argument( "--store-root", default=None, help="Root of the waveform store (default: /waveforms).", ) p.add_argument( "--dry-run", action="store_true", help="Parse and report per-file outcomes; don't write anything.", ) p.add_argument("-v", "--verbose", action="store_true", help="Debug logging.") args = p.parse_args(argv) logging.basicConfig( level=logging.DEBUG if args.verbose else logging.INFO, format="%(asctime)s %(levelname)-7s %(name)s %(message)s", datefmt="%H:%M:%S", ) src = Path(args.path).expanduser().resolve() if not src.exists(): print(f"error: {src} does not exist", file=sys.stderr) return 2 db_path = Path(args.db_path).expanduser().resolve() store_root = ( Path(args.store_root).expanduser().resolve() if args.store_root else db_path.parent / "waveforms" ) db = None if args.dry_run else SeismoDb(db_path) store = None if args.dry_run else WaveformStore(store_root) candidates = list(_walk(src)) if not candidates: print(f"No BW event-file candidates found under {src}", file=sys.stderr) return 1 print(f"Importing {len(candidates)} file(s) from {src}...") if args.dry_run: print("(dry-run — no writes will occur)") ok = err = skipped = 0 for path in candidates: try: bw_bytes = path.read_bytes() except Exception as exc: print(f" [ERR ] {path}: read failed: {exc}") err += 1 continue if args.dry_run: # Just parse to verify integrity; don't touch DB or store. from minimateplus import event_file_io try: ev = event_file_io.read_blastware_file(path) ts = ev.timestamp and ( f"{ev.timestamp.year}-{ev.timestamp.month:02d}-{ev.timestamp.day:02d} " f"{ev.timestamp.hour:02d}:{ev.timestamp.minute:02d}:{ev.timestamp.second:02d}" ) or "?" pv = ev.peak_values pvs = pv.peak_vector_sum if pv and pv.peak_vector_sum is not None else 0.0 print(f" [OK ] {path.name} ts={ts} PVS={pvs:.4f}") ok += 1 except Exception as exc: print(f" [ERR ] {path}: parse failed: {exc}") err += 1 continue try: ev, rec = store.save_imported_bw( bw_bytes, source_path=path, serial_hint=args.serial, ) # Resolve serial for the DB row. Prefer the hint, then the # one decoded from the filename (already done by the store). serial_used = args.serial or _infer_serial(path.name) or "UNKNOWN" ins, sk = db.insert_events( [ev], serial=serial_used, waveform_records=( {ev._waveform_key.hex(): rec} if ev._waveform_key else None ), device_family="series3", ) tag = "OK " if ins else ("SKIP" if sk else "OK ") print(f" [{tag}] {path.name} → {rec['filename']} " f"({rec['filesize']} B, sha256={rec['sha256'][:12]}…) " f"serial={serial_used} ins={ins} skip={sk}") if ins: ok += 1 else: skipped += 1 except Exception as exc: print(f" [ERR ] {path}: import failed: {exc}") log.debug("traceback", exc_info=True) err += 1 print(f"\nDone. ok={ok} skipped={skipped} errors={err}") return 0 if err == 0 else 1 def _infer_serial(filename: str): """Reuse WaveformStore's filename → serial decoder for log output.""" from sfm.waveform_store import _serial_from_bw_filename return _serial_from_bw_filename(filename) if __name__ == "__main__": sys.exit(main())