feat: add waveform store handling #16

Merged
serversdown merged 5 commits from sfm-waveform-store into main 2026-05-08 15:03:33 -04:00
5 changed files with 42 additions and 33 deletions
Showing only changes of commit 0484680c89 - Show all commits
+2 -2
View File
@@ -410,9 +410,9 @@ class AchSession:
if skipped: if skipped:
log.info(" (skipped %d already-downloaded event(s))", skipped) log.info(" (skipped %d already-downloaded event(s))", skipped)
# ── Persist .G10 / A5 sidecars to the waveform store ── # ── Persist event file + A5 sidecar to the waveform store ──
# Saves ride alongside the existing JSON dump so the on-disk # Saves ride alongside the existing JSON dump so the on-disk
# .G10 and the events.json reference the same set of events. # event file and events.json reference the same set of events.
waveform_records: dict[str, dict] = {} waveform_records: dict[str, dict] = {}
for ev in new_events: for ev in new_events:
if not ev._a5_frames: if not ev._a5_frames:
+2 -2
View File
@@ -81,8 +81,8 @@ CREATE TABLE IF NOT EXISTS events (
sample_rate INTEGER, sample_rate INTEGER,
record_type TEXT, -- "single_shot" | "continuous" record_type TEXT, -- "single_shot" | "continuous"
false_trigger INTEGER NOT NULL DEFAULT 0, -- 0=no, 1=yes (manual flag) false_trigger INTEGER NOT NULL DEFAULT 0, -- 0=no, 1=yes (manual flag)
blastware_filename TEXT, -- e.g. "M529LKIQ.G10" within waveform store blastware_filename TEXT, -- event file within waveform store; extension is per-event (AB0T encodes timestamp)
blastware_filesize INTEGER, -- bytes; NULL if no .G10 saved blastware_filesize INTEGER, -- bytes; NULL if no event file saved
a5_pickle_filename TEXT, -- "<filename>.a5.pkl" sidecar a5_pickle_filename TEXT, -- "<filename>.a5.pkl" sidecar
created_at TEXT NOT NULL DEFAULT (strftime('%Y-%m-%dT%H:%M:%SZ', 'now')), created_at TEXT NOT NULL DEFAULT (strftime('%Y-%m-%dT%H:%M:%SZ', 'now')),
UNIQUE(serial, timestamp) UNIQUE(serial, timestamp)
+9 -7
View File
@@ -115,7 +115,7 @@ def _get_db() -> SeismoDb:
def _get_store() -> WaveformStore: def _get_store() -> WaveformStore:
""" """
Persistent .G10 + A5-sidecar store, rooted at <db_dir>/waveforms/. Persistent event-file + A5-sidecar store, rooted at <db_dir>/waveforms/.
Mirrors the layout used by bridges/ach_server.py so files saved by ACH Mirrors the layout used by bridges/ach_server.py so files saved by ACH
ingestion and by live SFM downloads share one canonical location. ingestion and by live SFM downloads share one canonical location.
""" """
@@ -1480,10 +1480,12 @@ def db_set_false_trigger(
@app.get("/db/events/{event_id}/blastware_file") @app.get("/db/events/{event_id}/blastware_file")
def db_event_blastware_file(event_id: str) -> FileResponse: def db_event_blastware_file(event_id: str) -> FileResponse:
""" """
Return the Blastware-format waveform file (.G10/.W/.H/etc.) for a Return the Blastware-format event file for a previously-ingested
previously-ingested event. 404 if the event is unknown or has no event. Filename extension is per-event (timestamp-encoded
.G10 in the store (events ingested before the store was wired will `AB0T` for ACH downloads, 3-char `AB0` for direct downloads).
show this — re-download via the live endpoint to populate). 404 if the event is unknown or has no event file in the store
(events ingested before the store was wired will show this —
re-download via the live endpoint to populate).
""" """
row = _get_db().get_event(event_id) row = _get_db().get_event(event_id)
if row is None: if row is None:
@@ -1584,8 +1586,8 @@ def db_unit_waveforms_zip(
limit: int = Query(5000, description="Hard cap on events bundled (default 5000)"), limit: int = Query(5000, description="Hard cap on events bundled (default 5000)"),
) -> StreamingResponse: ) -> StreamingResponse:
""" """
Stream a ZIP of all .G10/.W files for a serial in the optional date range. Stream a ZIP of all event files for a serial in the optional date range.
Events without a stored Blastware file are silently skipped. Events without a stored event file are silently skipped.
""" """
import io import io
import zipfile import zipfile
+21 -14
View File
@@ -1,18 +1,22 @@
""" """
sfm/waveform_store.py — On-disk store for Blastware-format waveform files. sfm/waveform_store.py — On-disk store for Blastware-format event files.
Layout (flat per-serial): Layout (flat per-serial):
<root>/<serial>/<filename> .G10 / .W / .H / etc. (Blastware-readable) <root>/<serial>/<filename> event file (Blastware-readable binary)
<root>/<serial>/<filename>.a5.pkl pickled list of A5 S3Frame dicts <root>/<serial>/<filename>.a5.pkl pickled list of A5 S3Frame dicts
`<filename>` is whatever `minimateplus.blastware_file.blastware_filename` `<filename>` is whatever `minimateplus.blastware_file.blastware_filename`
produces for the event (encodes serial + timestamp + record type). Filenames produces for the event. The extension is NOT a fixed type tag — it encodes
never collide for the same physical event. the event timestamp (`AB0T` format: 2-char base-36 of `total_seconds %
1296`, literal `0`, then `W`=Full Waveform / `H`=Full Histogram for ACH
downloads, or 3-char `AB0` for direct/manual downloads). Every event's
filename therefore contains its own timestamp + record-type fingerprint and
collisions across the same physical event don't occur.
The `.a5.pkl` sidecar lets the .G10 be regenerated later if the encoder The `.a5.pkl` sidecar lets the event file be regenerated later if the
changes — captures the raw 5A frame stream as serializable dicts so the encoder changes — captures the raw 5A frame stream as serializable dicts so
schema isn't tied to the `S3Frame` dataclass layout. the schema isn't tied to the `S3Frame` dataclass layout.
""" """
from __future__ import annotations from __future__ import annotations
@@ -81,7 +85,7 @@ class WaveformStore:
return d / filename, d / f"{filename}.a5.pkl" return d / filename, d / f"{filename}.a5.pkl"
def open_blastware(self, serial: str, filename: str) -> Optional[Path]: def open_blastware(self, serial: str, filename: str) -> Optional[Path]:
"""Return absolute path to an existing .G10 file or None.""" """Return absolute path to an existing event file or None."""
bw_path, _ = self.paths_for(serial, filename) bw_path, _ = self.paths_for(serial, filename)
return bw_path if bw_path.exists() else None return bw_path if bw_path.exists() else None
@@ -94,18 +98,21 @@ class WaveformStore:
a5_frames: list[S3Frame], a5_frames: list[S3Frame],
) -> dict: ) -> dict:
""" """
Write the .G10 file and the .a5.pkl sidecar for one event. Write the event file and its .a5.pkl sidecar for one event.
Returns a record dict suitable for persisting alongside the DB row: Returns a record dict suitable for persisting alongside the DB row:
{ {
"filename": "M529LKIQ.G10", "filename": "M529LKIQ.7M0W",
"filesize": 8708, "filesize": 8708,
"a5_pickle_filename": "M529LKIQ.G10.a5.pkl", "a5_pickle_filename": "M529LKIQ.7M0W.a5.pkl",
} }
Idempotent: if the .G10 already exists, it is overwritten with the The exact extension is timestamp-encoded per event (see
freshly-encoded version (same bytes for the same a5_frames). `minimateplus.blastware_file.blastware_filename`).
Idempotent: if the event file already exists, it is overwritten with
the freshly-encoded version (same bytes for the same a5_frames).
""" """
if not a5_frames: if not a5_frames:
raise ValueError("WaveformStore.save: a5_frames is empty") raise ValueError("WaveformStore.save: a5_frames is empty")
@@ -115,7 +122,7 @@ class WaveformStore:
filename = blastware_filename(ev, serial) filename = blastware_filename(ev, serial)
bw_path, a5_path = self.paths_for(serial, filename) bw_path, a5_path = self.paths_for(serial, filename)
# 1. encode the .G10 # 1. encode the event file
# Delete any stale file at this path so partial writes never leak # Delete any stale file at this path so partial writes never leak
# trailing bytes from a previous larger file (matches the live # trailing bytes from a previous larger file (matches the live
# endpoint's defensive unlink). # endpoint's defensive unlink).
+7 -7
View File
@@ -139,12 +139,12 @@ def test_waveform_store_missing_returns_none(tmp_path: Path):
from sfm.waveform_store import WaveformStore from sfm.waveform_store import WaveformStore
store = WaveformStore(tmp_path / "waveforms") store = WaveformStore(tmp_path / "waveforms")
assert store.open_blastware("BE99999", "no_such.G10") is None assert store.open_blastware("BE99999", "no_such.7M0W") is None
assert store.load_a5("BE99999", "no_such.G10") is None assert store.load_a5("BE99999", "no_such.7M0W") is None
def test_waveform_store_idempotent_save(tmp_path: Path): def test_waveform_store_idempotent_save(tmp_path: Path):
"""Saving the same event twice produces the same .G10 bytes.""" """Saving the same event twice produces the same event-file bytes."""
from sfm.waveform_store import WaveformStore from sfm.waveform_store import WaveformStore
store = WaveformStore(tmp_path / "waveforms") store = WaveformStore(tmp_path / "waveforms")
@@ -172,9 +172,9 @@ def test_seismodb_persists_waveform_columns(tmp_path: Path):
ev, _ = _make_synthetic_event() ev, _ = _make_synthetic_event()
rec = { rec = {
"filename": "M529LKIQ.G10", "filename": "M529LKIQ.7M0W",
"filesize": 8708, "filesize": 8708,
"a5_pickle_filename": "M529LKIQ.G10.a5.pkl", "a5_pickle_filename": "M529LKIQ.7M0W.a5.pkl",
} }
inserted, skipped = db.insert_events( inserted, skipped = db.insert_events(
[ev], [ev],
@@ -209,9 +209,9 @@ def test_seismodb_dedup_upserts_waveform_fields(tmp_path: Path):
assert rows[0]["blastware_filename"] is None assert rows[0]["blastware_filename"] is None
rec = { rec = {
"filename": "M529LKIQ.G10", "filename": "M529LKIQ.7M0W",
"filesize": 4242, "filesize": 4242,
"a5_pickle_filename": "M529LKIQ.G10.a5.pkl", "a5_pickle_filename": "M529LKIQ.7M0W.a5.pkl",
} }
inserted, skipped = db.insert_events( inserted, skipped = db.insert_events(
[ev], [ev],