Files
seismo-relay/sfm/waveform_store.py
T
serversdown ad2b553c7b ingest: preserve raw BW ASCII report (.TXT) alongside the binary
Previously the .TXT was parsed into the sidecar's bw_report projection
and then discarded at ingest time.  Now save_imported_bw() writes it
to <store>/<serial>/<filename>_ASCII.TXT permanently.

Rationale: with BW Mail / Forwarding Agent being phased out of the
operator workflow, the XML/PDF/WMF those tools produce won't be
available — the binary + .TXT (created by BW ACH itself) are our
only authoritative inputs going forward.  Keeping the raw .TXT
unlocks:

  - Parser bug fixes can be applied RETROACTIVELY by re-parsing the
    stored .TXT, instead of requiring a re-forward from the watcher
    PC (which lost the .TXT after BW ACH cleanup).
  - Audit trail of what BW actually sent us, for debugging.
  - The five known parser-PPV-miss events will be re-parseable once
    the regex fix lands (instead of staying broken indefinitely).

Storage cost: ~15 KB per event × 14k events = ~210 MB on the
existing prod corpus.  Negligible.

Implementation:
  - WaveformStore gains txt_path_for() + open_txt()
  - save_imported_bw() writes the .TXT when bw_report_text is supplied
  - sidecar source block records the txt_filename
  - backfill_sidecars.py preserves txt_filename across regens
  - New GET /db/events/{id}/ascii_report.txt endpoint serves it
  - Returns 404 for events ingested before this change (no .TXT in
    the store yet) — re-forward to populate

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
2026-05-27 20:01:12 +00:00

656 lines
27 KiB
Python

"""
sfm/waveform_store.py — On-disk store for Blastware-format event files.
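Layout (flat per-serial, four files per event, plus an optional fifth
for BW imports that arrive with a paired ASCII report):
<root>/<serial>/<filename> ← event file (BW-readable binary)
<root>/<serial>/<filename>.a5.pkl ← pickled {version, frames} payload of
A5 S3Frame dicts
<root>/<serial>/<filename>.h5 ← clean waveform arrays (HDF5)
<root>/<serial>/<filename>.sfm.json ← modern sidecar (peaks, project,
review state, extensions)
<root>/<serial>/<filename>_ASCII.TXT ← raw BW ASCII report, preserved when
supplied to save_imported_bw()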
`<filename>` is whatever `minimateplus.blastware_file.blastware_filename`
produces for the event. The extension is NOT a fixed type tag — it
encodes the event timestamp (`AB0T` format).
Roles:
- BW binary: what Blastware reads. Untouched. The user-facing review
waveform viewer.
- .a5.pkl: regenerative source. Lets the BW binary be rebuilt
byte-for-byte if the encoder changes. Never delete.
- .h5: clean per-channel waveform arrays in physical units (in/s for
geo, psi for mic) plus event metadata. Canonical format for
downstream analysis tools and the `/device/event/{idx}/waveform`
endpoint's plot-JSON output.
- .sfm.json: small, queryable metadata + review state. SQL
`events.false_trigger` is a derived index kept in sync via
`patch_sidecar()`.
"""
from __future__ import annotations
import datetime
import logging
import pickle
import shutil
from pathlib import Path
from typing import Optional, Union
from minimateplus import event_file_io
from minimateplus.blastware_file import blastware_filename, write_blastware_file
from minimateplus.framing import S3Frame
from minimateplus.models import Event
from sfm import event_hdf5
# Module-level logger shared by every function and method in this file.
log = logging.getLogger("sfm.waveform_store")
# Schema version stamped into the "version" field of each .a5.pkl payload by
# WaveformStore.save(). load_a5() does not currently inspect it.
A5_PICKLE_VERSION = 1
def _frame_to_dict(f: S3Frame) -> dict:
    """Flatten one A5 frame into a plain dict suitable for pickling.

    The frame's ``data`` is copied into an immutable ``bytes`` object; every
    other field is stored as-is.  Key order matches the original layout:
    sub, page_hi, page_lo, data, chk_byte, checksum_valid.
    """
    plain = {attr: getattr(f, attr) for attr in ("sub", "page_hi", "page_lo")}
    plain["data"] = bytes(f.data)
    plain["chk_byte"] = f.chk_byte
    plain["checksum_valid"] = f.checksum_valid
    return plain
def _dict_to_frame(d: dict) -> S3Frame:
    """Rebuild an S3Frame from a dict shaped like `_frame_to_dict`'s output.

    ``sub``, ``page_hi``, ``page_lo`` and ``data`` are required keys (a
    missing one raises KeyError).  ``checksum_valid`` falls back to True and
    ``chk_byte`` to 0 when absent.
    """
    # Required keys are looked up first, in the same order as the constructor
    # arguments, so a malformed dict fails on the same key as before.
    header = {key: d[key] for key in ("sub", "page_hi", "page_lo")}
    return S3Frame(
        **header,
        data=bytes(d["data"]),
        checksum_valid=d.get("checksum_valid", True),
        chk_byte=d.get("chk_byte", 0),
    )
class WaveformStore:
    """
    Persistent store for Blastware-format waveform files + their A5 source frames.

    Thread safety: write_blastware_file is single-shot; concurrent saves of the
    *same* filename would race, but the filename encodes second-resolution
    timestamps + serial, so collisions across threads/processes are vanishingly
    unlikely in practice.
    """

    def __init__(self, root: str | Path) -> None:
        """Open (and create if necessary) the store rooted at ``root``."""
        self.root = Path(root)
        self.root.mkdir(parents=True, exist_ok=True)
        log.info("WaveformStore root=%s", self.root)

    # ── path helpers ────────────────────────────────────────────────────────────

    def _serial_dir(self, serial: str) -> Path:
        """Return ``<root>/<serial>``, creating the directory if it is missing.

        Note that every path helper below goes through this method, so even
        read-only lookups create the per-serial directory as a side effect.
        """
        # NOTE(review): `serial` is joined onto the root unchecked; if it can
        # ever come from an untrusted request (e.g. a serial_hint containing
        # ".." or an absolute path) the result escapes the store — confirm
        # callers validate it.
        d = self.root / serial
        d.mkdir(parents=True, exist_ok=True)
        return d

    def paths_for(self, serial: str, filename: str) -> tuple[Path, Path]:
        """Return (blastware_path, a5_pickle_path) for a given serial+filename.

        For the sidecar path use `sidecar_path_for()` — kept separate so
        existing callers don't need to unpack a 3-tuple.
        """
        d = self._serial_dir(serial)
        return d / filename, d / f"{filename}.a5.pkl"

    def sidecar_path_for(self, serial: str, filename: str) -> Path:
        """Return absolute path to the .sfm.json sidecar for a given event."""
        return self._serial_dir(serial) / f"{filename}.sfm.json"

    def hdf5_path_for(self, serial: str, filename: str) -> Path:
        """Return absolute path to the .h5 clean-waveform file for a given event."""
        return self._serial_dir(serial) / f"{filename}.h5"

    def txt_path_for(self, serial: str, filename: str) -> Path:
        """Return absolute path to the preserved BW ASCII report (.TXT)
        for a given event.

        We name it ``<filename>_ASCII.TXT`` to match BW's own filename
        convention in the ACH folder.  Saved at ingest time alongside
        the binary so that parser bug fixes can be applied retroactively
        by re-parsing without needing to re-forward from the watcher PC.
        """
        return self._serial_dir(serial) / f"{filename}_ASCII.TXT"

    def open_blastware(self, serial: str, filename: str) -> Optional[Path]:
        """Return absolute path to an existing event file or None."""
        bw_path, _ = self.paths_for(serial, filename)
        return bw_path if bw_path.exists() else None

    def open_txt(self, serial: str, filename: str) -> Optional[Path]:
        """Return absolute path to the preserved BW ASCII report for an
        event, or None if the .TXT wasn't saved at ingest time (events
        ingested before .TXT preservation landed will show None until
        re-forwarded)."""
        p = self.txt_path_for(serial, filename)
        return p if p.exists() else None

    # ── private helpers shared by the save paths ────────────────────────────────

    @staticmethod
    def _unlink_if_present(path: Path) -> None:
        """Delete ``path``; a file that is already absent is not an error."""
        try:
            path.unlink()
        except FileNotFoundError:
            pass

    def _preserved_sidecar_blocks(
        self, sidecar_path: Path, caller: str
    ) -> tuple[Optional[dict], Optional[dict]]:
        """Return ``(review, extensions)`` from an existing sidecar.

        Both are None when there is no sidecar yet or it cannot be read; an
        unreadable sidecar is logged (prefixed with ``caller``) and will be
        overwritten by the save in progress.
        """
        if not sidecar_path.exists():
            return None, None
        try:
            old = event_file_io.read_sidecar(sidecar_path)
            return old.get("review"), old.get("extensions")
        except Exception as exc:
            log.warning(
                "%s: existing sidecar at %s unreadable (%s); overwriting",
                caller, sidecar_path, exc,
            )
            return None, None

    def _save_txt_report(
        self,
        serial: str,
        filename: str,
        bw_report_text: Union[str, bytes],
    ) -> Optional[str]:
        """Best-effort write of the raw BW ASCII report next to the binary.

        Returns the stored file's name, or None when the write failed (the
        failure is logged and never propagates).
        """
        try:
            txt_path = self.txt_path_for(serial, filename)
            if isinstance(bw_report_text, (bytes, bytearray)):
                txt_path.write_bytes(bw_report_text)
            else:
                # Explicit UTF-8 so a non-ASCII character can't fail under a
                # restrictive locale default; newline="" disables newline
                # translation so the stored text keeps its line endings.
                with txt_path.open("w", encoding="utf-8", newline="") as fp:
                    fp.write(bw_report_text)
            return txt_path.name
        except Exception as exc:
            log.warning(
                "save_imported_bw: failed to save TXT for %s: %s — "
                "continuing without it",
                filename, exc,
            )
            return None

    # ── save / load ─────────────────────────────────────────────────────────────

    def save(
        self,
        ev: Event,
        serial: str,
        a5_frames: list[S3Frame],
        *,
        source_kind: str = "sfm-live",
        geo_range: str = "normal",
    ) -> dict:
        """
        Write all four event-file artifacts for one event:
          - <filename>            BW binary
          - <filename>.a5.pkl     raw A5 frame pickle
          - <filename>.h5         clean waveform (HDF5)
          - <filename>.sfm.json   modern sidecar (metadata + review)

        Returns a record dict suitable for persisting alongside the DB row:
            {
                "filename": "M529LKIQ.7M0W",
                "filesize": 8708,
                "sha256": "a1b2c3...",
                "a5_pickle_filename": "M529LKIQ.7M0W.a5.pkl",
                "hdf5_filename": "M529LKIQ.7M0W.h5",
                "sidecar_filename": "M529LKIQ.7M0W.sfm.json",
            }
        ``hdf5_filename`` is None when the HDF5 write failed.

        `source_kind` flows into `sidecar.source.kind` — callers should
        pass "sfm-live" (default) for the live endpoint and "sfm-ach" for
        the ACH ingestion path.  BW-imported events use save_imported_bw()
        instead.

        `geo_range` controls the ADC-counts → in/s scaling in the HDF5
        file ("normal" = 10 in/s FS, "sensitive" = 1.25 in/s FS).
        Defaults to "normal" — callers with compliance-config access
        should pass the actual unit setting so the saved samples are in
        the right units.

        Idempotent: if the event file already exists, it is overwritten
        with the freshly-encoded version (same bytes for the same
        a5_frames) and the sidecar's review block is preserved across
        re-saves.

        Raises ValueError when ``a5_frames`` is empty or ``serial`` is falsy.
        """
        if not a5_frames:
            raise ValueError("WaveformStore.save: a5_frames is empty")
        if not serial:
            raise ValueError("WaveformStore.save: serial is required")

        filename = blastware_filename(ev, serial)
        bw_path, a5_path = self.paths_for(serial, filename)
        sidecar_path = self.sidecar_path_for(serial, filename)
        hdf5_path = self.hdf5_path_for(serial, filename)

        # 1. encode the event file (defensive unlink prevents trailing-byte
        #    leaks from a previous larger file on synced/odd filesystems).
        self._unlink_if_present(bw_path)
        write_blastware_file(ev, a5_frames, bw_path)
        filesize = bw_path.stat().st_size
        sha256 = event_file_io.file_sha256(bw_path)

        # 2. write the .a5.pkl sidecar
        self._unlink_if_present(a5_path)
        payload = {
            "version": A5_PICKLE_VERSION,
            "frames": [_frame_to_dict(f) for f in a5_frames],
        }
        with a5_path.open("wb") as fp:
            pickle.dump(payload, fp, protocol=pickle.HIGHEST_PROTOCOL)

        # 3. write the .h5 clean-waveform file (samples in physical units).
        #    Best-effort: a write failure shouldn't sink the rest of the save
        #    (the HDF5 can be regenerated later from the .a5.pkl).
        hdf5_filename: Optional[str] = None
        try:
            event_hdf5.write_event_hdf5(
                hdf5_path, ev,
                serial=serial,
                geo_range=geo_range,
                source_kind=source_kind,
            )
            hdf5_filename = hdf5_path.name
        except Exception as exc:
            log.warning(
                "save: HDF5 write failed for %s: %s — continuing without .h5",
                hdf5_path, exc,
            )

        # 4. write the .sfm.json sidecar.  Preserve any existing review
        #    block + extensions across re-saves so user edits aren't lost
        #    when the same event is re-downloaded (e.g. via Force refresh).
        existing_review, existing_extensions = self._preserved_sidecar_blocks(
            sidecar_path, "save"
        )
        sidecar = event_file_io.event_to_sidecar_dict(
            ev,
            serial=serial,
            blastware_filename=filename,
            blastware_filesize=filesize,
            blastware_sha256=sha256,
            source_kind=source_kind,
            a5_pickle_filename=a5_path.name,
            review=existing_review,
            extensions=existing_extensions,
        )
        event_file_io.write_sidecar(sidecar_path, sidecar)

        log.info(
            "WaveformStore.save serial=%s filename=%s filesize=%d frames=%d "
            "h5=%s sidecar=%s",
            serial, filename, filesize, len(a5_frames),
            hdf5_filename or "(skipped)", sidecar_path.name,
        )
        return {
            "filename": filename,
            "filesize": filesize,
            "sha256": sha256,
            "a5_pickle_filename": a5_path.name,
            "hdf5_filename": hdf5_filename,
            "sidecar_filename": sidecar_path.name,
        }

    def save_imported_bw(
        self,
        bw_bytes: bytes,
        source_path: Path,
        *,
        serial_hint: Optional[str] = None,
        bw_report_text: Optional[Union[str, bytes]] = None,
    ) -> tuple[Event, dict]:
        """
        Ingest a Blastware event file produced by an external tool
        (Blastware's own ACH, manual download, etc.) where the source A5
        frames aren't available.

        Workflow:
          1. Parse the bytes via event_file_io.read_blastware_file (writes
             a temp file to do that, since the parser takes a path).
          2. Optionally parse a paired BW ASCII event report (the .TXT
             file BW writes alongside the binary).  When supplied, its
             decoded fields land in the sidecar's `bw_report` block AND
             overlay the device-authoritative peak values into the
             top-level `peak_values` block.  This is the right path for
             the ACH-forwarder daemon use case where Blastware's own
             ACH writes both files into the watch folder.
          3. Resolve serial from BW filename (`<P><serial3>...`) or use
             serial_hint.  Falls back to "UNKNOWN".
          4. Copy the BW bytes verbatim into <root>/<serial>/<filename>,
             and the raw report (when supplied) into
             <root>/<serial>/<filename>_ASCII.TXT.
          5. Write the .sfm.json sidecar with source.kind = "bw-import"
             and a5_pickle_filename = None.  Does NOT write a .a5.pkl
             (no A5 source available; byte-for-byte regeneration not
             possible — the on-disk BW file IS the byte-for-byte source).
             An existing sidecar's review + extensions blocks are carried
             over so a re-import doesn't discard user edits.

        Returns (event, record_dict) so callers can both insert into
        SeismoDb and surface the parsed Event.
        """
        # Stash the bytes to a temp path so read_blastware_file (path-based)
        # can parse without us duplicating its logic.  The try/finally wraps
        # the write as well, so a failed write can't leak the temp file.
        import tempfile

        tmp_path: Optional[Path] = None
        try:
            with tempfile.NamedTemporaryFile(suffix=".bw", delete=False) as tmp:
                tmp_path = Path(tmp.name)
                tmp.write(bw_bytes)
            ev = event_file_io.read_blastware_file(tmp_path)
        finally:
            if tmp_path is not None:
                self._unlink_if_present(tmp_path)

        # read_blastware_file derives record_type from its path arg, but
        # that arg is the tmp file (suffix ".bw") — so override with the
        # original filename's encoded type (H/W/M/E/C in the BW AB0T
        # scheme).  Without this override every BW-imported event lands
        # in the DB with record_type="Waveform" regardless of the actual
        # type (Histogram, Manual, etc.).
        ev.record_type = event_file_io.derive_record_type_from_filename(
            source_path.name
        )

        # Parse the BW ASCII report if one was supplied.  Failures here
        # are non-fatal: we still write the binary + sidecar without the
        # rich derived fields.
        bw_report = None
        if bw_report_text is not None:
            try:
                from minimateplus.bw_ascii_report import parse_report
                bw_report = parse_report(bw_report_text)
            except Exception as exc:
                log.warning(
                    "save_imported_bw: BW report parse failed: %s — continuing without it",
                    exc,
                )

        # If we have a report, overlay its device-authoritative fields
        # (peaks, project, sample_rate, record_time) onto the Event
        # BEFORE handing it to db.insert_events().  Without this overlay
        # the DB row gets `peak_values` from _peaks_from_samples(), which
        # runs the still-undecoded waveform codec on the BW body and
        # produces ±10 in/s saturation values on every channel for every
        # event.  The sidecar JSON had the correct values via
        # event_to_sidecar_dict(bw_report=...) but the DB columns didn't.
        if bw_report is not None:
            try:
                event_file_io.apply_report_to_event(ev, bw_report)
            except Exception as exc:
                log.warning(
                    "save_imported_bw: failed to overlay report onto event: %s",
                    exc,
                )

        # Resolve serial.  blastware_filename derives a 4-char prefix from
        # the numeric serial (e.g. BE11529 → M529); we go the other way
        # via the source filename if a hint wasn't given.
        serial = serial_hint or _serial_from_bw_filename(source_path.name) or "UNKNOWN"

        # Use the source filename verbatim — it already encodes timestamp
        # + record type per BW's AB0T scheme, and we want to preserve it
        # so the file BW knows about can be opened back in BW.
        filename = source_path.name
        bw_path = self._serial_dir(serial) / filename

        # 1. copy bytes
        bw_path.write_bytes(bw_bytes)
        filesize = bw_path.stat().st_size
        sha256 = event_file_io.file_sha256(bw_path)

        # 1b. preserve the raw BW ASCII report (.TXT) alongside the binary.
        #     Saved at <root>/<serial>/<filename>_ASCII.TXT.  Lets us re-parse
        #     offline after parser fixes without needing to re-forward from
        #     the watcher PC.  Negligible storage cost (~15 KB per event).
        #     Skipped silently when no report was supplied (live download path,
        #     manual upload without paired TXT).
        txt_filename: Optional[str] = None
        if bw_report_text is not None:
            txt_filename = self._save_txt_report(serial, filename, bw_report_text)

        # 2. write the .h5 clean-waveform file from the parsed Event.
        #    Note: peaks here are computed from raw samples (the BW file
        #    doesn't carry the device-authoritative 0C peaks).  Best-effort.
        hdf5_path = self.hdf5_path_for(serial, filename)
        hdf5_filename: Optional[str] = None
        try:
            event_hdf5.write_event_hdf5(
                hdf5_path, ev,
                serial=serial,
                geo_range="normal",  # BW file doesn't carry the range; assume Normal
                source_kind="bw-import",
            )
            hdf5_filename = hdf5_path.name
        except Exception as exc:
            log.warning(
                "save_imported_bw: HDF5 write failed for %s: %s — continuing",
                hdf5_path, exc,
            )

        # 3. write sidecar with source.kind = bw-import, carrying over the
        #    user-editable blocks the same way save() does.
        sidecar_path = self.sidecar_path_for(serial, filename)
        existing_review, existing_extensions = self._preserved_sidecar_blocks(
            sidecar_path, "save_imported_bw"
        )
        sidecar = event_file_io.event_to_sidecar_dict(
            ev,
            serial=serial,
            blastware_filename=filename,
            blastware_filesize=filesize,
            blastware_sha256=sha256,
            source_kind="bw-import",
            a5_pickle_filename=None,
            txt_filename=txt_filename,
            review=existing_review,
            extensions=existing_extensions,
            bw_report=bw_report,
        )
        event_file_io.write_sidecar(sidecar_path, sidecar)

        log.info(
            "WaveformStore.save_imported_bw serial=%s filename=%s filesize=%d "
            "h5=%s (no .a5.pkl — A5 source unavailable for BW-imported files)",
            serial, filename, filesize, hdf5_filename or "(skipped)",
        )
        return ev, {
            "filename": filename,
            "filesize": filesize,
            "sha256": sha256,
            "a5_pickle_filename": None,
            "hdf5_filename": hdf5_filename,
            "sidecar_filename": sidecar_path.name,
            "serial": serial,
        }

    def save_imported_idf(
        self,
        idf_bytes: bytes,
        source_path: Path,
        *,
        serial_hint: Optional[str] = None,
        idf_report_text: Optional[Union[str, bytes]] = None,
    ) -> tuple[Optional["Event"], dict]:
        """
        Ingest a Thor (Micromate Series IV) IDF event file (`.IDFW` or
        `.IDFH`) produced by Thor's TXT exporter.

        Thor binaries are stored as opaque bytes — seismo-relay doesn't
        yet decode the proprietary IDF binary format (codec slot lives
        at ``micromate/idf_file.py``).  Device-authoritative metadata
        comes from the paired ``.IDFW.txt`` / ``.IDFH.txt`` sidecar
        when supplied.

        Workflow:
          1. Parse the paired TXT report (when supplied) via
             ``micromate.parse_idf_report`` → dict.
          2. Wrap parsed dict + filename into a typed ``micromate.IdfEvent``.
          3. Copy bytes verbatim into ``<root>/<serial>/<filename>``.
          4. Bridge IdfEvent → ``minimateplus.Event`` (for the existing
             sidecar / DB insert machinery) via
             ``IdfEvent.to_minimateplus_event(waveform_key)``.
          5. Write the ``.sfm.json`` sidecar with
             ``source.kind = "idf-import"`` and the full raw IDF report
             under ``extensions.idf_report``.  An existing sidecar's
             review + extensions blocks are carried over.

        Returns ``(event, record_dict)`` so the endpoint can both insert
        into SeismoDb and surface the parsed event.
        """
        from micromate import IdfEvent, parse_idf_report

        # Parse the .txt sidecar (best-effort; non-fatal on failure).
        report_dict: dict = {}
        if idf_report_text is not None:
            try:
                report_dict = parse_idf_report(idf_report_text)
            except Exception as exc:
                log.warning(
                    "save_imported_idf: report parse failed: %s — continuing without it",
                    exc,
                )

        # Build the typed IdfEvent.  Filename is authoritative for
        # (serial, timestamp, kind); the report's event_datetime takes
        # precedence over the filename timestamp inside from_report().
        idf_event = IdfEvent.from_report(report_dict, source_path.name)

        # Operator-supplied serial_hint wins over the binary's filename
        # prefix when both are present (e.g. callers passing a known-good
        # serial that overrides a misnamed export).
        serial = serial_hint or idf_event.serial or "UNKNOWN"

        # Filesystem write.
        filename = source_path.name
        bw_path = self._serial_dir(serial) / filename
        bw_path.write_bytes(idf_bytes)
        filesize = bw_path.stat().st_size
        sha256 = event_file_io.file_sha256(bw_path)

        # _waveform_key dedups (serial, timestamp) rows in the events
        # table.  Use the binary's sha256 (first 16 bytes) as a stable
        # surrogate — every distinct binary maps to a distinct row.
        waveform_key = bytes.fromhex(sha256)[:16]

        # Bridge to minimateplus.Event for the existing sidecar / DB
        # insert paths.  See IdfEvent.to_minimateplus_event() for the
        # caveats of this bridge (mic units, missing fields → sidecar).
        ev = idf_event.to_minimateplus_event(waveform_key)

        # Write the sidecar.  Source kind "idf-import" was added to the
        # allow-list in event_file_io.event_to_sidecar_dict for this.
        sidecar_path = self.sidecar_path_for(serial, filename)
        existing_review, existing_extensions = self._preserved_sidecar_blocks(
            sidecar_path, "save_imported_idf"
        )
        sidecar = event_file_io.event_to_sidecar_dict(
            ev,
            serial=serial,
            blastware_filename=filename,
            blastware_filesize=filesize,
            blastware_sha256=sha256,
            source_kind="idf-import",
            a5_pickle_filename=None,
            review=existing_review,
            extensions=existing_extensions,
        )

        # Stash the full parsed IDF report under extensions so downstream
        # consumers can recover the rich derived fields that don't fit
        # the BW-shaped event model (Peak Acceleration / Displacement,
        # Time of Peak, sensor self-check, calibration, firmware).
        if report_dict:
            sidecar["extensions"]["idf_report"] = report_dict
        event_file_io.write_sidecar(sidecar_path, sidecar)

        log.info(
            "WaveformStore.save_imported_idf serial=%s filename=%s filesize=%d "
            "report_attached=%s",
            serial, filename, filesize, bool(report_dict),
        )
        return ev, {
            "filename": filename,
            "filesize": filesize,
            "sha256": sha256,
            "a5_pickle_filename": None,
            "hdf5_filename": None,
            "sidecar_filename": sidecar_path.name,
            "serial": serial,
        }

    def load_a5(self, serial: str, filename: str) -> Optional[list[S3Frame]]:
        """
        Re-hydrate the pickled A5 frame stream for a stored event.

        Returns None if the .a5.pkl file is missing, or if its payload is
        not a dict containing a "frames" entry (logged as a warning).
        """
        _, a5_path = self.paths_for(serial, filename)
        if not a5_path.exists():
            return None
        # SECURITY: pickle.load executes arbitrary code embedded in the file.
        # This is only safe while every .a5.pkl under the store root was
        # written by save(); never point the store at untrusted files.
        with a5_path.open("rb") as fp:
            payload = pickle.load(fp)
        if not isinstance(payload, dict) or "frames" not in payload:
            log.warning("WaveformStore.load_a5: malformed sidecar at %s", a5_path)
            return None
        return [_dict_to_frame(d) for d in payload["frames"]]

    # ── modern .sfm.json sidecar accessors ──────────────────────────────────────

    def load_sidecar(self, serial: str, filename: str) -> Optional[dict]:
        """Return the parsed .sfm.json sidecar dict, or None if it is missing
        or cannot be read (read failures are logged)."""
        path = self.sidecar_path_for(serial, filename)
        if not path.exists():
            return None
        try:
            return event_file_io.read_sidecar(path)
        except Exception as exc:
            log.warning("load_sidecar: failed to read %s: %s", path, exc)
            return None

    def patch_sidecar(
        self,
        serial: str,
        filename: str,
        *,
        review: Optional[dict] = None,
        extensions: Optional[dict] = None,
        reviewer_now: bool = True,
    ) -> Optional[dict]:
        """
        JSON-merge-patch the .sfm.json sidecar's review/extensions blocks.

        Delegates to ``event_file_io.patch_sidecar``.  Returns the new full
        dict, or None if the sidecar doesn't exist.
        """
        path = self.sidecar_path_for(serial, filename)
        if not path.exists():
            return None
        return event_file_io.patch_sidecar(
            path,
            review=review,
            extensions=extensions,
            reviewer_now=reviewer_now,
        )
# ── helpers ─────────────────────────────────────────────────────────────────────
def _serial_from_bw_filename(name: str) -> Optional[str]:
    """
    Reverse of `blastware_filename`'s serial-prefix encoding.

    BW filename format (V10.72): `<P><serial3><stem4>.<ext>`
      where P       = chr(ord('B') + floor(serial // 1000))
      and   serial3 = f"{serial % 1000:03d}".

    Examples (from CLAUDE.md verification archive):
      P036... → BE14036        H907... → BE6907
      M529... → BE11529        T003... → BE18003

    The prefix letter is matched case-insensitively.  Returns the inferred
    BE-prefix serial (e.g. "BE11529") or None when the filename doesn't
    match the expected pattern.  Never raises on malformed names.
    """
    if not name:
        return None

    # First letter encodes the thousands group; next 3 chars encode the
    # last 3 digits of the serial.
    base = name.split(".", 1)[0]
    if len(base) < 4:
        return None
    first, digits = base[0], base[1:4]

    # Require ASCII explicitly: str.isalpha()/str.isdigit() also accept
    # non-ASCII characters ("é", "ß", "²"), which previously produced bogus
    # serials or raised from ord()/int() instead of returning None.
    if not (first.isascii() and first.isalpha()):
        return None
    if not (digits.isascii() and digits.isdigit()):
        return None

    prefix_letter = first.upper()
    if prefix_letter < "B":
        # "A" would decode to a negative thousands group.
        return None

    thousands = ord(prefix_letter) - ord("B")
    serial_num = thousands * 1000 + int(digits)
    return f"BE{serial_num}"