aac1c8e06d
The BW ACH ingest path was inserting every event with
record_type="Waveform" regardless of the actual type because
read_blastware_file() had `ev.record_type = "Waveform"` hardcoded, and
the live watcher-forward path parses files from a tmp path (suffix
".bw") that doesn't carry the original extension.
V10.72+ MiniMate Plus firmware encodes the event type as the last
character of the file extension (AB0T scheme: H=Histogram, W=Waveform,
M=Manual, E=Event, C=Combo). This change:
1. Adds derive_record_type_from_filename() public helper in
minimateplus/event_file_io.py
2. Uses it inside read_blastware_file() so direct callers (the
--dry-run path of scripts/import_bw.py, tests, ad-hoc scripts)
get correct types automatically
3. Overrides ev.record_type in WaveformStore.save_imported_bw()
using the ORIGINAL filename (source_path.name) — required
because the parser sees only the tmp file
Old S338 firmware (3-char extensions ending in `0`) and any
unrecognized suffix fall back to "Waveform".
Existing DB rows ingested before this fix are stuck with
record_type="Waveform" — a one-off SQL backfill would fix them
retroactively if desired. Terra-view's event modal also derives the
record type client-side from the filename, so the UI already shows the
correct type for old events even without the backfill.
Version bumped to 0.16.1 in pyproject.toml, event_file_io.py
TOOL_VERSION, sfm/server.py FastAPI version, and CHANGELOG.md.
Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>
498 lines
20 KiB
Python
498 lines
20 KiB
Python
"""
sfm/waveform_store.py — On-disk store for Blastware-format event files.

Layout (flat per-serial, up to four files per event):

    <root>/<serial>/<filename>            ← event file (BW-readable binary)
    <root>/<serial>/<filename>.a5.pkl     ← pickled list of A5 S3Frame dicts
                                            (absent for BW-imported events)
    <root>/<serial>/<filename>.h5         ← clean waveform arrays (HDF5)
    <root>/<serial>/<filename>.sfm.json   ← modern sidecar (peaks, project,
                                            review state, extensions)

`<filename>` is whatever `minimateplus.blastware_file.blastware_filename`
produces for the event (BW-imported events keep their original filename).
The extension is NOT a fixed type tag — it encodes the event timestamp
(`AB0T` format), and on V10.72+ firmware its last character also encodes
the record type (H=Histogram, W=Waveform, M=Manual, E=Event, C=Combo).

Roles:

  - BW binary: what Blastware (the user-facing review / waveform viewer)
    reads. Stored untouched.
  - .a5.pkl: regenerative source. Lets the BW binary be rebuilt
    byte-for-byte if the encoder changes. Never delete.
  - .h5: clean per-channel waveform arrays in physical units (in/s for
    geo, psi for mic) plus event metadata. Canonical format for
    downstream analysis tools and the `/device/event/{idx}/waveform`
    endpoint's plot-JSON output. Written best-effort, so it may be
    missing if the HDF5 write failed.
  - .sfm.json: small, queryable metadata + review state. SQL
    `events.false_trigger` is a derived index kept in sync via
    `patch_sidecar()`.
"""

from __future__ import annotations

import contextlib
import datetime
import logging
import pickle
import shutil
from pathlib import Path
from typing import Optional, Union

from minimateplus import event_file_io
from minimateplus.blastware_file import blastware_filename, write_blastware_file
from minimateplus.framing import S3Frame
from minimateplus.models import Event
from sfm import event_hdf5

# Module logger, registered under an explicit name rather than __name__.
log = logging.getLogger("sfm.waveform_store")

# Schema version stamped into the payload of every .a5.pkl file that
# WaveformStore.save() writes.
A5_PICKLE_VERSION: int = 1


def _frame_to_dict(f: S3Frame) -> dict:
|
|
return {
|
|
"sub": f.sub,
|
|
"page_hi": f.page_hi,
|
|
"page_lo": f.page_lo,
|
|
"data": bytes(f.data),
|
|
"chk_byte": f.chk_byte,
|
|
"checksum_valid": f.checksum_valid,
|
|
}
|
|
|
|
|
|
def _dict_to_frame(d: dict) -> S3Frame:
    """Rebuild an S3Frame from a dict in the `_frame_to_dict` layout.

    `sub`, `page_hi`, `page_lo` and `data` are required (KeyError if
    absent). `checksum_valid` and `chk_byte` are optional and default to
    True and 0 respectively.
    """
    checksum_ok = d.get("checksum_valid", True)
    chk = d.get("chk_byte", 0)
    return S3Frame(
        sub=d["sub"],
        page_hi=d["page_hi"],
        page_lo=d["page_lo"],
        data=bytes(d["data"]),
        checksum_valid=checksum_ok,
        chk_byte=chk,
    )


