196 lines
6.9 KiB
Python
196 lines
6.9 KiB
Python
"""
|
|
sfm/import_bw.py — CLI for ingesting Blastware-format event files.
|
|
|
|
Walks a path (file or directory), parses each recognised event-file
|
|
binary, copies it into the canonical waveform store, writes the
|
|
.sfm.json sidecar, and upserts a row in seismo_relay.db.
|
|
|
|
Use cases:
|
|
- Migrating a Blastware ACH inbox into SFM
|
|
- One-off imports of files emailed in by field crews
|
|
- Bulk-loading historical archives
|
|
|
|
Usage:
|
|
python -m sfm.import_bw <path-or-dir> [--serial BE11529]
|
|
[--db-path bridges/captures/seismo_relay.db]
|
|
[--store-root bridges/captures/waveforms]
|
|
[--dry-run]
|
|
[-v]
|
|
|
|
Examples:
|
|
python -m sfm.import_bw ~/Downloads/M529LKIQ.7M0W
|
|
python -m sfm.import_bw /path/to/blastware_archive --serial BE11529
|
|
"""
|
|
|
|
from __future__ import annotations
|
|
|
|
import argparse
|
|
import logging
|
|
import sys
|
|
from pathlib import Path
|
|
from typing import Iterator
|
|
|
|
# Allow running from the repo root without installation.
|
|
sys.path.insert(0, str(Path(__file__).resolve().parent.parent))
|
|
|
|
from sfm.database import SeismoDb
|
|
from sfm.waveform_store import WaveformStore
|
|
|
|
log = logging.getLogger("sfm.import_bw")
|
|
|
|
|
|
# Blastware event-file extensions: 4-char `AB0T` (T = W or H) for ACH
|
|
# downloads, 3-char `AB0` for direct downloads. We discover candidates
|
|
# by length + last-char rather than enumerating every (A, B) pair.
|
|
def _looks_like_bw_event(path: Path) -> bool:
|
|
"""Heuristic: 3-char or 4-char extension, ends with W/H/0, and the
|
|
file is at least 70 bytes (header + STRT + footer minimum)."""
|
|
if not path.is_file():
|
|
return False
|
|
ext = path.suffix.lstrip(".")
|
|
if not (3 <= len(ext) <= 4):
|
|
return False
|
|
if not (ext[-1].upper() in {"W", "H"} or ext.endswith("0")):
|
|
return False
|
|
try:
|
|
return path.stat().st_size >= 70
|
|
except OSError:
|
|
return False
|
|
|
|
|
|
def _walk(path: Path) -> Iterator[Path]:
|
|
"""Yield candidate BW event-file paths under `path` (file or dir)."""
|
|
if path.is_file():
|
|
if _looks_like_bw_event(path):
|
|
yield path
|
|
return
|
|
if path.is_dir():
|
|
for p in sorted(path.rglob("*")):
|
|
if _looks_like_bw_event(p):
|
|
yield p
|
|
|
|
|
|
def main(argv: list[str] | None = None) -> int:
|
|
p = argparse.ArgumentParser(
|
|
description="Import Blastware-format event files into the SFM store + DB.",
|
|
)
|
|
p.add_argument("path", help="File or directory to import.")
|
|
p.add_argument(
|
|
"--serial", default=None, metavar="SERIAL",
|
|
help="Override the serial-number hint (e.g. BE11529). Defaults to "
|
|
"the value decoded from each BW filename's prefix.",
|
|
)
|
|
p.add_argument(
|
|
"--db-path",
|
|
default=str(Path(__file__).resolve().parent.parent / "bridges" / "captures" / "seismo_relay.db"),
|
|
help="Path to seismo_relay.db (default: bridges/captures/seismo_relay.db).",
|
|
)
|
|
p.add_argument(
|
|
"--store-root",
|
|
default=None,
|
|
help="Root of the waveform store (default: <db_dir>/waveforms).",
|
|
)
|
|
p.add_argument(
|
|
"--dry-run", action="store_true",
|
|
help="Parse and report per-file outcomes; don't write anything.",
|
|
)
|
|
p.add_argument("-v", "--verbose", action="store_true", help="Debug logging.")
|
|
args = p.parse_args(argv)
|
|
|
|
logging.basicConfig(
|
|
level=logging.DEBUG if args.verbose else logging.INFO,
|
|
format="%(asctime)s %(levelname)-7s %(name)s %(message)s",
|
|
datefmt="%H:%M:%S",
|
|
)
|
|
|
|
src = Path(args.path).expanduser().resolve()
|
|
if not src.exists():
|
|
print(f"error: {src} does not exist", file=sys.stderr)
|
|
return 2
|
|
|
|
db_path = Path(args.db_path).expanduser().resolve()
|
|
store_root = (
|
|
Path(args.store_root).expanduser().resolve()
|
|
if args.store_root else db_path.parent / "waveforms"
|
|
)
|
|
|
|
db = None if args.dry_run else SeismoDb(db_path)
|
|
store = None if args.dry_run else WaveformStore(store_root)
|
|
|
|
candidates = list(_walk(src))
|
|
if not candidates:
|
|
print(f"No BW event-file candidates found under {src}", file=sys.stderr)
|
|
return 1
|
|
|
|
print(f"Importing {len(candidates)} file(s) from {src}...")
|
|
if args.dry_run:
|
|
print("(dry-run — no writes will occur)")
|
|
|
|
ok = err = skipped = 0
|
|
for path in candidates:
|
|
try:
|
|
bw_bytes = path.read_bytes()
|
|
except Exception as exc:
|
|
print(f" [ERR ] {path}: read failed: {exc}")
|
|
err += 1
|
|
continue
|
|
|
|
if args.dry_run:
|
|
# Just parse to verify integrity; don't touch DB or store.
|
|
from minimateplus import event_file_io
|
|
try:
|
|
ev = event_file_io.read_blastware_file(path)
|
|
ts = ev.timestamp and (
|
|
f"{ev.timestamp.year}-{ev.timestamp.month:02d}-{ev.timestamp.day:02d} "
|
|
f"{ev.timestamp.hour:02d}:{ev.timestamp.minute:02d}:{ev.timestamp.second:02d}"
|
|
) or "?"
|
|
pv = ev.peak_values
|
|
pvs = pv.peak_vector_sum if pv and pv.peak_vector_sum is not None else 0.0
|
|
print(f" [OK ] {path.name} ts={ts} PVS={pvs:.4f}")
|
|
ok += 1
|
|
except Exception as exc:
|
|
print(f" [ERR ] {path}: parse failed: {exc}")
|
|
err += 1
|
|
continue
|
|
|
|
try:
|
|
ev, rec = store.save_imported_bw(
|
|
bw_bytes, source_path=path, serial_hint=args.serial,
|
|
)
|
|
# Resolve serial for the DB row. Prefer the hint, then the
|
|
# one decoded from the filename (already done by the store).
|
|
serial_used = args.serial or _infer_serial(path.name) or "UNKNOWN"
|
|
ins, sk = db.insert_events(
|
|
[ev], serial=serial_used,
|
|
waveform_records=(
|
|
{ev._waveform_key.hex(): rec}
|
|
if ev._waveform_key else None
|
|
),
|
|
device_family="series3",
|
|
)
|
|
tag = "OK " if ins else ("SKIP" if sk else "OK ")
|
|
print(f" [{tag}] {path.name} → {rec['filename']} "
|
|
f"({rec['filesize']} B, sha256={rec['sha256'][:12]}…) "
|
|
f"serial={serial_used} ins={ins} skip={sk}")
|
|
if ins:
|
|
ok += 1
|
|
else:
|
|
skipped += 1
|
|
except Exception as exc:
|
|
print(f" [ERR ] {path}: import failed: {exc}")
|
|
log.debug("traceback", exc_info=True)
|
|
err += 1
|
|
|
|
print(f"\nDone. ok={ok} skipped={skipped} errors={err}")
|
|
return 0 if err == 0 else 1
|
|
|
|
|
|
def _infer_serial(filename: str):
|
|
"""Reuse WaveformStore's filename → serial decoder for log output."""
|
|
from sfm.waveform_store import _serial_from_bw_filename
|
|
return _serial_from_bw_filename(filename)
|
|
|
|
|
|
if __name__ == "__main__":
|
|
sys.exit(main())
|