class WaveformStore:
    """
    Persistent store for Blastware-format waveform files + their A5 source frames.

    All artifacts for an event live under ``<root>/<serial>/`` and share the
    event's BW filename as a stem (see the path helpers below).

    Thread safety: write_blastware_file is single-shot; concurrent saves of the
    *same* filename would race, but the filename encodes second-resolution
    timestamps + serial, so collisions across threads/processes are vanishingly
    unlikely in practice.
    """

    def __init__(self, root: str | Path) -> None:
        """Open a store rooted at *root*, creating the directory if needed."""
        self.root = Path(root)
        self.root.mkdir(parents=True, exist_ok=True)
        log.info("WaveformStore root=%s", self.root)

    # ── path helpers ────────────────────────────────────────────────────────────

    def _serial_dir(self, serial: str) -> Path:
        """Return ``<root>/<serial>``, creating the directory if it is missing.

        Every path helper below goes through here, so even read-only
        lookups create the per-serial directory as a side effect.
        """
        d = self.root / serial
        d.mkdir(parents=True, exist_ok=True)
        return d

    def paths_for(self, serial: str, filename: str) -> tuple[Path, Path]:
        """Return (blastware_path, a5_pickle_path) for a given serial+filename.

        For the sidecar path use `sidecar_path_for()` — kept separate so
        existing callers don't need to unpack a 3-tuple.
        """
        d = self._serial_dir(serial)
        return d / filename, d / f"{filename}.a5.pkl"

    def sidecar_path_for(self, serial: str, filename: str) -> Path:
        """Return absolute path to the .sfm.json sidecar for a given event."""
        return self._serial_dir(serial) / f"{filename}.sfm.json"

    def hdf5_path_for(self, serial: str, filename: str) -> Path:
        """Return absolute path to the .h5 clean-waveform file for a given event."""
        return self._serial_dir(serial) / f"{filename}.h5"

    def open_blastware(self, serial: str, filename: str) -> Optional[Path]:
        """Return absolute path to an existing event file or None."""
        bw_path, _ = self.paths_for(serial, filename)
        return bw_path if bw_path.exists() else None

    # ── internal helpers ────────────────────────────────────────────────────────

    @staticmethod
    def _unlink_if_exists(path: Path) -> None:
        """Delete *path*; a file that is already gone is not an error."""
        with contextlib.suppress(FileNotFoundError):
            path.unlink()

    @staticmethod
    def _existing_user_blocks(
        sidecar_path: Path,
        context: str,
    ) -> tuple[Optional[dict], Optional[dict]]:
        """
        Return the ``(review, extensions)`` blocks of an existing sidecar.

        Both are None when the sidecar doesn't exist. An unreadable sidecar
        is logged (message prefixed with *context*, the calling method's
        name) and treated as absent, so the caller simply overwrites it.
        """
        if not sidecar_path.exists():
            return None, None
        try:
            old = event_file_io.read_sidecar(sidecar_path)
            return old.get("review"), old.get("extensions")
        except Exception as exc:
            log.warning(
                "%s: existing sidecar at %s unreadable (%s); overwriting",
                context, sidecar_path, exc,
            )
            return None, None

    # ── save / load ─────────────────────────────────────────────────────────────

    def save(
        self,
        ev: Event,
        serial: str,
        a5_frames: list[S3Frame],
        *,
        source_kind: str = "sfm-live",
        geo_range: str = "normal",
    ) -> dict:
        """
        Write all four event-file artifacts for one event:
          - <filename>            BW binary
          - <filename>.a5.pkl     raw A5 frame pickle
          - <filename>.h5         clean waveform (HDF5, best-effort)
          - <filename>.sfm.json   modern sidecar (metadata + review)

        Returns a record dict suitable for persisting alongside the DB row:

            {
              "filename":           "M529LKIQ.7M0W",
              "filesize":           8708,
              "sha256":             "a1b2c3...",
              "a5_pickle_filename": "M529LKIQ.7M0W.a5.pkl",
              "hdf5_filename":      "M529LKIQ.7M0W.h5",   # None if the write failed
              "sidecar_filename":   "M529LKIQ.7M0W.sfm.json",
            }

        `source_kind` flows into `sidecar.source.kind` — callers should
        pass "sfm-live" (default) for the live endpoint and "sfm-ach" for
        the ACH ingestion path. BW-imported events use save_imported_bw()
        instead.

        `geo_range` controls the ADC-counts → in/s scaling in the HDF5
        file ("normal" = 10 in/s FS, "sensitive" = 1.25 in/s FS).
        Defaults to "normal" — callers with compliance-config access
        should pass the actual unit setting so the saved samples are in
        the right units.

        Idempotent: if the event file already exists, it is overwritten
        with the freshly-encoded version (same bytes for the same
        a5_frames) and the sidecar's review and extensions blocks are
        preserved across re-saves.

        Raises ValueError if `a5_frames` is empty or `serial` is falsy.
        """
        if not a5_frames:
            raise ValueError("WaveformStore.save: a5_frames is empty")
        if not serial:
            raise ValueError("WaveformStore.save: serial is required")

        filename = blastware_filename(ev, serial)
        bw_path, a5_path = self.paths_for(serial, filename)
        sidecar_path = self.sidecar_path_for(serial, filename)
        hdf5_path = self.hdf5_path_for(serial, filename)

        # 1. encode the event file (defensive unlink prevents trailing-byte
        #    leaks from a previous larger file on synced/odd filesystems).
        self._unlink_if_exists(bw_path)
        write_blastware_file(ev, a5_frames, bw_path)
        filesize = bw_path.stat().st_size
        sha256 = event_file_io.file_sha256(bw_path)

        # 2. write the .a5.pkl frame pickle (same defensive unlink).
        self._unlink_if_exists(a5_path)
        payload = {
            "version": A5_PICKLE_VERSION,
            "frames": [_frame_to_dict(f) for f in a5_frames],
        }
        with a5_path.open("wb") as fp:
            pickle.dump(payload, fp, protocol=pickle.HIGHEST_PROTOCOL)

        # 3. write the .h5 clean-waveform file (samples in physical units).
        #    Best-effort: a write failure shouldn't sink the rest of the save
        #    (the HDF5 can be regenerated later from the .a5.pkl).
        hdf5_filename: Optional[str] = None
        try:
            event_hdf5.write_event_hdf5(
                hdf5_path, ev,
                serial=serial,
                geo_range=geo_range,
                source_kind=source_kind,
            )
            hdf5_filename = hdf5_path.name
        except Exception as exc:
            log.warning(
                "save: HDF5 write failed for %s: %s — continuing without .h5",
                hdf5_path, exc,
            )

        # 4. write the .sfm.json sidecar. Preserve any existing review
        #    block + extensions across re-saves so user edits aren't lost
        #    when the same event is re-downloaded (e.g. via Force refresh).
        existing_review, existing_extensions = self._existing_user_blocks(
            sidecar_path, "save",
        )
        sidecar = event_file_io.event_to_sidecar_dict(
            ev,
            serial=serial,
            blastware_filename=filename,
            blastware_filesize=filesize,
            blastware_sha256=sha256,
            source_kind=source_kind,
            a5_pickle_filename=a5_path.name,
            review=existing_review,
            extensions=existing_extensions,
        )
        event_file_io.write_sidecar(sidecar_path, sidecar)

        log.info(
            "WaveformStore.save serial=%s filename=%s filesize=%d frames=%d "
            "h5=%s sidecar=%s",
            serial, filename, filesize, len(a5_frames),
            hdf5_filename or "(skipped)", sidecar_path.name,
        )
        return {
            "filename": filename,
            "filesize": filesize,
            "sha256": sha256,
            "a5_pickle_filename": a5_path.name,
            "hdf5_filename": hdf5_filename,
            "sidecar_filename": sidecar_path.name,
        }

    def save_imported_bw(
        self,
        bw_bytes: bytes,
        source_path: Path,
        *,
        serial_hint: Optional[str] = None,
        bw_report_text: Optional[Union[str, bytes]] = None,
    ) -> tuple[Event, dict]:
        """
        Ingest a Blastware event file produced by an external tool
        (Blastware's own ACH, manual download, etc.) where the source A5
        frames aren't available.

        Only `source_path.name` is used (for the stored filename, the
        record type and the serial fallback); the event bytes come from
        `bw_bytes`.

        Workflow:
          1. Parse the bytes via event_file_io.read_blastware_file (writes
             a temp file to do that, since the parser takes a path), then
             set ev.record_type from the ORIGINAL filename — the temp
             file's name doesn't carry BW's type suffix.
          2. Optionally parse a paired BW ASCII event report (the .TXT
             file BW writes alongside the binary). When supplied, its
             decoded fields land in the sidecar's `bw_report` block AND
             overlay the device-authoritative peak values onto the Event
             (and so into the top-level `peak_values` block). This is the
             right path for the ACH-forwarder daemon use case where
             Blastware's own ACH writes both files into the watch folder.
          3. Resolve the serial: `serial_hint` if given, otherwise inferred
             from the BW filename (`<P><serial3>...`); falls back to
             "UNKNOWN".
          4. Copy the BW bytes verbatim into <root>/<serial>/<filename>.
          5. Best-effort write of the .h5 clean-waveform file.
          6. Write the .sfm.json sidecar with source.kind = "bw-import"
             and a5_pickle_filename = None, preserving any existing
             review/extensions blocks. Does NOT write a .a5.pkl
             (no A5 source available; byte-for-byte regeneration not
             possible — the on-disk BW file IS the byte-for-byte source).

        Returns (event, record_dict) so callers can both insert into
        SeismoDb and surface the parsed Event.
        """
        import tempfile

        # 1. Parse. read_blastware_file is path-based, so stage the bytes in
        #    a temp file. tmp_path is bound BEFORE writing so the file is
        #    removed even if the write itself fails (previously it leaked).
        tmp = tempfile.NamedTemporaryFile(suffix=".bw", delete=False)
        tmp_path = Path(tmp.name)
        try:
            with tmp:  # closed before parsing (needed on Windows)
                tmp.write(bw_bytes)
            ev = event_file_io.read_blastware_file(tmp_path)
        finally:
            self._unlink_if_exists(tmp_path)

        # read_blastware_file derives record_type from its path arg, but
        # that arg is the tmp file (suffix ".bw") — so override with the
        # original filename's encoded type (H/W/M/E/C in the BW AB0T
        # scheme). Without this override every BW-imported event lands
        # in the DB with record_type="Waveform" regardless of the actual
        # type (Histogram, Manual, etc.).
        ev.record_type = event_file_io.derive_record_type_from_filename(
            source_path.name
        )

        # 2. Parse the BW ASCII report if one was supplied. Failures here
        #    are non-fatal: we still write the binary + sidecar without the
        #    rich derived fields.
        bw_report = None
        if bw_report_text is not None:
            try:
                from minimateplus.bw_ascii_report import parse_report
                bw_report = parse_report(bw_report_text)
            except Exception as exc:
                log.warning(
                    "save_imported_bw: BW report parse failed: %s — continuing without it",
                    exc,
                )

        # If we have a report, overlay its device-authoritative fields
        # (peaks, project, sample_rate, record_time) onto the Event
        # BEFORE handing it to db.insert_events(). Without this overlay
        # the DB row gets `peak_values` from _peaks_from_samples(), which
        # runs the still-undecoded waveform codec on the BW body and
        # produces ±10 in/s saturation values on every channel for every
        # event.
        if bw_report is not None:
            try:
                event_file_io.apply_report_to_event(ev, bw_report)
            except Exception as exc:
                log.warning(
                    "save_imported_bw: failed to overlay report onto event: %s",
                    exc,
                )

        # 3. Resolve serial. blastware_filename derives a 4-char prefix from
        #    the numeric serial (e.g. BE11529 → M529); we go the other way
        #    via the source filename if a hint wasn't given.
        serial = serial_hint or _serial_from_bw_filename(source_path.name) or "UNKNOWN"

        # Use the source filename verbatim — it already encodes timestamp
        # + record type per BW's AB0T scheme, and we want to preserve it
        # so the file BW knows about can be opened back in BW.
        filename = source_path.name
        bw_path = self._serial_dir(serial) / filename

        # 4. copy bytes
        bw_path.write_bytes(bw_bytes)
        filesize = bw_path.stat().st_size
        sha256 = event_file_io.file_sha256(bw_path)

        # 5. write the .h5 clean-waveform file from the parsed Event.
        #    The BW binary itself doesn't carry the device-authoritative 0C
        #    peaks. NOTE(review): whether the .h5 peaks reflect a BW-report
        #    overlay applied above depends on write_event_hdf5 — confirm.
        #    Best-effort.
        hdf5_path = self.hdf5_path_for(serial, filename)
        hdf5_filename: Optional[str] = None
        try:
            event_hdf5.write_event_hdf5(
                hdf5_path, ev,
                serial=serial,
                geo_range="normal",  # BW file doesn't carry the range; assume Normal
                source_kind="bw-import",
            )
            hdf5_filename = hdf5_path.name
        except Exception as exc:
            log.warning(
                "save_imported_bw: HDF5 write failed for %s: %s — continuing",
                hdf5_path, exc,
            )

        # 6. write sidecar with source.kind = bw-import. Preserve review AND
        #    extensions (as save() does) so a re-import doesn't wipe user edits.
        sidecar_path = self.sidecar_path_for(serial, filename)
        existing_review, existing_extensions = self._existing_user_blocks(
            sidecar_path, "save_imported_bw",
        )
        sidecar = event_file_io.event_to_sidecar_dict(
            ev,
            serial=serial,
            blastware_filename=filename,
            blastware_filesize=filesize,
            blastware_sha256=sha256,
            source_kind="bw-import",
            a5_pickle_filename=None,
            review=existing_review,
            extensions=existing_extensions,
            bw_report=bw_report,
        )
        event_file_io.write_sidecar(sidecar_path, sidecar)

        log.info(
            "WaveformStore.save_imported_bw serial=%s filename=%s filesize=%d "
            "h5=%s (no .a5.pkl — A5 source unavailable for BW-imported files)",
            serial, filename, filesize, hdf5_filename or "(skipped)",
        )
        return ev, {
            "filename": filename,
            "filesize": filesize,
            "sha256": sha256,
            "a5_pickle_filename": None,
            "hdf5_filename": hdf5_filename,
            "sidecar_filename": sidecar_path.name,
            "serial": serial,
        }

    def load_a5(self, serial: str, filename: str) -> Optional[list[S3Frame]]:
        """
        Re-hydrate the pickled A5 frame stream for a stored event.

        Returns None if the .a5.pkl file is missing (BW-imported events
        never get one) or its payload lacks the expected ``frames`` list.
        """
        _, a5_path = self.paths_for(serial, filename)
        if not a5_path.exists():
            return None
        # pickle.load can execute arbitrary code from the file. This assumes
        # the .a5.pkl was written by save() and the store root is trusted —
        # never point a WaveformStore at an untrusted directory.
        with a5_path.open("rb") as fp:
            payload = pickle.load(fp)
        if not isinstance(payload, dict) or "frames" not in payload:
            log.warning("WaveformStore.load_a5: malformed sidecar at %s", a5_path)
            return None
        return [_dict_to_frame(d) for d in payload["frames"]]

    # ── modern .sfm.json sidecar accessors ──────────────────────────────────────

    def load_sidecar(self, serial: str, filename: str) -> Optional[dict]:
        """Return the parsed .sfm.json sidecar dict, or None if missing/unreadable."""
        path = self.sidecar_path_for(serial, filename)
        if not path.exists():
            return None
        try:
            return event_file_io.read_sidecar(path)
        except Exception as exc:
            log.warning("load_sidecar: failed to read %s: %s", path, exc)
            return None

    def patch_sidecar(
        self,
        serial: str,
        filename: str,
        *,
        review: Optional[dict] = None,
        extensions: Optional[dict] = None,
        reviewer_now: bool = True,
    ) -> Optional[dict]:
        """
        JSON-merge-patch the .sfm.json sidecar's review/extensions blocks.

        Delegates to event_file_io.patch_sidecar. Returns the new full
        dict, or None if the sidecar doesn't exist.
        """
        path = self.sidecar_path_for(serial, filename)
        if not path.exists():
            return None
        return event_file_io.patch_sidecar(
            path,
            review=review,
            extensions=extensions,
            reviewer_now=reviewer_now,
        )


# ── helpers ─────────────────────────────────────────────────────────────────────


def _serial_from_bw_filename(name: str) -> Optional[str]:
|
|
"""
|
|
Reverse of `blastware_filename`'s serial-prefix encoding.
|
|
|
|
BW filename format (V10.72): `<P><serial3><stem4>.<ext>`
|
|
where P = chr(ord('B') + floor(serial // 1000))
|
|
and serial3 = f"{serial % 1000:03d}".
|
|
|
|
Examples (from CLAUDE.md verification archive):
|
|
P036... → BE14036 H907... → BE6907
|
|
M529... → BE11529 T003... → BE18003
|
|
|
|
Returns the inferred BE-prefix serial (e.g. "BE11529") or None when
|
|
the filename doesn't match the expected pattern.
|
|
"""
|
|
if not name:
|
|
return None
|
|
# First letter encodes the thousands group; next 3 chars encode the
|
|
# last 3 digits of the serial.
|
|
base = name.split(".", 1)[0]
|
|
if len(base) < 4 or not base[0].isalpha() or not base[1:4].isdigit():
|
|
return None
|
|
prefix_letter = base[0].upper()
|
|
if prefix_letter < "B":
|
|
return None
|
|
thousands = ord(prefix_letter) - ord("B")
|
|
serial_num = thousands * 1000 + int(base[1:4])
|
|
return f"BE{serial_num}"
|