From 3711b11bda31ca6265089309e9a29657c71cf806 Mon Sep 17 00:00:00 2001 From: serversdown Date: Wed, 6 May 2026 19:03:38 +0000 Subject: [PATCH 1/5] feat: add waveform store handling --- bridges/ach_server.py | 54 ++++++- sfm/database.py | 66 +++++++- sfm/server.py | 197 ++++++++++++++++++++++- sfm/waveform_store.py | 164 +++++++++++++++++++ tests/test_waveform_store.py | 302 +++++++++++++++++++++++++++++++++++ 5 files changed, 777 insertions(+), 6 deletions(-) create mode 100644 sfm/waveform_store.py create mode 100644 tests/test_waveform_store.py diff --git a/bridges/ach_server.py b/bridges/ach_server.py index edd4f2e..25d988c 100644 --- a/bridges/ach_server.py +++ b/bridges/ach_server.py @@ -70,6 +70,7 @@ from minimateplus.transport import SocketTransport from minimateplus.client import MiniMateClient from minimateplus.models import DeviceInfo, Event, MonitorLogEntry from sfm.database import SeismoDb +from sfm.waveform_store import WaveformStore log = logging.getLogger("ach_server") @@ -139,6 +140,7 @@ class AchSession: max_events: Optional[int], state_path: Path, db: "SeismoDb", + store: "WaveformStore", clear_after_download: bool = False, restart_monitoring: bool = False, ) -> None: @@ -150,6 +152,7 @@ class AchSession: self.max_events = max_events self.state_path = state_path self.db = db + self.store = store self.clear_after_download = clear_after_download self.restart_monitoring = restart_monitoring @@ -407,8 +410,37 @@ class AchSession: if skipped: log.info(" (skipped %d already-downloaded event(s))", skipped) + # ── Persist .G10 / A5 sidecars to the waveform store ── + # Saves ride alongside the existing JSON dump so the on-disk + # .G10 and the events.json reference the same set of events. 
+ waveform_records: dict[str, dict] = {} + for ev in new_events: + if not ev._a5_frames: + continue + try: + rec = self.store.save( + ev, + serial=serial or "UNKNOWN", + a5_frames=ev._a5_frames, + ) + if ev._waveform_key is not None: + waveform_records[ev._waveform_key.hex()] = rec + log.info( + " [WAVE] saved %s (%d bytes)", + rec["filename"], rec["filesize"], + ) + except Exception as exc: + key_hex = ev._waveform_key.hex() if ev._waveform_key else "????????" + log.warning( + " [WARN] Waveform store save failed for %s: %s", + key_hex, exc, + ) + if new_events: - _save_json(session_dir / "events.json", [_event_to_dict(e) for e in new_events]) + _save_json( + session_dir / "events.json", + [_event_to_dict(e, waveform_records) for e in new_events], + ) for ev in new_events: pv = ev.peak_values @@ -467,7 +499,10 @@ class AchSession: _session_start = datetime.datetime.now() try: _ev_ins, _ev_skip = self.db.insert_events( - new_events, serial=serial or self.peer, session_id=None + new_events, + serial=serial or self.peer, + session_id=None, + waveform_records=waveform_records, ) _ml_ins, _ml_skip = self.db.insert_monitor_log( new_monitor_entries, session_id=None @@ -592,7 +627,10 @@ def _device_info_to_dict(d: DeviceInfo) -> dict: } -def _event_to_dict(e: Event) -> dict: +def _event_to_dict( + e: Event, + waveform_records: Optional[dict[str, dict]] = None, +) -> dict: pv = e.peak_values pi = e.project_info peaks = {} @@ -611,6 +649,11 @@ def _event_to_dict(e: Event) -> dict: for ch, vals in e.raw_samples.items() } samples["__note__"] = "first 20 sample-sets only; see raw_rx.bin for full waveform" + + rec: dict = {} + if waveform_records and e._waveform_key is not None: + rec = waveform_records.get(e._waveform_key.hex(), {}) or {} + return { "timestamp": str(e.timestamp) if e.timestamp else None, "project": pi.project if pi else None, @@ -619,6 +662,9 @@ def _event_to_dict(e: Event) -> dict: "sensor_location": pi.sensor_location if pi else None, "peaks": peaks, 
"raw_samples_preview": samples, + "blastware_filename": rec.get("filename"), + "blastware_filesize": rec.get("filesize"), + "a5_pickle_filename": rec.get("a5_pickle_filename"), } @@ -640,6 +686,7 @@ def serve(args: argparse.Namespace) -> None: output_dir.mkdir(parents=True, exist_ok=True) state_path = output_dir / "ach_state.json" db = SeismoDb(output_dir / "seismo_relay.db") + store = WaveformStore(output_dir / "waveforms") server_sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM) server_sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1) @@ -694,6 +741,7 @@ def serve(args: argparse.Namespace) -> None: max_events=max_ev, state_path=state_path, db=db, + store=store, clear_after_download=args.clear_after_download, restart_monitoring=args.restart_monitoring, ) diff --git a/sfm/database.py b/sfm/database.py index 110ba7a..0f3c648 100644 --- a/sfm/database.py +++ b/sfm/database.py @@ -81,6 +81,9 @@ CREATE TABLE IF NOT EXISTS events ( sample_rate INTEGER, record_type TEXT, -- "single_shot" | "continuous" false_trigger INTEGER NOT NULL DEFAULT 0, -- 0=no, 1=yes (manual flag) + blastware_filename TEXT, -- e.g. "M529LKIQ.G10" — within waveform store + blastware_filesize INTEGER, -- bytes; NULL if no .G10 saved + a5_pickle_filename TEXT, -- ".a5.pkl" sidecar created_at TEXT NOT NULL DEFAULT (strftime('%Y-%m-%dT%H:%M:%SZ', 'now')), UNIQUE(serial, timestamp) ); @@ -184,6 +187,20 @@ class SeismoDb: """) log.info("_migrate: events table rebuilt OK") + # Migration 1b: add Blastware-file columns to existing events tables. + # New columns are NULLable so old rows just read NULL. 
+ existing_cols = { + r[1] for r in conn.execute("PRAGMA table_info(events)").fetchall() + } + for col, ddl in ( + ("blastware_filename", "TEXT"), + ("blastware_filesize", "INTEGER"), + ("a5_pickle_filename", "TEXT"), + ): + if col not in existing_cols: + log.info("_migrate: events ADD COLUMN %s %s", col, ddl) + conn.execute(f"ALTER TABLE events ADD COLUMN {col} {ddl}") + # Migration 2: change monitor_log UNIQUE from (serial, waveform_key) to # (serial, start_time) — same reasoning as events. row = conn.execute( @@ -282,12 +299,24 @@ class SeismoDb: *, serial: str, session_id: Optional[str] = None, + waveform_records: Optional[dict[str, dict]] = None, ) -> tuple[int, int]: """ Insert triggered events. Silently skips duplicates (serial+timestamp). Returns (inserted, skipped). + + ``waveform_records`` (optional): dict keyed by event waveform_key (hex) + whose value is a record from ``WaveformStore.save()``: + {"filename": str, "filesize": int, "a5_pickle_filename": str} + + For events whose key is in this dict, the matching columns are + populated. If a row with the same (serial, timestamp) already exists + (dedup hit), the matching waveform record is upserted onto the + existing row so a re-download via the live endpoint refreshes the + file metadata. """ inserted = skipped = 0 + wave_recs = waveform_records or {} with self._connect() as conn: for ev in events: key = ev._waveform_key.hex() if ev._waveform_key else None @@ -307,6 +336,7 @@ class SeismoDb: pv = ev.peak_values pi = ev.project_info + rec = wave_recs.get(key) or {} try: conn.execute( @@ -315,8 +345,9 @@ class SeismoDb: (id, serial, waveform_key, session_id, timestamp, tran_ppv, vert_ppv, long_ppv, peak_vector_sum, mic_ppv, project, client, operator, sensor_location, - sample_rate, record_type) - VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?) + sample_rate, record_type, + blastware_filename, blastware_filesize, a5_pickle_filename) + VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?) 
""", ( self._new_id(), serial, key, session_id, ts, @@ -331,16 +362,47 @@ class SeismoDb: pi.sensor_location if pi else None, ev.sample_rate, ev.record_type, + rec.get("filename"), + rec.get("filesize"), + rec.get("a5_pickle_filename"), ), ) inserted += 1 except sqlite3.IntegrityError: skipped += 1 + # Upsert waveform fields onto the existing dedup row so a + # re-download via the live endpoint refreshes filename / + # size / sidecar without churning the rest of the row. + if rec and ts: + conn.execute( + """ + UPDATE events + SET blastware_filename = ?, + blastware_filesize = ?, + a5_pickle_filename = ? + WHERE serial = ? AND timestamp = ? + """, + ( + rec.get("filename"), + rec.get("filesize"), + rec.get("a5_pickle_filename"), + serial, + ts, + ), + ) log.debug("insert_events serial=%s inserted=%d skipped=%d", serial, inserted, skipped) return inserted, skipped + def get_event(self, event_id: str) -> Optional[dict]: + """Return one event row by id, or None.""" + with self._connect() as conn: + row = conn.execute( + "SELECT * FROM events WHERE id = ?", (event_id,), + ).fetchone() + return dict(row) if row else None + def query_events( self, serial: Optional[str] = None, diff --git a/sfm/server.py b/sfm/server.py index 9683254..1f0b525 100644 --- a/sfm/server.py +++ b/sfm/server.py @@ -47,7 +47,7 @@ from typing import Optional try: from fastapi import Body, FastAPI, HTTPException, Query from fastapi.middleware.cors import CORSMiddleware - from fastapi.responses import FileResponse, JSONResponse + from fastapi.responses import FileResponse, JSONResponse, StreamingResponse from pydantic import BaseModel import uvicorn except ImportError: @@ -63,8 +63,10 @@ from minimateplus.protocol import ProtocolError from minimateplus.models import CallHomeConfig, ComplianceConfig, DeviceInfo, Event, PeakValues, ProjectInfo, Timestamp from minimateplus.transport import TcpTransport, DEFAULT_TCP_PORT from minimateplus.blastware_file import write_blastware_file, blastware_filename 
+from minimateplus.client import _decode_a5_metadata_into, _decode_a5_waveform from sfm.cache import SFMCache, get_cache from sfm.database import SeismoDb +from sfm.waveform_store import WaveformStore logging.basicConfig( level=logging.INFO, @@ -101,6 +103,7 @@ app.add_middleware( _DEFAULT_DB_PATH = Path(__file__).parent.parent / "bridges" / "captures" / "seismo_relay.db" _db: Optional[SeismoDb] = None +_store: Optional[WaveformStore] = None def _get_db() -> SeismoDb: @@ -110,6 +113,18 @@ def _get_db() -> SeismoDb: return _db +def _get_store() -> WaveformStore: + """ + Persistent .G10 + A5-sidecar store, rooted at /waveforms/. + Mirrors the layout used by bridges/ach_server.py so files saved by ACH + ingestion and by live SFM downloads share one canonical location. + """ + global _store + if _store is None: + _store = WaveformStore(_get_db().db_path.parent / "waveforms") + return _store + + # ── Live device cache ───────────────────────────────────────────────────────── # In-memory cache for live device data. Avoids re-dialing the device on every # request when the data hasn't changed. @@ -946,6 +961,27 @@ def device_event_blastware_file( out_path, len(a5_frames), serial, ) + # Promote to canonical persistent store + DB row so this event is + # queryable via /db/events afterwards (matches the ACH ingestion path). 
+ if serial != "UNKNOWN" and ev._waveform_key is not None: + try: + rec = _get_store().save(ev, serial=serial, a5_frames=a5_frames) + _get_db().insert_events( + [ev], + serial=serial, + waveform_records={ev._waveform_key.hex(): rec}, + ) + log.info( + "blastware_file: persisted to store (%s, %d bytes)", + rec["filename"], rec["filesize"], + ) + except Exception as exc: + log.warning( + "blastware_file: persistent store save failed: %s " + "— temp file still served", + exc, + ) + return FileResponse( path=str(out_path), filename=filename, @@ -1435,6 +1471,165 @@ def db_set_false_trigger( return {"status": "ok", "event_id": event_id, "false_trigger": value} +# ── /db/events/{id} — waveform file accessors ───────────────────────────────── +# +# These endpoints serve files from the persistent WaveformStore, so a Blastware +# file or its decoded JSON for a previously-ingested ACH event can be fetched +# without re-dialing the device. + +@app.get("/db/events/{event_id}/blastware_file") +def db_event_blastware_file(event_id: str) -> FileResponse: + """ + Return the Blastware-format waveform file (.G10/.W/.H/etc.) for a + previously-ingested event. 404 if the event is unknown or has no + .G10 in the store (events ingested before the store was wired will + show this — re-download via the live endpoint to populate). + """ + row = _get_db().get_event(event_id) + if row is None: + raise HTTPException(status_code=404, detail=f"Event {event_id} not found") + serial = row.get("serial") + filename = row.get("blastware_filename") + if not serial or not filename: + raise HTTPException( + status_code=404, + detail=( + f"Event {event_id} has no Blastware file in the store. " + "Re-download via the live endpoint to populate." 
+ ), + ) + bw_path = _get_store().open_blastware(serial, filename) + if bw_path is None: + raise HTTPException( + status_code=410, + detail=f"Stored file missing on disk: {filename}", + ) + return FileResponse( + path=str(bw_path), + filename=filename, + media_type="application/octet-stream", + ) + + +@app.get("/db/events/{event_id}/waveform.json") +def db_event_waveform_json(event_id: str) -> dict: + """ + Return the decoded raw_samples + metadata for a stored event in the + same JSON shape as `/device/event/{index}/waveform`. + + Reads the `.a5.pkl` sidecar from the store, rebuilds an Event in + memory, runs the standard A5 decoders, and serialises the result. + """ + row = _get_db().get_event(event_id) + if row is None: + raise HTTPException(status_code=404, detail=f"Event {event_id} not found") + serial = row.get("serial") + filename = row.get("blastware_filename") + a5_name = row.get("a5_pickle_filename") + if not serial or not filename or not a5_name: + raise HTTPException( + status_code=404, + detail=( + f"Event {event_id} has no A5 sidecar in the store. " + "Re-download via the live endpoint to populate." + ), + ) + a5_frames = _get_store().load_a5(serial, filename) + if not a5_frames: + raise HTTPException( + status_code=410, + detail=f"A5 sidecar missing or unreadable: {a5_name}", + ) + + # Rebuild a minimal Event from DB fields, then decode A5 onto it. 
+ ev = Event(index=-1) + try: + _decode_a5_metadata_into(a5_frames, ev) + except Exception as exc: + log.warning("db_event_waveform_json: metadata decode failed: %s", exc) + try: + _decode_a5_waveform(a5_frames, ev) + except Exception as exc: + log.error("db_event_waveform_json: waveform decode failed: %s", exc, exc_info=True) + raise HTTPException(status_code=500, detail=f"Waveform decode failed: {exc}") from exc + + raw = getattr(ev, "raw_samples", None) or {} + samples_decoded = len(raw.get("Tran", [])) + return { + "event_id": event_id, + "serial": serial, + "record_type": ev.record_type or row.get("record_type"), + "timestamp": _serialise_timestamp(ev.timestamp) or row.get("timestamp"), + "total_samples": ev.total_samples, + "pretrig_samples": ev.pretrig_samples, + "rectime_seconds": ev.rectime_seconds, + "samples_decoded": samples_decoded, + "sample_rate": ev.sample_rate or row.get("sample_rate"), + "peak_values": _serialise_peak_values(ev.peak_values) or { + "transverse": row.get("tran_ppv"), + "vertical": row.get("vert_ppv"), + "longitudinal": row.get("long_ppv"), + "vector_sum": row.get("peak_vector_sum"), + "mic": row.get("mic_ppv"), + }, + "channels": raw, + } + + +@app.get("/db/units/{serial}/waveforms.zip") +def db_unit_waveforms_zip( + serial: str, + from_dt: Optional[str] = Query(None, description="ISO-8601 start datetime (inclusive)"), + to_dt: Optional[str] = Query(None, description="ISO-8601 end datetime (inclusive)"), + limit: int = Query(5000, description="Hard cap on events bundled (default 5000)"), +) -> StreamingResponse: + """ + Stream a ZIP of all .G10/.W files for a serial in the optional date range. + Events without a stored Blastware file are silently skipped. 
+ """ + import io + import zipfile + + from_parsed = datetime.datetime.fromisoformat(from_dt) if from_dt else None + to_parsed = datetime.datetime.fromisoformat(to_dt) if to_dt else None + + rows = _get_db().query_events( + serial=serial, + from_dt=from_parsed, + to_dt=to_parsed, + limit=limit, + offset=0, + ) + store = _get_store() + + buf = io.BytesIO() + written = 0 + with zipfile.ZipFile(buf, "w", compression=zipfile.ZIP_DEFLATED) as zf: + for row in rows: + fn = row.get("blastware_filename") + if not fn: + continue + bw_path = store.open_blastware(serial, fn) + if bw_path is None: + continue + zf.write(bw_path, arcname=fn) + written += 1 + + if written == 0: + raise HTTPException( + status_code=404, + detail=f"No stored Blastware files found for serial {serial} in range", + ) + + buf.seek(0) + safe_serial = serial.replace("/", "_") + headers = { + "Content-Disposition": f'attachment; filename="{safe_serial}_waveforms.zip"', + "X-Waveform-Count": str(written), + } + return StreamingResponse(buf, media_type="application/zip", headers=headers) + + @app.get("/db/monitor_log") def db_monitor_log( serial: Optional[str] = Query(None, description="Filter by unit serial"), diff --git a/sfm/waveform_store.py b/sfm/waveform_store.py new file mode 100644 index 0000000..edf097a --- /dev/null +++ b/sfm/waveform_store.py @@ -0,0 +1,164 @@ +""" +sfm/waveform_store.py — On-disk store for Blastware-format waveform files. + +Layout (flat per-serial): + + // ← .G10 / .W / .H / etc. (Blastware-readable) + //.a5.pkl ← pickled list of A5 S3Frame dicts + +`` is whatever `minimateplus.blastware_file.blastware_filename` +produces for the event (encodes serial + timestamp + record type). Filenames +never collide for the same physical event. + +The `.a5.pkl` sidecar lets the .G10 be regenerated later if the encoder +changes — captures the raw 5A frame stream as serializable dicts so the +schema isn't tied to the `S3Frame` dataclass layout. 
+""" + +from __future__ import annotations + +import logging +import pickle +from pathlib import Path +from typing import Optional + +from minimateplus.blastware_file import blastware_filename, write_blastware_file +from minimateplus.framing import S3Frame +from minimateplus.models import Event + +log = logging.getLogger("sfm.waveform_store") + +A5_PICKLE_VERSION = 1 + + +def _frame_to_dict(f: S3Frame) -> dict: + return { + "sub": f.sub, + "page_hi": f.page_hi, + "page_lo": f.page_lo, + "data": bytes(f.data), + "chk_byte": f.chk_byte, + "checksum_valid": f.checksum_valid, + } + + +def _dict_to_frame(d: dict) -> S3Frame: + return S3Frame( + sub=d["sub"], + page_hi=d["page_hi"], + page_lo=d["page_lo"], + data=bytes(d["data"]), + checksum_valid=d.get("checksum_valid", True), + chk_byte=d.get("chk_byte", 0), + ) + + +class WaveformStore: + """ + Persistent store for Blastware-format waveform files + their A5 source frames. + + Thread safety: write_blastware_file is single-shot; concurrent saves of the + *same* filename would race, but the filename encodes second-resolution + timestamps + serial, so collisions across threads/processes are vanishingly + unlikely in practice. 
+ """ + + def __init__(self, root: str | Path) -> None: + self.root = Path(root) + self.root.mkdir(parents=True, exist_ok=True) + log.info("WaveformStore root=%s", self.root) + + # ── path helpers ──────────────────────────────────────────────────────────── + + def _serial_dir(self, serial: str) -> Path: + d = self.root / serial + d.mkdir(parents=True, exist_ok=True) + return d + + def paths_for(self, serial: str, filename: str) -> tuple[Path, Path]: + """Return (blastware_path, a5_pickle_path) for a given serial+filename.""" + d = self._serial_dir(serial) + return d / filename, d / f"{filename}.a5.pkl" + + def open_blastware(self, serial: str, filename: str) -> Optional[Path]: + """Return absolute path to an existing .G10 file or None.""" + bw_path, _ = self.paths_for(serial, filename) + return bw_path if bw_path.exists() else None + + # ── save / load ───────────────────────────────────────────────────────────── + + def save( + self, + ev: Event, + serial: str, + a5_frames: list[S3Frame], + ) -> dict: + """ + Write the .G10 file and the .a5.pkl sidecar for one event. + + Returns a record dict suitable for persisting alongside the DB row: + + { + "filename": "M529LKIQ.G10", + "filesize": 8708, + "a5_pickle_filename": "M529LKIQ.G10.a5.pkl", + } + + Idempotent: if the .G10 already exists, it is overwritten with the + freshly-encoded version (same bytes for the same a5_frames). + """ + if not a5_frames: + raise ValueError("WaveformStore.save: a5_frames is empty") + if not serial: + raise ValueError("WaveformStore.save: serial is required") + + filename = blastware_filename(ev, serial) + bw_path, a5_path = self.paths_for(serial, filename) + + # 1. encode the .G10 + # Delete any stale file at this path so partial writes never leak + # trailing bytes from a previous larger file (matches the live + # endpoint's defensive unlink). 
+ try: + bw_path.unlink() + except FileNotFoundError: + pass + write_blastware_file(ev, a5_frames, bw_path) + filesize = bw_path.stat().st_size + + # 2. write the .a5.pkl sidecar + try: + a5_path.unlink() + except FileNotFoundError: + pass + payload = { + "version": A5_PICKLE_VERSION, + "frames": [_frame_to_dict(f) for f in a5_frames], + } + with a5_path.open("wb") as fp: + pickle.dump(payload, fp, protocol=pickle.HIGHEST_PROTOCOL) + + log.info( + "WaveformStore.save serial=%s filename=%s filesize=%d frames=%d", + serial, filename, filesize, len(a5_frames), + ) + return { + "filename": filename, + "filesize": filesize, + "a5_pickle_filename": a5_path.name, + } + + def load_a5(self, serial: str, filename: str) -> Optional[list[S3Frame]]: + """ + Re-hydrate the pickled A5 frame stream for a stored event. + Returns None if the sidecar is missing. + """ + _, a5_path = self.paths_for(serial, filename) + if not a5_path.exists(): + return None + with a5_path.open("rb") as fp: + payload = pickle.load(fp) + if not isinstance(payload, dict) or "frames" not in payload: + log.warning("WaveformStore.load_a5: malformed sidecar at %s", a5_path) + return None + return [_dict_to_frame(d) for d in payload["frames"]] diff --git a/tests/test_waveform_store.py b/tests/test_waveform_store.py new file mode 100644 index 0000000..4207b25 --- /dev/null +++ b/tests/test_waveform_store.py @@ -0,0 +1,302 @@ +""" +test_waveform_store.py — unit tests for sfm/waveform_store.py and the +SeismoDb columns + insert_events upsert path that the store depends on. + +These tests exercise the *store + DB plumbing* in isolation — they do not +re-test write_blastware_file (covered separately) and do not require a live +device or a wire capture. 
+ +Run: + python -m pytest tests/test_waveform_store.py -v +""" + +from __future__ import annotations + +import os +import sys +import datetime +from pathlib import Path + +try: + import pytest +except ImportError: # allow running standalone without pytest installed + pytest = None # type: ignore + +sys.path.insert(0, os.path.dirname(os.path.dirname(os.path.abspath(__file__)))) + +from minimateplus.framing import S3Frame +from minimateplus.models import Event, Timestamp + + +# ── Test fixtures ────────────────────────────────────────────────────────────── + + +def _make_synthetic_event() -> tuple[Event, list[S3Frame]]: + """ + Build a minimal Event + a 3-frame A5 stream that satisfies + write_blastware_file's STRT-extraction path. + + Frame 0 (probe): contains a STRT record at the canonical position so + write_blastware_file finds it without falling back. + Frame 1 (sample): 0x0200 bytes of zeros at page_key=0x0010 (sample marker). + Frame 2 (TERM): page_key=0x0000 marks the terminator. + """ + key4 = bytes.fromhex("01110000") + rectime = 3 + strt = b"STRT" + b"\xff\xfe" + key4 + key4 + bytes(7) + bytes([rectime]) + + # Probe payload prefix: 7 zero bytes then STRT (matches blastware_file._strip + # logic which looks for STRT in data[7:]). Tail with 32 zero bytes of fake + # body so reconstruction has something to slice. 
+ probe_data = bytes(7) + strt + bytes(32) + probe = S3Frame(sub=0xA5, page_hi=0x10, page_lo=0x00, data=probe_data, + checksum_valid=True, chk_byte=0x00) + + sample = S3Frame(sub=0xA5, page_hi=0x00, page_lo=0x10, + data=bytes(7) + bytes(0x0200), checksum_valid=True, + chk_byte=0x00) + + term = S3Frame(sub=0xA5, page_hi=0x00, page_lo=0x00, + data=bytes(7) + bytes(64), checksum_valid=True, + chk_byte=0x00) + + ev = Event(index=0) + ev._waveform_key = key4 + ev.timestamp = Timestamp( + raw=b"", + flag=0x10, + year=2026, + unknown_byte=0, + month=5, + day=6, + hour=12, + minute=34, + second=56, + ) + ev.rectime_seconds = rectime + ev.record_type = "Waveform" + ev._a5_frames = [probe, sample, term] + return ev, [probe, sample, term] + + +# ── Frame round-trip ─────────────────────────────────────────────────────────── + + +def test_frame_dict_round_trip(): + """_frame_to_dict and _dict_to_frame must round-trip every field.""" + from sfm.waveform_store import _dict_to_frame, _frame_to_dict + + f = S3Frame( + sub=0xA5, page_hi=0x12, page_lo=0x34, + data=b"\x10\x02\x00\xab\xcd", + checksum_valid=False, + chk_byte=0x42, + ) + d = _frame_to_dict(f) + g = _dict_to_frame(d) + assert g.sub == f.sub + assert g.page_hi == f.page_hi + assert g.page_lo == f.page_lo + assert g.data == f.data + assert g.checksum_valid == f.checksum_valid + assert g.chk_byte == f.chk_byte + + +# ── Store save/load round-trip ───────────────────────────────────────────────── + + +def test_waveform_store_save_load_round_trip(tmp_path: Path): + """save() writes both files; load_a5() returns equivalent frames.""" + from sfm.waveform_store import WaveformStore + + store = WaveformStore(tmp_path / "waveforms") + ev, frames = _make_synthetic_event() + + rec = store.save(ev, serial="BE11529", a5_frames=frames) + + assert rec["filename"].startswith("M529") + assert rec["filesize"] > 0 + assert rec["a5_pickle_filename"] == rec["filename"] + ".a5.pkl" + + bw_path = store.open_blastware("BE11529", 
rec["filename"]) + assert bw_path is not None + assert bw_path.exists() + assert bw_path.stat().st_size == rec["filesize"] + + # Sidecar exists and round-trips + loaded = store.load_a5("BE11529", rec["filename"]) + assert loaded is not None + assert len(loaded) == len(frames) + for orig, got in zip(frames, loaded): + assert got.sub == orig.sub + assert got.page_hi == orig.page_hi + assert got.page_lo == orig.page_lo + assert got.data == orig.data + + +def test_waveform_store_missing_returns_none(tmp_path: Path): + """open_blastware / load_a5 return None for nonexistent entries.""" + from sfm.waveform_store import WaveformStore + + store = WaveformStore(tmp_path / "waveforms") + assert store.open_blastware("BE99999", "no_such.G10") is None + assert store.load_a5("BE99999", "no_such.G10") is None + + +def test_waveform_store_idempotent_save(tmp_path: Path): + """Saving the same event twice produces the same .G10 bytes.""" + from sfm.waveform_store import WaveformStore + + store = WaveformStore(tmp_path / "waveforms") + ev, frames = _make_synthetic_event() + + rec1 = store.save(ev, serial="BE11529", a5_frames=frames) + bw_path = store.open_blastware("BE11529", rec1["filename"]) + bytes1 = bw_path.read_bytes() + + rec2 = store.save(ev, serial="BE11529", a5_frames=frames) + bytes2 = bw_path.read_bytes() + + assert rec1["filename"] == rec2["filename"] + assert bytes1 == bytes2 + + +# ── DB integration ──────────────────────────────────────────────────────────── + + +def test_seismodb_persists_waveform_columns(tmp_path: Path): + """insert_events writes the new columns when waveform_records is supplied.""" + from sfm.database import SeismoDb + + db = SeismoDb(tmp_path / "seismo_relay.db") + ev, _ = _make_synthetic_event() + + rec = { + "filename": "M529LKIQ.G10", + "filesize": 8708, + "a5_pickle_filename": "M529LKIQ.G10.a5.pkl", + } + inserted, skipped = db.insert_events( + [ev], + serial="BE11529", + waveform_records={ev._waveform_key.hex(): rec}, + ) + assert inserted == 
1 + assert skipped == 0 + + rows = db.query_events(serial="BE11529") + assert len(rows) == 1 + row = rows[0] + assert row["blastware_filename"] == rec["filename"] + assert row["blastware_filesize"] == rec["filesize"] + assert row["a5_pickle_filename"] == rec["a5_pickle_filename"] + + # get_event by id returns the same fields + row2 = db.get_event(row["id"]) + assert row2 is not None + assert row2["blastware_filename"] == rec["filename"] + + +def test_seismodb_dedup_upserts_waveform_fields(tmp_path: Path): + """Re-inserting the same (serial, timestamp) refreshes waveform fields.""" + from sfm.database import SeismoDb + + db = SeismoDb(tmp_path / "seismo_relay.db") + ev, _ = _make_synthetic_event() + + db.insert_events([ev], serial="BE11529") # no waveform record yet + rows = db.query_events(serial="BE11529") + assert rows[0]["blastware_filename"] is None + + rec = { + "filename": "M529LKIQ.G10", + "filesize": 4242, + "a5_pickle_filename": "M529LKIQ.G10.a5.pkl", + } + inserted, skipped = db.insert_events( + [ev], + serial="BE11529", + waveform_records={ev._waveform_key.hex(): rec}, + ) + assert inserted == 0 # dedup'd + assert skipped == 1 + rows = db.query_events(serial="BE11529") + assert rows[0]["blastware_filename"] == rec["filename"] + assert rows[0]["blastware_filesize"] == 4242 + + +def test_seismodb_migration_adds_columns(tmp_path: Path): + """An existing DB without the new columns gets them added on init.""" + import sqlite3 + + db_path = tmp_path / "old.db" + # Build a "v0" events table without the new columns. 
+ with sqlite3.connect(str(db_path)) as conn: + conn.executescript(""" + CREATE TABLE events ( + id TEXT PRIMARY KEY, + serial TEXT NOT NULL, + waveform_key TEXT NOT NULL, + session_id TEXT, + timestamp TEXT, + tran_ppv REAL, + vert_ppv REAL, + long_ppv REAL, + peak_vector_sum REAL, + mic_ppv REAL, + project TEXT, + client TEXT, + operator TEXT, + sensor_location TEXT, + sample_rate INTEGER, + record_type TEXT, + false_trigger INTEGER NOT NULL DEFAULT 0, + created_at TEXT NOT NULL DEFAULT (strftime('%Y-%m-%dT%H:%M:%SZ', 'now')), + UNIQUE(serial, timestamp) + ); + INSERT INTO events + (id, serial, waveform_key, timestamp) + VALUES + ('legacy-id', 'BE11529', '01110000', + '2026-04-01T12:00:00'); + """) + + # Initialise SeismoDb against the old DB — migration should run. + from sfm.database import SeismoDb + + db = SeismoDb(db_path) + rows = db.query_events(serial="BE11529") + assert len(rows) == 1 + assert rows[0]["blastware_filename"] is None + assert "blastware_filesize" in rows[0] + assert "a5_pickle_filename" in rows[0] + + +if __name__ == "__main__": + if pytest is not None: + pytest.main([__file__, "-v"]) + else: + # Standalone runner — does not require pytest. + import inspect + import tempfile + import traceback as _tb + + passed = failed = 0 + for _name, _fn in sorted(globals().items()): + if not _name.startswith("test_") or not callable(_fn): + continue + try: + _sig = inspect.signature(_fn) + if "tmp_path" in _sig.parameters: + with tempfile.TemporaryDirectory() as _td: + _fn(Path(_td)) + else: + _fn() + print(f"PASS {_name}") + passed += 1 + except Exception: + print(f"FAIL {_name}") + _tb.print_exc() + failed += 1 + print(f"\n{passed} passed, {failed} failed") + sys.exit(0 if failed == 0 else 1) From 0484680c8900d78470730d2d1f834f7bb2e36af3 Mon Sep 17 00:00:00 2001 From: serversdown Date: Wed, 6 May 2026 19:08:38 +0000 Subject: [PATCH 2/5] fix(docs/comments): rename refs to 'event files' to reflect their timestamp extension names.
--- bridges/ach_server.py | 4 ++-- sfm/database.py | 4 ++-- sfm/server.py | 16 +++++++++------- sfm/waveform_store.py | 37 +++++++++++++++++++++--------------- tests/test_waveform_store.py | 14 +++++++------- 5 files changed, 42 insertions(+), 33 deletions(-) diff --git a/bridges/ach_server.py b/bridges/ach_server.py index 25d988c..209ec0e 100644 --- a/bridges/ach_server.py +++ b/bridges/ach_server.py @@ -410,9 +410,9 @@ class AchSession: if skipped: log.info(" (skipped %d already-downloaded event(s))", skipped) - # ── Persist .G10 / A5 sidecars to the waveform store ── + # ── Persist event file + A5 sidecar to the waveform store ── # Saves ride alongside the existing JSON dump so the on-disk - # .G10 and the events.json reference the same set of events. + # event file and events.json reference the same set of events. waveform_records: dict[str, dict] = {} for ev in new_events: if not ev._a5_frames: diff --git a/sfm/database.py b/sfm/database.py index 0f3c648..7f0d7dc 100644 --- a/sfm/database.py +++ b/sfm/database.py @@ -81,8 +81,8 @@ CREATE TABLE IF NOT EXISTS events ( sample_rate INTEGER, record_type TEXT, -- "single_shot" | "continuous" false_trigger INTEGER NOT NULL DEFAULT 0, -- 0=no, 1=yes (manual flag) - blastware_filename TEXT, -- e.g. "M529LKIQ.G10" — within waveform store - blastware_filesize INTEGER, -- bytes; NULL if no .G10 saved + blastware_filename TEXT, -- event file within waveform store; extension is per-event (AB0T encodes timestamp) + blastware_filesize INTEGER, -- bytes; NULL if no event file saved a5_pickle_filename TEXT, -- ".a5.pkl" sidecar created_at TEXT NOT NULL DEFAULT (strftime('%Y-%m-%dT%H:%M:%SZ', 'now')), UNIQUE(serial, timestamp) diff --git a/sfm/server.py b/sfm/server.py index 1f0b525..6bb9e37 100644 --- a/sfm/server.py +++ b/sfm/server.py @@ -115,7 +115,7 @@ def _get_db() -> SeismoDb: def _get_store() -> WaveformStore: """ - Persistent .G10 + A5-sidecar store, rooted at /waveforms/. 
+ Persistent event-file + A5-sidecar store, rooted at /waveforms/. Mirrors the layout used by bridges/ach_server.py so files saved by ACH ingestion and by live SFM downloads share one canonical location. """ @@ -1480,10 +1480,12 @@ def db_set_false_trigger( @app.get("/db/events/{event_id}/blastware_file") def db_event_blastware_file(event_id: str) -> FileResponse: """ - Return the Blastware-format waveform file (.G10/.W/.H/etc.) for a - previously-ingested event. 404 if the event is unknown or has no - .G10 in the store (events ingested before the store was wired will - show this — re-download via the live endpoint to populate). + Return the Blastware-format event file for a previously-ingested + event. Filename extension is per-event (timestamp-encoded + `AB0T` for ACH downloads, 3-char `AB0` for direct downloads). + 404 if the event is unknown or has no event file in the store + (events ingested before the store was wired will show this — + re-download via the live endpoint to populate). """ row = _get_db().get_event(event_id) if row is None: @@ -1584,8 +1586,8 @@ def db_unit_waveforms_zip( limit: int = Query(5000, description="Hard cap on events bundled (default 5000)"), ) -> StreamingResponse: """ - Stream a ZIP of all .G10/.W files for a serial in the optional date range. - Events without a stored Blastware file are silently skipped. + Stream a ZIP of all event files for a serial in the optional date range. + Events without a stored event file are silently skipped. """ import io import zipfile diff --git a/sfm/waveform_store.py b/sfm/waveform_store.py index edf097a..83216f8 100644 --- a/sfm/waveform_store.py +++ b/sfm/waveform_store.py @@ -1,18 +1,22 @@ """ -sfm/waveform_store.py — On-disk store for Blastware-format waveform files. +sfm/waveform_store.py — On-disk store for Blastware-format event files. Layout (flat per-serial): - // ← .G10 / .W / .H / etc. 
(Blastware-readable) + // ← event file (Blastware-readable binary) //.a5.pkl ← pickled list of A5 S3Frame dicts `` is whatever `minimateplus.blastware_file.blastware_filename` -produces for the event (encodes serial + timestamp + record type). Filenames -never collide for the same physical event. +produces for the event. The extension is NOT a fixed type tag — it encodes +the event timestamp (`AB0T` format: 2-char base-36 of `total_seconds % +1296`, literal `0`, then `W`=Full Waveform / `H`=Full Histogram for ACH +downloads, or 3-char `AB0` for direct/manual downloads). Every event's +filename therefore contains its own timestamp + record-type fingerprint and +collisions across the same physical event don't occur. -The `.a5.pkl` sidecar lets the .G10 be regenerated later if the encoder -changes — captures the raw 5A frame stream as serializable dicts so the -schema isn't tied to the `S3Frame` dataclass layout. +The `.a5.pkl` sidecar lets the event file be regenerated later if the +encoder changes — captures the raw 5A frame stream as serializable dicts so +the schema isn't tied to the `S3Frame` dataclass layout. """ from __future__ import annotations @@ -81,7 +85,7 @@ class WaveformStore: return d / filename, d / f"{filename}.a5.pkl" def open_blastware(self, serial: str, filename: str) -> Optional[Path]: - """Return absolute path to an existing .G10 file or None.""" + """Return absolute path to an existing event file or None.""" bw_path, _ = self.paths_for(serial, filename) return bw_path if bw_path.exists() else None @@ -94,18 +98,21 @@ class WaveformStore: a5_frames: list[S3Frame], ) -> dict: """ - Write the .G10 file and the .a5.pkl sidecar for one event. + Write the event file and its .a5.pkl sidecar for one event. 
Returns a record dict suitable for persisting alongside the DB row: { - "filename": "M529LKIQ.G10", - "filesize": 8708, - "a5_pickle_filename": "M529LKIQ.G10.a5.pkl", + "filename": "M529LKIQ.7M0W", + "filesize": 8708, + "a5_pickle_filename": "M529LKIQ.7M0W.a5.pkl", } - Idempotent: if the .G10 already exists, it is overwritten with the - freshly-encoded version (same bytes for the same a5_frames). + The exact extension is timestamp-encoded per event (see + `minimateplus.blastware_file.blastware_filename`). + + Idempotent: if the event file already exists, it is overwritten with + the freshly-encoded version (same bytes for the same a5_frames). """ if not a5_frames: raise ValueError("WaveformStore.save: a5_frames is empty") @@ -115,7 +122,7 @@ class WaveformStore: filename = blastware_filename(ev, serial) bw_path, a5_path = self.paths_for(serial, filename) - # 1. encode the .G10 + # 1. encode the event file # Delete any stale file at this path so partial writes never leak # trailing bytes from a previous larger file (matches the live # endpoint's defensive unlink). 
diff --git a/tests/test_waveform_store.py b/tests/test_waveform_store.py index 4207b25..601985e 100644 --- a/tests/test_waveform_store.py +++ b/tests/test_waveform_store.py @@ -139,12 +139,12 @@ def test_waveform_store_missing_returns_none(tmp_path: Path): from sfm.waveform_store import WaveformStore store = WaveformStore(tmp_path / "waveforms") - assert store.open_blastware("BE99999", "no_such.G10") is None - assert store.load_a5("BE99999", "no_such.G10") is None + assert store.open_blastware("BE99999", "no_such.7M0W") is None + assert store.load_a5("BE99999", "no_such.7M0W") is None def test_waveform_store_idempotent_save(tmp_path: Path): - """Saving the same event twice produces the same .G10 bytes.""" + """Saving the same event twice produces the same event-file bytes.""" from sfm.waveform_store import WaveformStore store = WaveformStore(tmp_path / "waveforms") @@ -172,9 +172,9 @@ def test_seismodb_persists_waveform_columns(tmp_path: Path): ev, _ = _make_synthetic_event() rec = { - "filename": "M529LKIQ.G10", + "filename": "M529LKIQ.7M0W", "filesize": 8708, - "a5_pickle_filename": "M529LKIQ.G10.a5.pkl", + "a5_pickle_filename": "M529LKIQ.7M0W.a5.pkl", } inserted, skipped = db.insert_events( [ev], @@ -209,9 +209,9 @@ def test_seismodb_dedup_upserts_waveform_fields(tmp_path: Path): assert rows[0]["blastware_filename"] is None rec = { - "filename": "M529LKIQ.G10", + "filename": "M529LKIQ.7M0W", "filesize": 4242, - "a5_pickle_filename": "M529LKIQ.G10.a5.pkl", + "a5_pickle_filename": "M529LKIQ.7M0W.a5.pkl", } inserted, skipped = db.insert_events( [ev], From 9afa3484f4f2883332a9931b06643131dafae22f Mon Sep 17 00:00:00 2001 From: serversdown Date: Thu, 7 May 2026 04:42:00 +0000 Subject: [PATCH 3/5] feat(cache): implement integrity checks for cached events and waveforms - Added `waveform_key` and `event_timestamp` columns to `CachedEvent` and `CachedWaveform` for integrity verification. 
- Implemented logic to flush the cache when a mismatch in (waveform_key, event_timestamp) is detected during event and waveform updates. - Enhanced `set_events` and `set_waveform` methods to check for mismatches and trigger cache eviction as necessary. - Introduced a new `LiveCache` class to manage in-memory caching of live device data, separating it from the server logic for better testability. - Added tests to verify the correctness of cache invalidation logic, particularly for post-erase key reuse scenarios. - Updated web application to include a "Force refresh" toggle, allowing users to bypass the cache and re-fetch data from the device. --- bridges/ach_server.py | 283 ++++++++++++++++++------------- minimateplus/client.py | 209 +++++++++++++---------- sfm/cache.py | 151 +++++++++++++++-- sfm/live_cache.py | 189 +++++++++++++++++++++ sfm/server.py | 120 +------------ sfm/sfm_webapp.html | 52 +++++- tests/test_cache_invalidation.py | 209 +++++++++++++++++++++++ 7 files changed, 890 insertions(+), 323 deletions(-) create mode 100644 sfm/live_cache.py create mode 100644 tests/test_cache_invalidation.py diff --git a/bridges/ach_server.py b/bridges/ach_server.py index 209ec0e..c048d4c 100644 --- a/bridges/ach_server.py +++ b/bridges/ach_server.py @@ -74,39 +74,73 @@ from sfm.waveform_store import WaveformStore log = logging.getLogger("ach_server") -# ── Per-unit state (downloaded-key set) ─────────────────────────────────────── +# ── Per-unit state (downloaded events index) ────────────────────────────────── # Persisted as /ach_state.json -# Format: +# Format (current — v2): # { # "BE11529": { -# "downloaded_keys": ["01110000", "0111245a"], # hex keys already on disk -# "max_downloaded_key": "0111245a", # highest key ever seen -# "last_seen": "2026-04-11T01:04:36" +# "downloaded_events": { # key_hex → ISO timestamp string +# "01110000": "2026-04-11T00:42:17", +# "0111245a": "2026-04-11T01:04:30" +# }, +# "max_downloaded_key": "0111245a", +# "last_seen": 
"2026-04-11T01:04:36", +# "serial": "BE11529", +# "peer": "63.43.212.232:51920" # } # } # -# Key-based deduplication works well within a single "key generation" (between -# erases). After the device memory is erased the event counter resets to -# 0x01110000, so the first new event has the SAME key as the very first event -# we ever downloaded. We detect this situation with max_downloaded_key: +# Why (key, timestamp) and not key alone: +# The device's event-key counter resets to 0x01110000 after every memory +# erase (internal or external). A bare-key dedup (the v1 format) cannot +# distinguish a re-recorded event with the same key from one we already +# downloaded. The 0C waveform record's timestamp IS unique per physical +# event, so we pair (key, timestamp) and treat a key with a different +# timestamp as a new event regardless of `max_downloaded_key`. # -# if max(current_device_keys) < max_downloaded_key -# → device was wiped and keys have restarted → treat all device keys as new -# -# After our own erase (--clear-after-download) we also explicitly clear -# downloaded_keys and max_downloaded_key so the next session starts fresh. +# Legacy v1 format (`downloaded_keys: list[str]` only) is auto-migrated on +# read: the keys are kept under a sentinel of "" (empty string) timestamp so +# the (key, timestamp) compare always sees a mismatch and forces a one-time +# re-download. After that pass the state is rewritten in v2 form. _state_lock = threading.Lock() def _load_state(state_path: Path) -> dict: - if state_path.exists(): - try: - with open(state_path) as f: - return json.load(f) - except Exception: - pass - return {} + """ + Load ach_state.json, transparently migrating any legacy + `downloaded_keys: list` entries into the v2 `downloaded_events: dict` + schema. Returns the migrated state. 
+ """ + if not state_path.exists(): + return {} + try: + with open(state_path) as f: + state = json.load(f) + except Exception: + return {} + + # Per-unit migration: legacy list → dict-with-empty-timestamps + for unit_key, unit_state in list(state.items()): + if not isinstance(unit_state, dict): + continue + if "downloaded_events" in unit_state: + continue + legacy_keys = unit_state.get("downloaded_keys") + if isinstance(legacy_keys, list): + unit_state["downloaded_events"] = {k: "" for k in legacy_keys} + log.info( + "ach_state: migrated %s from v1 (downloaded_keys list) → v2 " + "(downloaded_events dict, %d keys with empty timestamps; " + "they will re-validate on next session)", + unit_key, len(legacy_keys), + ) + else: + unit_state["downloaded_events"] = {} + # keep legacy field for one cycle; cleared on next save + unit_state.pop("downloaded_keys", None) + + return state def _save_state(state_path: Path, state: dict) -> None: @@ -143,6 +177,7 @@ class AchSession: store: "WaveformStore", clear_after_download: bool = False, restart_monitoring: bool = False, + force_redownload: bool = False, ) -> None: self.sock = sock self.peer = peer @@ -155,6 +190,11 @@ class AchSession: self.store = store self.clear_after_download = clear_after_download self.restart_monitoring = restart_monitoring + # `force_redownload` tells this session to ignore ach_state and + # re-download every event currently on the device, regardless of any + # (key, timestamp) match. Useful as a manual override when state has + # become inconsistent with what's actually on disk / in the DB. 
+ self.force_redownload = force_redownload def run(self) -> None: ts = datetime.datetime.now().strftime("%Y%m%d_%H%M%S") @@ -276,11 +316,20 @@ class AchSession: state = _load_state(self.state_path) unit_key = serial or self.peer # fall back to IP if no serial unit_state = state.get(unit_key, {}) - seen_keys: set[str] = set(unit_state.get("downloaded_keys", [])) - # Highest event key ever downloaded from this unit (hex string, 8 chars). - # Used to detect post-erase key reuse — see comment block above. + + # downloaded_events is the v2 (key_hex → timestamp_iso) dict. + # Empty-string timestamps are migrated v1 entries — they force a + # one-time re-download because the (key, timestamp) compare always + # mismatches against any non-empty timestamp from a fresh 0C read. + seen_events: dict[str, str] = dict(unit_state.get("downloaded_events", {})) max_seen_key: str = unit_state.get("max_downloaded_key", "00000000") + if self.force_redownload: + log.info(" --force-redownload-all set — ignoring %d cached " + "(key, timestamp) entries for this session", + len(seen_events)) + seen_events = {} + # Walk the event index (browse-mode, no 5A) to get the actual current # key list. The SUB 08 event_count field is a lifetime "total events # ever recorded" counter that does NOT decrement on erase — confirmed @@ -293,11 +342,10 @@ class AchSession: log.warning(" list_event_keys failed: %s -- falling back to full download", exc) device_keys = None - # Use the walk result as our authoritative current count. 
current_count = len(device_keys) if device_keys is not None else 0 - log.info(" Unit has %d stored event(s); %d key(s) previously downloaded", - current_count, len(seen_keys)) + log.info(" Unit has %d stored event(s); %d (key, ts) entr(ies) previously downloaded", + current_count, len(seen_events)) if device_keys is not None and current_count == 0: log.info(" [OK] No events on device -- nothing to download") @@ -305,75 +353,29 @@ class AchSession: return if device_keys is not None: - # ── Post-erase detection ────────────────────────────────────── - # After the device memory is erased, new events start from key - # 01110000 again — the same keys we already downloaded. Detect - # this by comparing the device's current highest key against the - # historical maximum. If the device has rolled back below our - # high-water mark, its counter was reset and we must treat all - # its keys as new, regardless of what seen_keys contains. + # ── Post-erase detection (best-effort, key-only signal) ─────── + # After erase the device's key counter resets to 01110000. + # If the device's current max key is below our high-water mark + # we know erase happened. This catches the cleanest case but + # does NOT catch erase-then-record-many-events (where the new + # max may climb past the old max). The (key, timestamp) check + # in get_events() is what handles those. 
if device_keys and max_seen_key != "00000000": - max_device_key = max(device_keys) # lexicographic; safe because - # keys share the same 4-char prefix + max_device_key = max(device_keys) if max_device_key < max_seen_key: log.info( " Post-erase reset detected: " "device max key %s < historical max %s " - "-- treating all device keys as new", + "-- discarding stale (key, ts) state for this session", max_device_key, max_seen_key, ) - seen_keys = set() # discard stale dedup info for this session + seen_events = {} - new_key_set = set(device_keys) - seen_keys - log.info(" Device has %d key(s): %d new, %d already seen", - len(device_keys), len(new_key_set), len(device_keys) - len(new_key_set)) - if not new_key_set: - log.info(" [OK] All events already downloaded -- nothing to do") - # Refresh state timestamp; preserve max_seen_key unchanged. - state[unit_key] = { - "downloaded_keys": sorted(seen_keys | set(device_keys)), - "max_downloaded_key": max_seen_key, - "last_seen": datetime.datetime.now().isoformat(), - "serial": serial, - "peer": self.peer, - } - _save_state(self.state_path, state) - - # ── Erase even when no new events (if requested) ────────── - # Blastware ACH always erases after every session — even when - # nothing new was downloaded. Without the erase the device - # still sees stored events in its memory and immediately - # retries the call-home, causing the looping we observed. - # Only erase when device actually has events stored; skip - # the erase if device_keys is empty (nothing to erase). - if self.clear_after_download and device_keys: - log.info( - " Clearing device memory (--clear-after-download, " - "no new events but device has %d stored)...", - len(device_keys), - ) - try: - client.delete_all_events() - log.info(" [OK] Device memory cleared") - # Reset state so the next session starts fresh. 
- state[unit_key] = { - "downloaded_keys": [], - "max_downloaded_key": "00000000", - "last_seen": datetime.datetime.now().isoformat(), - "serial": serial, - "peer": self.peer, - } - _save_state(self.state_path, state) - except Exception as exc: - log.error( - " [WARN] Event deletion failed: %s -- events NOT cleared", - exc, - ) - - log.info("Session complete (no new events) -> %s", session_dir) - return - else: - new_key_set = None # unknown; proceed with full download + # Note: no early-exit "all already downloaded" short-circuit + # here. Without per-event timestamps we cannot tell whether + # device_keys ⊆ seen_events.keys() actually means we have + # those physical events. get_events() will read 0C on its + # skip path and decide per event. # Apply max_events cap # stop_idx: when we know the count from list_event_keys, use it as @@ -391,24 +393,35 @@ class AchSession: ) try: + # Pass `seen_events` (key → ISO timestamp) so the client can + # read 0C on its skip path and only skip 5A when the per-event + # timestamp matches what we already have on disk. When force_- + # redownload is set, seen_events was already cleared above. + # + # Filter out empty-string timestamps (legacy v1 entries) — the + # client's 0C-on-skip-path only trusts entries with a + # populated timestamp; otherwise it falls through to a full + # 5A download. + skip_dict = {k: ts for k, ts in seen_events.items() if ts} + all_events = client.get_events( full_waveform=True, stop_after_index=stop_idx, - skip_waveform_for_keys=seen_keys if seen_keys else None, + skip_waveform_for_events=skip_dict if skip_dict else None, ) - # Filter to events whose keys we haven't saved before. + # New events are those that came back with _a5_frames populated + # (= 5A actually ran on this session). Skipped events have + # _a5_frames = None because the client matched (key, timestamp) + # against skip_dict and bypassed 5A. 
new_events = [ e for e in all_events - if e._waveform_key is None - or e._waveform_key.hex() not in seen_keys + if getattr(e, "_a5_frames", None) ] skipped = len(all_events) - len(new_events) - log.info(" [OK] Downloaded %d event(s): %d new, %d skipped (already seen)", + log.info(" [OK] Walked %d event(s): %d downloaded, %d skipped (matched (key, ts) in state)", len(all_events), len(new_events), skipped) - if skipped: - log.info(" (skipped %d already-downloaded event(s))", skipped) # ── Persist event file + A5 sidecar to the waveform store ── # Saves ride alongside the existing JSON dump so the on-disk @@ -537,35 +550,64 @@ class AchSession: ) # ── Update persistent state ─────────────────────────────────── - # Include both triggered-event keys and monitor-log keys in the - # downloaded set so they are not re-processed on the next call-home. - current_event_keys = [ - e._waveform_key.hex() - for e in all_events - if e._waveform_key is not None - ] - current_monitor_keys = [e.key for e in new_monitor_entries] - current_keys = current_event_keys + current_monitor_keys + # Build a fresh (key → ISO timestamp) map from THIS session's + # results. For each event currently on the device, prefer the + # timestamp we just observed (from 0C); fall back to whatever + # was already in seen_events for that key (so we don't lose an + # entry just because get_events skipped it on the (key, ts) + # match path). 
+ def _ts_iso(ev) -> str: + ts = getattr(ev, "timestamp", None) + if ts is None: + return "" + try: + return datetime.datetime( + ts.year, ts.month, ts.day, + ts.hour or 0, ts.minute or 0, ts.second or 0, + ).isoformat() + except Exception: + return str(ts) + + current_events_map: dict[str, str] = {} + for ev in all_events: + if ev._waveform_key is None: + continue + key_hex = ev._waveform_key.hex() + ts_iso = _ts_iso(ev) or seen_events.get(key_hex, "") + current_events_map[key_hex] = ts_iso + + # Monitor-log entries don't have a 0C-style timestamp, but + # they DO have a start_time; use that so the monitor-log keys + # are properly entered into the (key, ts) map. + for ml in new_monitor_entries: + key_hex = ml.key + ts = ml.start_time + ts_iso = ts.isoformat() if ts else seen_events.get(key_hex, "") + # If a triggered event already populated this key, keep + # whichever has a non-empty timestamp. + if key_hex not in current_events_map or not current_events_map[key_hex]: + current_events_map[key_hex] = ts_iso if erased_successfully: - # Device memory is clear. Reset downloaded_keys and the - # high-water mark so the next call-home starts fresh and - # doesn't mis-identify the recycled key 01110000 as "seen". - updated_keys = [] + updated_events: dict[str, str] = {} new_max_key = "00000000" log.info( " State reset after erase -- next session will download " "from key 0 (device counter resets after erase)" ) else: - # Normal (no erase): union of previously-seen + all keys on - # device now. Includes already-seen survivors so we never - # re-download them if the device somehow keeps old records. - updated_keys = sorted(set(seen_keys) | set(current_keys)) - new_max_key = updated_keys[-1] if updated_keys else max_seen_key + # Merge: keep prior (key, ts) entries we still have evidence + # of (for survivors of any partial failure), plus this + # session's authoritative (key, ts) pairs. 
+ updated_events = dict(seen_events) + updated_events.update(current_events_map) + new_max_key = ( + max(updated_events.keys()) + if updated_events else max_seen_key + ) state[unit_key] = { - "downloaded_keys": updated_keys, + "downloaded_events": updated_events, "max_downloaded_key": new_max_key, "last_seen": datetime.datetime.now().isoformat(), "serial": serial, @@ -704,6 +746,7 @@ def serve(args: argparse.Namespace) -> None: print(f" Max events per session: {max_ev if max_ev else 'unlimited'}") print(f" Clear device after download: {'YES' if args.clear_after_download else 'no'}") print(f" Restart monitoring after download: {'YES' if args.restart_monitoring else 'no'}") + print(f" Force re-download all (ignore state): {'YES' if args.force_redownload_all else 'no'}") print(f"{'='*60}") print(f"\n Point your test unit's ACEmanager call-home settings to:") print(f" Remote Host: ") @@ -744,6 +787,7 @@ def serve(args: argparse.Namespace) -> None: store=store, clear_after_download=args.clear_after_download, restart_monitoring=args.restart_monitoring, + force_redownload=args.force_redownload_all, ) t = threading.Thread(target=session.run, daemon=True, name=f"ach-{peer}") t.start() @@ -828,6 +872,17 @@ def parse_args() -> argparse.Namespace: "This mirrors the standard Blastware ACH workflow." ), ) + p.add_argument( + "--force-redownload-all", + action="store_true", + default=False, + help=( + "Manual override: ignore ach_state.json's downloaded_events map " + "for this session and re-download every event currently on the " + "device, regardless of (key, timestamp) match. Useful when state " + "has become inconsistent with the on-disk waveform store / DB." 
+ ), + ) p.add_argument( "--verbose", "-v", action="store_true", diff --git a/minimateplus/client.py b/minimateplus/client.py index 7b1f9eb..cddabbe 100644 --- a/minimateplus/client.py +++ b/minimateplus/client.py @@ -449,7 +449,7 @@ class MiniMateClient: proto.confirm_erase_all() log.info("delete_all_events: erase confirmed — device memory cleared") - def get_events(self, full_waveform: bool = False, debug: bool = False, stop_after_index: Optional[int] = None, skip_waveform_for_keys: Optional[set] = None, extra_chunks_after_metadata: int = 1) -> list[Event]: + def get_events(self, full_waveform: bool = False, debug: bool = False, stop_after_index: Optional[int] = None, skip_waveform_for_keys: Optional[set] = None, skip_waveform_for_events: Optional[dict] = None, extra_chunks_after_metadata: int = 1) -> list[Event]: """ Download all stored events from the device using the confirmed 1E → 0A → 0C → 5A → 1F event-iterator protocol. @@ -497,37 +497,24 @@ class MiniMateClient: events: list[Event] = [] idx = 0 + # Legacy bare-key skip set is deprecated: the device's key counter + # resets to 0x01110000 after every memory erase, so a key in this set + # cannot be trusted to identify the same physical event across erases. + # If a caller still passes it, log a warning and ignore — full + # downloads will run for every event so the bug never silently bites. + if skip_waveform_for_keys: + log.warning( + "get_events: skip_waveform_for_keys is deprecated and unsafe " + "(post-erase key reuse); ignoring %d entries. Use " + "skip_waveform_for_events={key: timestamp_iso} instead.", + len(skip_waveform_for_keys), + ) + skip_evts: dict[str, str] = dict(skip_waveform_for_events or {}) + while data8[4:8] != b"\x00\x00\x00\x00": cur_key = key4 # key for this event's 0A/1E-arm/0C/5A calls log.info("get_events: record %d key=%s", idx, cur_key.hex()) - # Fast-advance path: if this key is already downloaded, skip - # 1E-arm/0C/POLL/5A entirely. 
Only 0A + 1F(browse) are needed - # to advance the device's internal pointer to the next event. - # This is identical to the browse-mode walk in count_events(). - if skip_waveform_for_keys and cur_key.hex() in skip_waveform_for_keys: - log.debug("get_events: key=%s already seen -- fast-advance only", cur_key.hex()) - try: - proto.read_waveform_header(cur_key) - except ProtocolError as exc: - log.warning( - "get_events: 0A failed for key=%s (skip path): %s -- stopping", - cur_key.hex(), exc, - ) - break - try: - key4, data8 = proto.advance_event(browse=True) - except ProtocolError as exc: - log.warning( - "get_events: 1F failed for key=%s (skip path): %s -- stopping", - cur_key.hex(), exc, - ) - break - idx += 1 - if stop_after_index is not None and idx > stop_after_index: - break - continue - ev = Event(index=idx) ev._waveform_key = cur_key @@ -574,72 +561,96 @@ class MiniMateClient: "get_events: 0C failed for key=%s: %s", cur_key.hex(), exc ) - # SUB 1F (download-arm) — send token=0xFE BEFORE POLL+5A to arm the - # device's bulk stream state machine. Cache the returned key as a - # fallback for loop iteration when 5A fails (see iteration block below). - # Confirmed from 4-2-26 capture frames 66-67 (1F before frames 68-73 POLL). - arm_key4: Optional[bytes] = None - try: - arm_key4, _ = proto.advance_event(browse=False) # arm 5A - log.info("get_events: 1F(download) — 5A armed, arm_key=%s", arm_key4.hex()) - except ProtocolError as exc: - log.warning("get_events: 1F(download) arm failed: %s", exc) + # ── Skip-5A decision based on (key, timestamp) match ────── + # If skip_waveform_for_events maps cur_key.hex() to a non-empty + # ISO timestamp matching what we just read from 0C, this is + # the same physical event we already have on disk — bypass + # the 1F(arm)+POLL+5A bulk download. Otherwise (no entry, or + # timestamp mismatch indicating post-erase reuse) fall through + # to the full download. 
+ expected_ts = skip_evts.get(cur_key.hex(), "") + actual_ts = _event_timestamp_iso(ev) + skip_5a = bool(expected_ts and actual_ts and expected_ts == actual_ts) + if skip_5a: + log.info( + "get_events: key=%s (key, ts=%s) match — skipping 5A bulk download", + cur_key.hex(), actual_ts, + ) - # POLL × 3 — BW sends 3 full POLL cycles between 1F and 5A. - # Confirmed from 4-2-26 BW TX capture (frames 68-73 before 5A at 74). - log.info("get_events: POLL × 3 before 5A") - for _p in range(3): + arm_key4: Optional[bytes] = None + a5_ok = False + + if not skip_5a: + # SUB 1F (download-arm) — send token=0xFE BEFORE POLL+5A to arm the + # device's bulk stream state machine. Cache the returned key as a + # fallback for loop iteration when 5A fails (see iteration block below). + # Confirmed from 4-2-26 capture frames 66-67 (1F before frames 68-73 POLL). try: - proto.poll() + arm_key4, _ = proto.advance_event(browse=False) # arm 5A + log.info("get_events: 1F(download) — 5A armed, arm_key=%s", arm_key4.hex()) except ProtocolError as exc: - log.warning("get_events: POLL %d failed: %s", _p, exc) + log.warning("get_events: 1F(download) arm failed: %s", exc) + + # POLL × 3 — BW sends 3 full POLL cycles between 1F and 5A. + # Confirmed from 4-2-26 BW TX capture (frames 68-73 before 5A at 74). + log.info("get_events: POLL × 3 before 5A") + for _p in range(3): + try: + proto.poll() + except ProtocolError as exc: + log.warning("get_events: POLL %d failed: %s", _p, exc) # SUB 5A — bulk waveform stream (uses cur_key, the event set up by 0A+1E+0C). # By default (full_waveform=False): stop after frame 7 for metadata only. # When full_waveform=True: fetch all chunks and decode raw ADC samples. 
- a5_ok = False - try: - if full_waveform: - log.info( - "get_events: 5A full waveform download for key=%s", cur_key.hex() - ) - a5_frames = proto.read_bulk_waveform_stream( - cur_key, stop_after_metadata=False, max_chunks=128, - include_terminator=True, - ) - if a5_frames: - a5_ok = True - ev._a5_frames = a5_frames # store for write_blastware_file - _decode_a5_metadata_into(a5_frames, ev) - _decode_a5_waveform(a5_frames, ev) + # + # Bypassed when skip_5a is True — the event is left with + # _a5_frames=None, which signals to the caller (e.g. + # ach_server.py) that this event was matched by (key, ts) and + # already has a stored .file in the persistent waveform store. + if not skip_5a: + try: + if full_waveform: log.info( - "get_events: 5A decoded %d sample-sets", - len((ev.raw_samples or {}).get("Tran", [])), + "get_events: 5A full waveform download for key=%s", cur_key.hex() ) - else: - log.info( - "get_events: 5A metadata-only download for key=%s", cur_key.hex() - ) - a5_frames = proto.read_bulk_waveform_stream( - cur_key, stop_after_metadata=True, - include_terminator=True, - extra_chunks_after_metadata=extra_chunks_after_metadata, - max_chunks=128, - ) - if a5_frames: - a5_ok = True - ev._a5_frames = a5_frames # store for write_blastware_file - _decode_a5_metadata_into(a5_frames, ev) - log.debug( - "get_events: 5A metadata client=%r operator=%r", - ev.project_info.client if ev.project_info else None, - ev.project_info.operator if ev.project_info else None, + a5_frames = proto.read_bulk_waveform_stream( + cur_key, stop_after_metadata=False, max_chunks=128, + include_terminator=True, ) - except ProtocolError as exc: - log.warning( - "get_events: 5A failed for key=%s: %s — metadata unavailable", - cur_key.hex(), exc, - ) + if a5_frames: + a5_ok = True + ev._a5_frames = a5_frames # store for write_blastware_file + _decode_a5_metadata_into(a5_frames, ev) + _decode_a5_waveform(a5_frames, ev) + log.info( + "get_events: 5A decoded %d sample-sets", + 
len((ev.raw_samples or {}).get("Tran", [])), + ) + else: + log.info( + "get_events: 5A metadata-only download for key=%s", cur_key.hex() + ) + a5_frames = proto.read_bulk_waveform_stream( + cur_key, stop_after_metadata=True, + include_terminator=True, + extra_chunks_after_metadata=extra_chunks_after_metadata, + max_chunks=128, + ) + if a5_frames: + a5_ok = True + ev._a5_frames = a5_frames # store for write_blastware_file + _decode_a5_metadata_into(a5_frames, ev) + log.debug( + "get_events: 5A metadata client=%r operator=%r", + ev.project_info.client if ev.project_info else None, + ev.project_info.operator if ev.project_info else None, + ) + except ProtocolError as exc: + log.warning( + "get_events: 5A failed for key=%s: %s — metadata unavailable", + cur_key.hex(), exc, + ) # SUB 1F — loop iteration. # @@ -652,7 +663,14 @@ class MiniMateClient: # Confirmed from 4-3-26 browse-mode captures: browse=True params # are correct for multi-event iteration. Conditional logic added # 2026-04-06 to avoid post-failure state disruption. - if a5_ok: + # + # NEW 2026-05-06: when skip_5a=True we never entered the 5A + # state at all (we read 0A+1E(arm)+0C and chose to bypass). + # 1F(browse) is safe in this scenario — the device's iteration + # pointer is independent of the bulk-stream state machine, and + # we never put it into the half-attempted 5A state that the + # earlier "post-failure 1F disruption" warning is about. + if skip_5a or a5_ok: # 5A succeeded — use browse 1F for reliable key advancement. try: key4, data8 = proto.advance_event(browse=True) @@ -1174,6 +1192,27 @@ class MiniMateClient: # Pure functions: bytes → model field population. # Kept here (not in models.py) to isolate protocol knowledge from data shapes. +def _event_timestamp_iso(event: Event) -> str: + """ + Return a stable ISO-8601 string for the event's 0C-derived timestamp, + or "" if the event has no timestamp populated. 
+ + The format intentionally matches what `bridges/ach_server.py` writes + into `ach_state.json:downloaded_events[*]` so the (key, ts) compare + in get_events()'s skip path is a simple string equality. + """ + ts = getattr(event, "timestamp", None) + if ts is None: + return "" + try: + return datetime.datetime( + ts.year, ts.month, ts.day, + ts.hour or 0, ts.minute or 0, ts.second or 0, + ).isoformat() + except Exception: + return str(ts) + + def _decode_serial_number(data: bytes) -> DeviceInfo: """ Decode SUB EA (SERIAL_NUMBER_RESPONSE) payload into a new DeviceInfo. diff --git a/sfm/cache.py b/sfm/cache.py index be35e60..5505bc8 100644 --- a/sfm/cache.py +++ b/sfm/cache.py @@ -83,13 +83,24 @@ class CachedEvent(Base): Events are immutable once recorded on the device; once we have an event in the cache it never needs to be re-downloaded unless explicitly requested. + + The two extra columns `waveform_key` and `event_timestamp` are an + integrity stamp: when set_event() / set_waveform() are called with a + different (waveform_key, event_timestamp) for the same (conn_key, index), + we know the device was erased and re-recorded — the cached row no longer + refers to the same physical event and the entire device's cache is + flushed before the new entry is written. This catches the post-erase + key-reuse bug where the device's first new event (key 01110000) collides + with the first event we previously downloaded. 
""" __tablename__ = "cached_events" - conn_key = sa.Column(sa.String, primary_key=True) - index = sa.Column(sa.Integer, primary_key=True) - event_json = sa.Column(sa.Text, nullable=False) # serialised Event dict - cached_at = sa.Column(sa.Float, nullable=False) # Unix timestamp + conn_key = sa.Column(sa.String, primary_key=True) + index = sa.Column(sa.Integer, primary_key=True) + event_json = sa.Column(sa.Text, nullable=False) # serialised Event dict + cached_at = sa.Column(sa.Float, nullable=False) # Unix timestamp + waveform_key = sa.Column(sa.String, nullable=True) # 8-hex device key + event_timestamp = sa.Column(sa.String, nullable=True) # ISO-8601 from 0C class CachedWaveform(Base): @@ -97,14 +108,18 @@ class CachedWaveform(Base): Full raw ADC waveform for a single event (SUB 5A full download). These are large (up to several MB) and expensive to fetch over cellular. - Once downloaded they are immutable and cached permanently. + Once downloaded they are immutable and cached permanently — but the + cache row is invalidated when the device is erased and a new event lands + at the same index (see CachedEvent docstring). 
""" __tablename__ = "cached_waveforms" - conn_key = sa.Column(sa.String, primary_key=True) - index = sa.Column(sa.Integer, primary_key=True) - waveform_json = sa.Column(sa.Text, nullable=False) # full /device/event/{idx}/waveform response JSON - cached_at = sa.Column(sa.Float, nullable=False) + conn_key = sa.Column(sa.String, primary_key=True) + index = sa.Column(sa.Integer, primary_key=True) + waveform_json = sa.Column(sa.Text, nullable=False) # full /device/event/{idx}/waveform response JSON + cached_at = sa.Column(sa.Float, nullable=False) + waveform_key = sa.Column(sa.String, nullable=True) # 8-hex device key + event_timestamp = sa.Column(sa.String, nullable=True) # ISO-8601 from 0C class CachedMonitorStatus(Base): @@ -149,6 +164,23 @@ class SFMCache: engine = sa.create_engine(url, connect_args={"check_same_thread": False}) Base.metadata.create_all(engine) self._Session = orm.sessionmaker(bind=engine) + # In-place schema migration: add the (waveform_key, event_timestamp) + # integrity-stamp columns to legacy cache DBs that predate the + # post-erase eviction logic. ALTER TABLE ADD COLUMN is idempotent + # via the column-presence check below. 
+ with engine.begin() as conn: + for table in ("cached_events", "cached_waveforms"): + cols = { + r[1] + for r in conn.exec_driver_sql(f"PRAGMA table_info({table})").fetchall() + } + for new_col, ddl in ( + ("waveform_key", "TEXT"), + ("event_timestamp", "TEXT"), + ): + if new_col not in cols: + log.info("cache schema: %s ADD COLUMN %s %s", table, new_col, ddl) + conn.exec_driver_sql(f"ALTER TABLE {table} ADD COLUMN {new_col} {ddl}") log.info("SFM cache opened: %s", db_path) # ── Connection key ──────────────────────────────────────────────────────── @@ -242,15 +274,91 @@ class SFMCache: row = s.get(CachedEvent, (conn_key, index)) return json.loads(row.event_json) if row else None + @staticmethod + def _event_signature(ev: dict) -> tuple[Optional[str], Optional[str]]: + """ + Extract the (waveform_key_hex, timestamp_iso) integrity stamp from + a serialised event dict. Either field may be None if the source + Event was missing it; the comparison logic in set_events/set_waveform + treats "both sides have a value AND they differ" as the only + eviction trigger, so partial data never spuriously flushes cache. + """ + key = ev.get("waveform_key") or ev.get("_waveform_key") + if isinstance(key, (bytes, bytearray)): + key = bytes(key).hex() + ts = ev.get("timestamp") + if isinstance(ts, dict): + # _serialise_timestamp returns a dict like {"iso": "...", ...} + ts = ts.get("iso") or ts.get("string") or None + return (key if isinstance(key, str) else None, + ts if isinstance(ts, str) else None) + + def _maybe_flush_on_mismatch( + self, + s, + conn_key: str, + index: int, + new_key: Optional[str], + new_ts: Optional[str], + ) -> bool: + """ + Check whether the cached entry at (conn_key, index) has a different + (waveform_key, timestamp) than the incoming one. If so, treat it as + a post-erase key-reuse signal and flush ALL cached events/waveforms + for this device, then return True. + Returns False when no flush was needed. 
+ """ + if not new_key and not new_ts: + return False # nothing to compare against + existing = s.get(CachedEvent, (conn_key, index)) + if existing is None: + existing = s.get(CachedWaveform, (conn_key, index)) + if existing is None: + return False + old_key = existing.waveform_key + old_ts = existing.event_timestamp + # Only flush when both sides have populated values and they differ. + differs = ( + (new_key and old_key and new_key != old_key) + or (new_ts and old_ts and new_ts != old_ts) + ) + if not differs: + return False + log.warning( + "cache: device %s — index %d (key=%s, ts=%s) replaces (key=%s, ts=%s); " + "flushing all cached events/waveforms for this device " + "(post-erase key reuse detected)", + conn_key, index, new_key, new_ts, old_key, old_ts, + ) + s.query(CachedEvent).filter_by(conn_key=conn_key).delete() + s.query(CachedWaveform).filter_by(conn_key=conn_key).delete() + return True + def set_events(self, conn_key: str, events: list[dict]) -> None: """ Upsert a list of event dicts. Existing rows are updated; new rows are inserted. This is used to add newly-discovered events to the cache. + + Eviction: if any incoming event has a different (waveform_key, + timestamp) than the row currently cached at the same index, we flush + the entire device's cache before inserting the new entries. Catches + post-erase key reuse where index 0 silently switches identity. """ now = time.time() with self._Session() as s: + # Eviction check: scan incoming events for any (index, key, ts) + # that conflicts with a cached row. A single conflict triggers + # a full device-wide flush so we don't end up with a mixed-era + # cache. 
+ for ev in events: + key, ts = self._event_signature(ev) + if self._maybe_flush_on_mismatch(s, conn_key, ev["index"], key, ts): + s.commit() + break # cache is now empty for this device; carry on + for ev in events: idx = ev["index"] + key, ts = self._event_signature(ev) row = s.get(CachedEvent, (conn_key, idx)) if row is None: row = CachedEvent( @@ -258,12 +366,18 @@ class SFMCache: index=idx, event_json=json.dumps(ev), cached_at=now, + waveform_key=key, + event_timestamp=ts, ) s.add(row) log.debug("cached new event %d for %s", idx, conn_key) else: # Refresh in case project_info was backfilled after initial store row.event_json = json.dumps(ev) + if key: + row.waveform_key = key + if ts: + row.event_timestamp = ts s.commit() # ── Waveforms ───────────────────────────────────────────────────────────── @@ -278,8 +392,16 @@ class SFMCache: return json.loads(row.waveform_json) def set_waveform(self, conn_key: str, index: int, waveform: dict) -> None: - """Store a full waveform response dict permanently.""" + """ + Store a full waveform response dict permanently. + + Like set_events, this checks the (waveform_key, timestamp) signature + of the incoming entry against what's currently cached at the same + index. A mismatch flushes the entire device's cache before insert. 
+ """ + key, ts = self._event_signature(waveform) with self._Session() as s: + self._maybe_flush_on_mismatch(s, conn_key, index, key, ts) row = s.get(CachedWaveform, (conn_key, index)) if row is None: row = CachedWaveform( @@ -287,13 +409,20 @@ class SFMCache: index=index, waveform_json=json.dumps(waveform), cached_at=time.time(), + waveform_key=key, + event_timestamp=ts, ) s.add(row) else: row.waveform_json = json.dumps(waveform) row.cached_at = time.time() + if key: + row.waveform_key = key + if ts: + row.event_timestamp = ts s.commit() - log.debug("cached waveform for %s event %d", conn_key, index) + log.debug("cached waveform for %s event %d (key=%s, ts=%s)", + conn_key, index, key, ts) # ── Monitor status ──────────────────────────────────────────────────────── diff --git a/sfm/live_cache.py b/sfm/live_cache.py new file mode 100644 index 0000000..9c7cf10 --- /dev/null +++ b/sfm/live_cache.py @@ -0,0 +1,189 @@ +""" +sfm/live_cache.py — Thread-safe in-memory cache for live SFM device data. + +Extracted from sfm/server.py so the cache logic is importable and testable +without pulling in fastapi/uvicorn. + +Caching strategy +---------------- +Keyed by `conn_key` ("tcp:host:port" or "serial:port:baud"). Does NOT +persist across server restarts. + + device_info cached until POST /device/config marks it dirty + events cached by (conn_key, device_event_count); re-fetched when + a quick count_events() probe shows new events on the device + monitor_status 30-second TTL (changes frequently during monitoring) + waveforms permanent within a process — but auto-evicted at the device + level when a (waveform_key, timestamp) mismatch is detected + at the same index (post-erase key reuse — the device's + event-key counter resets to 0x01110000 after every erase, + so the same `(conn_key, index)` slot can refer to a + brand-new physical event). + +All endpoints accept ?force=true to bypass the cache and re-read. 
+""" + +from __future__ import annotations + +import threading +import time +from typing import Optional + +_MONITOR_STATUS_TTL = 30.0 # seconds + + +class LiveCache: + """ + Thread-safe in-memory cache for live SFM device data. + One singleton per server process. + """ + + def __init__(self) -> None: + self._lock = threading.Lock() + self._device_info: dict[str, dict] = {} + self._events: dict[str, tuple[int, list]] = {} + self._monitor_status: dict[str, tuple[float, dict]] = {} + self._config_dirty: dict[str, bool] = {} + self._waveforms: dict[tuple, dict] = {} + + # ── Connection key ──────────────────────────────────────────────────────── + + @staticmethod + def make_conn_key( + host: Optional[str], + tcp_port: int, + port: Optional[str], + baud: int, + ) -> str: + if host: + return f"tcp:{host}:{tcp_port}" + return f"serial:{port}:{baud}" + + # ── Eviction signature ──────────────────────────────────────────────────── + + @staticmethod + def _event_signature(ev: dict) -> tuple[Optional[str], Optional[str]]: + """Return (waveform_key_hex, timestamp_iso) from a serialised event.""" + key = ev.get("waveform_key") or ev.get("_waveform_key") + if isinstance(key, (bytes, bytearray)): + key = bytes(key).hex() + ts = ev.get("timestamp") + if isinstance(ts, dict): + ts = ts.get("iso") or ts.get("string") or None + return (key if isinstance(key, str) else None, + ts if isinstance(ts, str) else None) + + def _flush_device(self, conn_key: str) -> None: + """Drop all cached events + waveforms for one device. 
Caller holds lock.""" + self._events.pop(conn_key, None) + stale_wf_keys = [k for k in self._waveforms if k[0] == conn_key] + for k in stale_wf_keys: + self._waveforms.pop(k, None) + + # ── Device info ─────────────────────────────────────────────────────────── + + def get_device_info(self, conn_key: str) -> Optional[dict]: + with self._lock: + if self._config_dirty.get(conn_key): + return None + return self._device_info.get(conn_key) + + def set_device_info(self, conn_key: str, info: dict) -> None: + with self._lock: + self._device_info[conn_key] = info + self._config_dirty[conn_key] = False + + # ── Events ──────────────────────────────────────────────────────────────── + + def get_events(self, conn_key: str, device_count: int) -> Optional[list]: + with self._lock: + if self._config_dirty.get(conn_key): + return None + entry = self._events.get(conn_key) + if entry is None: + return None + cached_count, events = entry + return events if cached_count == device_count else None + + def set_events(self, conn_key: str, device_count: int, events: list) -> None: + """ + Replace the cached events list for `conn_key`. If any incoming event + has a different (waveform_key, timestamp) than the cached entry at + the same index, flush the entire conn_key's event + waveform cache + first. Catches post-erase key reuse. 
+ """ + with self._lock: + cached_entry = self._events.get(conn_key) + cached_events = cached_entry[1] if cached_entry else [] + cached_by_index = {e.get("index"): e for e in cached_events} + + evict = False + for ev in events: + idx = ev.get("index") + if idx is None: + continue + cached = cached_by_index.get(idx) + if cached is None: + continue + new_key, new_ts = self._event_signature(ev) + old_key, old_ts = self._event_signature(cached) + if (new_key and old_key and new_key != old_key) or \ + (new_ts and old_ts and new_ts != old_ts): + evict = True + break + + if evict: + self._flush_device(conn_key) + + self._events[conn_key] = (device_count, events) + + # ── Monitor status ──────────────────────────────────────────────────────── + + def get_monitor_status(self, conn_key: str) -> Optional[dict]: + with self._lock: + entry = self._monitor_status.get(conn_key) + if entry is None: + return None + fetched_at, status = entry + if time.time() - fetched_at > _MONITOR_STATUS_TTL: + return None + return status + + def set_monitor_status(self, conn_key: str, status: dict) -> None: + with self._lock: + self._monitor_status[conn_key] = (time.time(), status) + + def invalidate_monitor_status(self, conn_key: str) -> None: + with self._lock: + self._monitor_status.pop(conn_key, None) + + # ── Config dirty flag ───────────────────────────────────────────────────── + + def mark_config_dirty(self, conn_key: str) -> None: + with self._lock: + self._config_dirty[conn_key] = True + self._events.pop(conn_key, None) + + # ── Waveforms (permanent cache, evicted on (key,ts) mismatch) ───────────── + + def get_waveform(self, conn_key: str, index: int) -> Optional[dict]: + with self._lock: + return self._waveforms.get((conn_key, index)) + + def set_waveform(self, conn_key: str, index: int, waveform: dict) -> None: + """ + Cache a waveform. Evicts the device's whole cache when the existing + entry at the same index has a different (waveform_key, timestamp). 
+ """ + with self._lock: + existing = self._waveforms.get((conn_key, index)) + if existing is not None: + new_key, new_ts = self._event_signature(waveform) + old_key, old_ts = self._event_signature(existing) + differs = ( + (new_key and old_key and new_key != old_key) + or (new_ts and old_ts and new_ts != old_ts) + ) + if differs: + self._flush_device(conn_key) + self._waveforms[(conn_key, index)] = waveform diff --git a/sfm/server.py b/sfm/server.py index 6bb9e37..f6043d9 100644 --- a/sfm/server.py +++ b/sfm/server.py @@ -66,6 +66,7 @@ from minimateplus.blastware_file import write_blastware_file, blastware_filename from minimateplus.client import _decode_a5_metadata_into, _decode_a5_waveform from sfm.cache import SFMCache, get_cache from sfm.database import SeismoDb +from sfm.live_cache import LiveCache as _LiveCache from sfm.waveform_store import WaveformStore logging.basicConfig( @@ -142,116 +143,6 @@ def _get_store() -> WaveformStore: # # All endpoints accept ?force=true to bypass the cache and re-read from device. -_MONITOR_STATUS_TTL = 30.0 # seconds - - -class _LiveCache: - """ - Thread-safe in-memory cache for live SFM device data. - One singleton per server process. 
- """ - - def __init__(self) -> None: - self._lock = threading.Lock() - # conn_key → serialised device info dict - self._device_info: dict[str, dict] = {} - # conn_key → (device_event_count_when_cached, [event dicts]) - self._events: dict[str, tuple[int, list]] = {} - # conn_key → (fetched_at_unix, status_dict) - self._monitor_status: dict[str, tuple[float, dict]] = {} - # conn_key → bool (True = re-read device on next /device/info) - self._config_dirty: dict[str, bool] = {} - # (conn_key, event_index) → waveform dict (permanent) - self._waveforms: dict[tuple, dict] = {} - - # ── Connection key ──────────────────────────────────────────────────────── - - @staticmethod - def make_conn_key( - host: Optional[str], - tcp_port: int, - port: Optional[str], - baud: int, - ) -> str: - if host: - return f"tcp:{host}:{tcp_port}" - return f"serial:{port}:{baud}" - - # ── Device info ─────────────────────────────────────────────────────────── - - def get_device_info(self, conn_key: str) -> Optional[dict]: - with self._lock: - if self._config_dirty.get(conn_key): - return None - return self._device_info.get(conn_key) - - def set_device_info(self, conn_key: str, info: dict) -> None: - with self._lock: - self._device_info[conn_key] = info - self._config_dirty[conn_key] = False - - # ── Events ──────────────────────────────────────────────────────────────── - - def get_events(self, conn_key: str, device_count: int) -> Optional[list]: - """ - Return cached events if the device's current event count matches what - we had when we last fetched. Returns None (cache miss) otherwise. 
- """ - with self._lock: - if self._config_dirty.get(conn_key): - return None - entry = self._events.get(conn_key) - if entry is None: - return None - cached_count, events = entry - return events if cached_count == device_count else None - - def set_events(self, conn_key: str, device_count: int, events: list) -> None: - with self._lock: - self._events[conn_key] = (device_count, events) - - # ── Monitor status ──────────────────────────────────────────────────────── - - def get_monitor_status(self, conn_key: str) -> Optional[dict]: - with self._lock: - entry = self._monitor_status.get(conn_key) - if entry is None: - return None - fetched_at, status = entry - if time.time() - fetched_at > _MONITOR_STATUS_TTL: - return None - return status - - def set_monitor_status(self, conn_key: str, status: dict) -> None: - with self._lock: - self._monitor_status[conn_key] = (time.time(), status) - - def invalidate_monitor_status(self, conn_key: str) -> None: - with self._lock: - self._monitor_status.pop(conn_key, None) - - # ── Config dirty flag ───────────────────────────────────────────────────── - - def mark_config_dirty(self, conn_key: str) -> None: - """ - Called after a successful POST /device/config write. - Forces next /device/info and /device/events to re-read from the device. 
- """ - with self._lock: - self._config_dirty[conn_key] = True - self._events.pop(conn_key, None) - - # ── Waveforms (permanent cache) ─────────────────────────────────────────── - - def get_waveform(self, conn_key: str, index: int) -> Optional[dict]: - with self._lock: - return self._waveforms.get((conn_key, index)) - - def set_waveform(self, conn_key: str, index: int, waveform: dict) -> None: - with self._lock: - self._waveforms[(conn_key, index)] = waveform - - _live_cache = _LiveCache() @@ -872,6 +763,7 @@ def device_event_blastware_file( baud: int = Query(38400, description="Serial baud rate"), host: Optional[str] = Query(None, description="TCP host — modem IP or ACH relay"), tcp_port: int = Query(DEFAULT_TCP_PORT, description=f"TCP port (default {DEFAULT_TCP_PORT})"), + force: bool = Query(False, description="Bypass any cached/dedup'd state and re-download from device"), ) -> FileResponse: """ Download the waveform for a single event (0-based index) and return it @@ -893,9 +785,13 @@ def device_event_blastware_file( stop_after_index=index) → write_blastware_file() → FileResponse. """ log.info( - "GET /device/event/%d/blastware_file port=%s host=%s", - index, port, host, + "GET /device/event/%d/blastware_file port=%s host=%s force=%s", + index, port, host, force, ) + # `force` always re-downloads from the device. This endpoint already + # never short-circuits via cache, so `force` is reserved for parity with + # the other live endpoints and to suppress the post-download persist + # (see end of handler) when the caller wants a fetch-only escape hatch. 
try: def _do(): diff --git a/sfm/sfm_webapp.html b/sfm/sfm_webapp.html index a763b39..63f15b8 100644 --- a/sfm/sfm_webapp.html +++ b/sfm/sfm_webapp.html @@ -609,6 +609,36 @@ .section-btn:hover { color: var(--text); } .section-btn.active { background: var(--blue); color: #fff; } + /* ── Force-refresh toggle ── */ + .force-toggle { + display: flex; + align-items: center; + gap: 6px; + padding: 4px 10px; + border: 1px solid var(--border); + border-radius: 6px; + background: var(--bg); + cursor: pointer; + font-size: 11px; + font-weight: 600; + color: var(--text-dim); + user-select: none; + white-space: nowrap; + transition: background 0.12s, color 0.12s, border-color 0.12s; + } + .force-toggle input { margin: 0; cursor: pointer; } + .force-toggle:hover { color: var(--text); } + .force-toggle.active { + background: rgba(248, 81, 73, 0.18); + border-color: #f85149; + color: #ff7b72; + } + .force-toggle .ft-dot { + width: 6px; height: 6px; border-radius: 50%; + background: var(--text-mute); + } + .force-toggle.active .ft-dot { background: #f85149; box-shadow: 0 0 6px #f85149; } + /* ── Section containers ── */ #section-live, #section-db { display: flex; @@ -654,6 +684,13 @@ +
+ +
+
+
+

Event

+ +
+
+
+

Event

+
+
Serial
+
Timestamp
+
Record type
+
Sample rate
+
Waveform key
+
+
+
+

Peaks

+
+
Tran
+
Vert
+
Long
+
PVS
+
Mic
+
+
+
+

Project

+
+
Project
+
Client
+
Operator
+
Location
+
+
+
+

Source / files

+
+
BW filename
+
BW filesize
+
BW sha256
+
Source kind
+
Captured at
+
+
+
+

Review (editable)

+
+ + +
+
+ + +
+ + +
+
+ Raw sidecar JSON (read-only peek) +

+      
+
+ +
+
+ diff --git a/sfm/waveform_store.py b/sfm/waveform_store.py index 83216f8..8d39032 100644 --- a/sfm/waveform_store.py +++ b/sfm/waveform_store.py @@ -1,34 +1,46 @@ """ sfm/waveform_store.py — On-disk store for Blastware-format event files. -Layout (flat per-serial): +Layout (flat per-serial, four files per event): - // ← event file (Blastware-readable binary) + // ← event file (BW-readable binary) //.a5.pkl ← pickled list of A5 S3Frame dicts + //.h5 ← clean waveform arrays (HDF5) + //.sfm.json ← modern sidecar (peaks, project, + review state, extensions) `` is whatever `minimateplus.blastware_file.blastware_filename` -produces for the event. The extension is NOT a fixed type tag — it encodes -the event timestamp (`AB0T` format: 2-char base-36 of `total_seconds % -1296`, literal `0`, then `W`=Full Waveform / `H`=Full Histogram for ACH -downloads, or 3-char `AB0` for direct/manual downloads). Every event's -filename therefore contains its own timestamp + record-type fingerprint and -collisions across the same physical event don't occur. +produces for the event. The extension is NOT a fixed type tag — it +encodes the event timestamp (`AB0T` format). -The `.a5.pkl` sidecar lets the event file be regenerated later if the -encoder changes — captures the raw 5A frame stream as serializable dicts so -the schema isn't tied to the `S3Frame` dataclass layout. +Roles: + - BW binary: what Blastware reads. Untouched. The user-facing review + waveform viewer. + - .a5.pkl: regenerative source. Lets the BW binary be rebuilt + byte-for-byte if the encoder changes. Never delete. + - .h5: clean per-channel waveform arrays in physical units (in/s for + geo, psi for mic) plus event metadata. Canonical format for + downstream analysis tools and the `/device/event/{idx}/waveform` + endpoint's plot-JSON output. + - .sfm.json: small, queryable metadata + review state. SQL + `events.false_trigger` is a derived index kept in sync via + `patch_sidecar()`. 
""" from __future__ import annotations +import datetime import logging import pickle +import shutil from pathlib import Path from typing import Optional +from minimateplus import event_file_io from minimateplus.blastware_file import blastware_filename, write_blastware_file from minimateplus.framing import S3Frame from minimateplus.models import Event +from sfm import event_hdf5 log = logging.getLogger("sfm.waveform_store") @@ -80,10 +92,22 @@ class WaveformStore: return d def paths_for(self, serial: str, filename: str) -> tuple[Path, Path]: - """Return (blastware_path, a5_pickle_path) for a given serial+filename.""" + """Return (blastware_path, a5_pickle_path) for a given serial+filename. + + For the sidecar path use `sidecar_path_for()` — kept separate so + existing callers don't need to unpack a 3-tuple. + """ d = self._serial_dir(serial) return d / filename, d / f"{filename}.a5.pkl" + def sidecar_path_for(self, serial: str, filename: str) -> Path: + """Return absolute path to the .sfm.json sidecar for a given event.""" + return self._serial_dir(serial) / f"{filename}.sfm.json" + + def hdf5_path_for(self, serial: str, filename: str) -> Path: + """Return absolute path to the .h5 clean-waveform file for a given event.""" + return self._serial_dir(serial) / f"{filename}.h5" + def open_blastware(self, serial: str, filename: str) -> Optional[Path]: """Return absolute path to an existing event file or None.""" bw_path, _ = self.paths_for(serial, filename) @@ -96,23 +120,43 @@ class WaveformStore: ev: Event, serial: str, a5_frames: list[S3Frame], + *, + source_kind: str = "sfm-live", + geo_range = "normal", ) -> dict: """ - Write the event file and its .a5.pkl sidecar for one event. 
+ Write all four event-file artifacts for one event: + - BW binary + - .a5.pkl raw A5 frame pickle + - .h5 clean waveform (HDF5) + - .sfm.json modern sidecar (metadata + review) Returns a record dict suitable for persisting alongside the DB row: { "filename": "M529LKIQ.7M0W", "filesize": 8708, + "sha256": "a1b2c3...", "a5_pickle_filename": "M529LKIQ.7M0W.a5.pkl", + "hdf5_filename": "M529LKIQ.7M0W.h5", + "sidecar_filename": "M529LKIQ.7M0W.sfm.json", } - The exact extension is timestamp-encoded per event (see - `minimateplus.blastware_file.blastware_filename`). + `source_kind` flows into `sidecar.source.kind` — callers should + pass "sfm-live" (default) for the live endpoint and "sfm-ach" for + the ACH ingestion path. BW-imported events use save_imported_bw() + instead. - Idempotent: if the event file already exists, it is overwritten with - the freshly-encoded version (same bytes for the same a5_frames). + `geo_range` controls the ADC-counts → in/s scaling in the HDF5 + file ("normal" = 10 in/s FS, "sensitive" = 1.25 in/s FS). + Defaults to "normal" — callers with compliance-config access + should pass the actual unit setting so the saved samples are in + the right units. + + Idempotent: if the event file already exists, it is overwritten + with the freshly-encoded version (same bytes for the same + a5_frames) and the sidecar's review block is preserved across + re-saves. """ if not a5_frames: raise ValueError("WaveformStore.save: a5_frames is empty") @@ -121,17 +165,18 @@ class WaveformStore: filename = blastware_filename(ev, serial) bw_path, a5_path = self.paths_for(serial, filename) + sidecar_path = self.sidecar_path_for(serial, filename) + hdf5_path = self.hdf5_path_for(serial, filename) - # 1. encode the event file - # Delete any stale file at this path so partial writes never leak - # trailing bytes from a previous larger file (matches the live - # endpoint's defensive unlink). + # 1. 
encode the event file (defensive unlink prevents trailing-byte + # leaks from a previous larger file on synced/odd filesystems). try: bw_path.unlink() except FileNotFoundError: pass write_blastware_file(ev, a5_frames, bw_path) filesize = bw_path.stat().st_size + sha256 = event_file_io.file_sha256(bw_path) # 2. write the .a5.pkl sidecar try: @@ -145,14 +190,176 @@ class WaveformStore: with a5_path.open("wb") as fp: pickle.dump(payload, fp, protocol=pickle.HIGHEST_PROTOCOL) + # 3. write the .h5 clean-waveform file (samples in physical units). + # Best-effort: a write failure shouldn't sink the rest of the save + # (the HDF5 can be regenerated later from the .a5.pkl). + hdf5_filename: Optional[str] = None + try: + event_hdf5.write_event_hdf5( + hdf5_path, ev, + serial=serial, + geo_range=geo_range, + source_kind=source_kind, + ) + hdf5_filename = hdf5_path.name + except Exception as exc: + log.warning( + "save: HDF5 write failed for %s: %s — continuing without .h5", + hdf5_path, exc, + ) + + # 4. write the .sfm.json sidecar. Preserve any existing review + # block + extensions across re-saves so user edits aren't lost + # when the same event is re-downloaded (e.g. via Force refresh). 
+ existing_review = None + existing_extensions = None + if sidecar_path.exists(): + try: + old = event_file_io.read_sidecar(sidecar_path) + existing_review = old.get("review") + existing_extensions = old.get("extensions") + except Exception as exc: + log.warning( + "save: existing sidecar at %s unreadable (%s); overwriting", + sidecar_path, exc, + ) + + sidecar = event_file_io.event_to_sidecar_dict( + ev, + serial=serial, + blastware_filename=filename, + blastware_filesize=filesize, + blastware_sha256=sha256, + source_kind=source_kind, + a5_pickle_filename=a5_path.name, + review=existing_review, + extensions=existing_extensions, + ) + event_file_io.write_sidecar(sidecar_path, sidecar) + log.info( - "WaveformStore.save serial=%s filename=%s filesize=%d frames=%d", + "WaveformStore.save serial=%s filename=%s filesize=%d frames=%d " + "h5=%s sidecar=%s", serial, filename, filesize, len(a5_frames), + hdf5_filename or "(skipped)", sidecar_path.name, ) return { "filename": filename, "filesize": filesize, + "sha256": sha256, "a5_pickle_filename": a5_path.name, + "hdf5_filename": hdf5_filename, + "sidecar_filename": sidecar_path.name, + } + + def save_imported_bw( + self, + bw_bytes: bytes, + source_path: Path, + *, + serial_hint: Optional[str] = None, + ) -> tuple[Event, dict]: + """ + Ingest a Blastware event file produced by an external tool + (Blastware's own ACH, manual download, etc.) where the source A5 + frames aren't available. + + Workflow: + 1. Parse the bytes via event_file_io.read_blastware_file (writes + a temp file to do that, since the parser takes a path). + 2. Resolve serial from BW filename (`

...`) or use + serial_hint. Falls back to "UNKNOWN". + 3. Copy the BW bytes verbatim into //. + 4. Write the .sfm.json sidecar with source.kind = "bw-import" + and a5_pickle_filename = None. Does NOT write a .a5.pkl + (no A5 source available; byte-for-byte regeneration not + possible — the on-disk BW file IS the byte-for-byte source). + + Returns (event, record_dict) so callers can both insert into + SeismoDb and surface the parsed Event. + """ + # Stash the bytes to a temp path so read_blastware_file (path-based) + # can parse without us duplicating its logic. + import tempfile + with tempfile.NamedTemporaryFile(suffix=".bw", delete=False) as tmp: + tmp.write(bw_bytes) + tmp_path = Path(tmp.name) + try: + ev = event_file_io.read_blastware_file(tmp_path) + finally: + try: + tmp_path.unlink() + except FileNotFoundError: + pass + + # Resolve serial. blastware_filename derives a 4-char prefix from + # the numeric serial (e.g. BE11529 → M529); we go the other way + # via the source filename if a hint wasn't given. + serial = serial_hint or _serial_from_bw_filename(source_path.name) or "UNKNOWN" + + # Use the source filename verbatim — it already encodes timestamp + # + record type per BW's AB0T scheme, and we want to preserve it + # so the file BW knows about can be opened back in BW. + filename = source_path.name + bw_path = self._serial_dir(serial) / filename + + # 1. copy bytes + bw_path.write_bytes(bw_bytes) + filesize = bw_path.stat().st_size + sha256 = event_file_io.file_sha256(bw_path) + + # 2. write the .h5 clean-waveform file from the parsed Event. + # Note: peaks here are computed from raw samples (the BW file + # doesn't carry the device-authoritative 0C peaks). Best-effort. 
+ hdf5_path = self.hdf5_path_for(serial, filename) + hdf5_filename: Optional[str] = None + try: + event_hdf5.write_event_hdf5( + hdf5_path, ev, + serial=serial, + geo_range="normal", # BW file doesn't carry the range; assume Normal + source_kind="bw-import", + ) + hdf5_filename = hdf5_path.name + except Exception as exc: + log.warning( + "save_imported_bw: HDF5 write failed for %s: %s — continuing", + hdf5_path, exc, + ) + + # 3. write sidecar with source.kind = bw-import + sidecar_path = self.sidecar_path_for(serial, filename) + existing_review = None + if sidecar_path.exists(): + try: + existing_review = event_file_io.read_sidecar(sidecar_path).get("review") + except Exception: + pass + + sidecar = event_file_io.event_to_sidecar_dict( + ev, + serial=serial, + blastware_filename=filename, + blastware_filesize=filesize, + blastware_sha256=sha256, + source_kind="bw-import", + a5_pickle_filename=None, + review=existing_review, + ) + event_file_io.write_sidecar(sidecar_path, sidecar) + + log.info( + "WaveformStore.save_imported_bw serial=%s filename=%s filesize=%d " + "h5=%s (no .a5.pkl — A5 source unavailable for BW-imported files)", + serial, filename, filesize, hdf5_filename or "(skipped)", + ) + return ev, { + "filename": filename, + "filesize": filesize, + "sha256": sha256, + "a5_pickle_filename": None, + "hdf5_filename": hdf5_filename, + "sidecar_filename": sidecar_path.name, } def load_a5(self, serial: str, filename: str) -> Optional[list[S3Frame]]: @@ -169,3 +376,71 @@ class WaveformStore: log.warning("WaveformStore.load_a5: malformed sidecar at %s", a5_path) return None return [_dict_to_frame(d) for d in payload["frames"]] + + # ── modern .sfm.json sidecar accessors ────────────────────────────────────── + + def load_sidecar(self, serial: str, filename: str) -> Optional[dict]: + """Return the parsed .sfm.json sidecar dict, or None if missing.""" + path = self.sidecar_path_for(serial, filename) + if not path.exists(): + return None + try: + return 
event_file_io.read_sidecar(path) + except Exception as exc: + log.warning("load_sidecar: failed to read %s: %s", path, exc) + return None + + def patch_sidecar( + self, + serial: str, + filename: str, + *, + review: Optional[dict] = None, + extensions: Optional[dict] = None, + reviewer_now: bool = True, + ) -> Optional[dict]: + """ + JSON-merge-patch the .sfm.json sidecar's review/extensions blocks. + Returns the new full dict, or None if the sidecar doesn't exist. + """ + path = self.sidecar_path_for(serial, filename) + if not path.exists(): + return None + return event_file_io.patch_sidecar( + path, + review=review, + extensions=extensions, + reviewer_now=reviewer_now, + ) + + +# ── helpers ───────────────────────────────────────────────────────────────────── + +def _serial_from_bw_filename(name: str) -> Optional[str]: + """ + Reverse of `blastware_filename`'s serial-prefix encoding. + + BW filename format (V10.72): `

.` + where P = chr(ord('B') + floor(serial // 1000)) + and serial3 = f"{serial % 1000:03d}". + + Examples (from CLAUDE.md verification archive): + P036... → BE14036 H907... → BE6907 + M529... → BE11529 T003... → BE18003 + + Returns the inferred BE-prefix serial (e.g. "BE11529") or None when + the filename doesn't match the expected pattern. + """ + if not name: + return None + # First letter encodes the thousands group; next 3 chars encode the + # last 3 digits of the serial. + base = name.split(".", 1)[0] + if len(base) < 4 or not base[0].isalpha() or not base[1:4].isdigit(): + return None + prefix_letter = base[0].upper() + if prefix_letter < "B": + return None + thousands = ord(prefix_letter) - ord("B") + serial_num = thousands * 1000 + int(base[1:4]) + return f"BE{serial_num}" diff --git a/tests/test_event_file_io.py b/tests/test_event_file_io.py new file mode 100644 index 0000000..806d64f --- /dev/null +++ b/tests/test_event_file_io.py @@ -0,0 +1,348 @@ +""" +test_event_file_io.py — sidecar write/read/patch round-trips, +WaveformStore sidecar integration, and the BW-import path. + +Run: + python tests/test_event_file_io.py +""" + +from __future__ import annotations + +import json +import os +import sys +import tempfile +from pathlib import Path + +try: + import pytest +except ImportError: + pytest = None # type: ignore + +sys.path.insert(0, os.path.dirname(os.path.dirname(os.path.abspath(__file__)))) + +from minimateplus import event_file_io +from minimateplus.framing import S3Frame +from minimateplus.models import Event, Timestamp + + +# ── Fixtures shared with test_waveform_store.py ─────────────────────────────── + + +def _make_synthetic_event() -> tuple[Event, list[S3Frame]]: + """Same shape as tests/test_waveform_store.py — minimum viable Event + + A5 stream that makes write_blastware_file emit a parseable file. 
+ + STRT is exactly 21 bytes; rectime_seconds lands at byte 18 to match + `_decode_a5_waveform`'s expected layout (which is also what + `read_blastware_file()` reads back).""" + key4 = bytes.fromhex("01110000") + rectime = 3 + strt = bytearray(21) + strt[0:4] = b"STRT" + strt[4:6] = b"\xff\xfe" + strt[6:10] = key4 # end_key (per data[23:27] in CLAUDE.md) + strt[10:14] = key4 # start_key (per data[27:31]) + strt[18] = rectime + strt = bytes(strt) + + probe_data = bytes(7) + strt + bytes(32) + probe = S3Frame(sub=0xA5, page_hi=0x10, page_lo=0x00, data=probe_data, + checksum_valid=True, chk_byte=0x00) + + sample = S3Frame(sub=0xA5, page_hi=0x00, page_lo=0x10, + data=bytes(7) + bytes(0x0200), checksum_valid=True, + chk_byte=0x00) + + # Build a valid 26-byte footer (0e 08 + ts1 + ts2 + 6 const + 2 crc) + # and embed it at the END of the terminator's contribution so + # write_blastware_file finds the real `0e 08` marker rather than + # falling back to slicing the last 26 bytes of zero garbage. 
+ # ts byte order: [day][month][year_HI][year_LO][0x00][hour][min][sec] + footer = ( + b"\x0e\x08" + + bytes([6, 5, 0x07, 0xea, 0, 12, 34, 56]) # ts1 = 2026-05-06 12:34:56 + + bytes([6, 5, 0x07, 0xea, 0, 12, 35, 6]) # ts2 = ts1 + ~10s + + b"\x00\x01\x00\x02\x00\x00" + + b"\x00\x00" + ) + assert len(footer) == 26 + term_data = bytes(11) + bytes(38) + footer # 11 prefix + 38 pad + 26 footer = 75 + term = S3Frame(sub=0xA5, page_hi=0x00, page_lo=0x00, + data=term_data, checksum_valid=True, chk_byte=0x00) + + ev = Event(index=0) + ev._waveform_key = key4 + ev.timestamp = Timestamp( + raw=b"", flag=0x10, year=2026, unknown_byte=0, + month=5, day=6, hour=12, minute=34, second=56, + ) + ev.rectime_seconds = rectime + ev.record_type = "Waveform" + ev._a5_frames = [probe, sample, term] + return ev, [probe, sample, term] + + +# ── Sidecar write/read round-trip ───────────────────────────────────────────── + + +def test_event_to_sidecar_dict_shape(): + ev, _ = _make_synthetic_event() + d = event_file_io.event_to_sidecar_dict( + ev, + serial="BE11529", + blastware_filename="M529LKIQ.7M0W", + blastware_filesize=1024, + blastware_sha256="abcd" * 16, + source_kind="sfm-live", + a5_pickle_filename="M529LKIQ.7M0W.a5.pkl", + ) + + assert d["schema_version"] == event_file_io.SCHEMA_VERSION + assert d["kind"] == event_file_io.SIDECAR_KIND + assert d["event"]["serial"] == "BE11529" + assert d["event"]["timestamp"] == "2026-05-06T12:34:56" + assert d["event"]["waveform_key"] == "01110000" + assert d["blastware"]["sha256"] == "abcd" * 16 + assert d["source"]["kind"] == "sfm-live" + assert d["review"] == { + "false_trigger": False, "reviewer": None, + "reviewed_at": None, "notes": "", + } + assert d["extensions"] == {} + + +def test_sidecar_write_and_read_round_trip(tmp_path: Path): + ev, _ = _make_synthetic_event() + path = tmp_path / "M529LKIQ.7M0W.sfm.json" + src = event_file_io.event_to_sidecar_dict( + ev, serial="BE11529", + blastware_filename="M529LKIQ.7M0W", blastware_filesize=1024, 
+ blastware_sha256="x" * 64, source_kind="sfm-ach", + ) + event_file_io.write_sidecar(path, src) + loaded = event_file_io.read_sidecar(path) + assert loaded["event"] == src["event"] + assert loaded["blastware"] == src["blastware"] + assert loaded["source"]["kind"] == "sfm-ach" + + +def test_sidecar_rejects_unsupported_schema_version(tmp_path: Path): + path = tmp_path / "future.sfm.json" + path.write_text(json.dumps({ + "schema_version": event_file_io.SCHEMA_VERSION + 1, + "kind": event_file_io.SIDECAR_KIND, + })) + try: + event_file_io.read_sidecar(path) + except ValueError as exc: + assert "schema_version" in str(exc) + return + raise AssertionError("read_sidecar should have rejected unsupported version") + + +def test_sidecar_extensions_survive_round_trip(tmp_path: Path): + """Forward-compat: unknown keys inside `extensions` survive a r/w cycle.""" + ev, _ = _make_synthetic_event() + path = tmp_path / "x.sfm.json" + d = event_file_io.event_to_sidecar_dict( + ev, serial="BE11529", + blastware_filename="X", blastware_filesize=0, blastware_sha256="", + source_kind="sfm-live", + extensions={"vendor.acme.gps": {"lat": 40.7, "lon": -74.0}}, + ) + event_file_io.write_sidecar(path, d) + back = event_file_io.read_sidecar(path) + assert back["extensions"]["vendor.acme.gps"]["lat"] == 40.7 + + +def test_sidecar_patch_review_stamps_reviewed_at(tmp_path: Path): + ev, _ = _make_synthetic_event() + path = tmp_path / "patch.sfm.json" + event_file_io.write_sidecar( + path, + event_file_io.event_to_sidecar_dict( + ev, serial="BE11529", + blastware_filename="X", blastware_filesize=0, blastware_sha256="", + source_kind="sfm-live", + ), + ) + new = event_file_io.patch_sidecar( + path, + review={"false_trigger": True, "notes": "truck thump", "reviewer": "brian"}, + ) + assert new["review"]["false_trigger"] is True + assert new["review"]["notes"] == "truck thump" + assert new["review"]["reviewer"] == "brian" + assert new["review"]["reviewed_at"], "reviewed_at must be auto-stamped" + + 
on_disk = event_file_io.read_sidecar(path) + assert on_disk["review"]["false_trigger"] is True + + +# ── WaveformStore integration ───────────────────────────────────────────────── + + +def test_waveform_store_save_writes_sidecar(tmp_path: Path): + from sfm.waveform_store import WaveformStore + + store = WaveformStore(tmp_path / "waveforms") + ev, frames = _make_synthetic_event() + rec = store.save(ev, serial="BE11529", a5_frames=frames, source_kind="sfm-live") + + assert rec["sidecar_filename"].endswith(".sfm.json") + assert rec["sha256"] and len(rec["sha256"]) == 64 + + sc = store.load_sidecar("BE11529", rec["filename"]) + assert sc is not None + assert sc["blastware"]["filename"] == rec["filename"] + assert sc["blastware"]["sha256"] == rec["sha256"] + assert sc["source"]["kind"] == "sfm-live" + # The .a5.pkl reference should match the actual filename on disk. + assert sc["source"]["a5_pickle_filename"] == rec["a5_pickle_filename"] + + +def test_waveform_store_save_preserves_review_across_resave(tmp_path: Path): + """Re-saving the same event must preserve a user's prior review edits.""" + from sfm.waveform_store import WaveformStore + + store = WaveformStore(tmp_path / "waveforms") + ev, frames = _make_synthetic_event() + rec = store.save(ev, serial="BE11529", a5_frames=frames) + + # User flips false_trigger and adds a note. + store.patch_sidecar( + "BE11529", rec["filename"], + review={"false_trigger": True, "notes": "hello"}, + ) + + # A second save (e.g. Force refresh re-download) must keep those edits. 
+ store.save(ev, serial="BE11529", a5_frames=frames) + sc = store.load_sidecar("BE11529", rec["filename"]) + assert sc["review"]["false_trigger"] is True + assert sc["review"]["notes"] == "hello" + + +def test_waveform_store_patch_sidecar_returns_none_when_missing(tmp_path: Path): + from sfm.waveform_store import WaveformStore + + store = WaveformStore(tmp_path / "waveforms") + out = store.patch_sidecar("BE99999", "no.such.W", review={"notes": "x"}) + assert out is None + + +# ── DB integration: sidecar_filename column + update_event_review ───────────── + + +def test_seismodb_persists_sidecar_filename_and_review_sync(tmp_path: Path): + from sfm.database import SeismoDb + + db = SeismoDb(tmp_path / "seismo_relay.db") + ev, _ = _make_synthetic_event() + + rec = { + "filename": "M529LKIQ.7M0W", + "filesize": 8708, + "a5_pickle_filename": "M529LKIQ.7M0W.a5.pkl", + "sidecar_filename": "M529LKIQ.7M0W.sfm.json", + } + inserted, _ = db.insert_events( + [ev], serial="BE11529", + waveform_records={ev._waveform_key.hex(): rec}, + ) + assert inserted == 1 + + rows = db.query_events(serial="BE11529") + row = rows[0] + assert row["sidecar_filename"] == rec["sidecar_filename"] + + # update_event_review keeps false_trigger column in sync with sidecar. + assert db.update_event_review(row["id"], {"false_trigger": True}) is True + again = db.get_event(row["id"]) + assert again["false_trigger"] == 1 + + # Empty review block (no false_trigger key) → no-op but row exists. 
+ assert db.update_event_review(row["id"], {"notes": "x"}) is True + + +# ── BW-file reader (read_blastware_file) ───────────────────────────────────── + + +def test_read_blastware_file_round_trip(tmp_path: Path): + """write → read → key/timestamp/rectime survive.""" + from minimateplus.blastware_file import write_blastware_file, blastware_filename + + ev, frames = _make_synthetic_event() + bw_path = tmp_path / blastware_filename(ev, "BE11529") + write_blastware_file(ev, frames, bw_path) + + parsed = event_file_io.read_blastware_file(bw_path) + assert parsed._waveform_key == ev._waveform_key + assert parsed.rectime_seconds == ev.rectime_seconds + # Timestamp lands via the footer; year/month/day/hour/min/sec all survive. + assert parsed.timestamp is not None + assert parsed.timestamp.year == ev.timestamp.year + assert parsed.timestamp.month == ev.timestamp.month + assert parsed.timestamp.day == ev.timestamp.day + assert parsed.timestamp.hour == ev.timestamp.hour + assert parsed.timestamp.minute == ev.timestamp.minute + assert parsed.timestamp.second == ev.timestamp.second + # No A5 source recoverable. + assert parsed._a5_frames is None + # Peaks computed from samples (synthetic = zero samples → zero peaks). + assert parsed.peak_values is not None + assert parsed.peak_values.peak_vector_sum == 0.0 + + +def test_save_imported_bw_round_trip(tmp_path: Path): + """save_imported_bw stores a copy + sidecar with source.kind = bw-import.""" + from minimateplus.blastware_file import write_blastware_file, blastware_filename + from sfm.waveform_store import WaveformStore + + # Produce a BW file outside the store. 
+ ev, frames = _make_synthetic_event() + fname = blastware_filename(ev, "BE11529") + src = tmp_path / fname + write_blastware_file(ev, frames, src) + + store = WaveformStore(tmp_path / "waveforms") + parsed_ev, rec = store.save_imported_bw(src.read_bytes(), source_path=src) + + assert rec["filename"] == fname + assert rec["a5_pickle_filename"] is None # no A5 source for BW imports + sc = store.load_sidecar("BE11529", fname) + assert sc is not None + assert sc["source"]["kind"] == "bw-import" + assert sc["source"]["a5_pickle_filename"] is None + # The stored binary should match the source byte-for-byte (we just copied). + stored_path = store.open_blastware("BE11529", fname) + assert stored_path is not None + assert stored_path.read_bytes() == src.read_bytes() + + +if __name__ == "__main__": + if pytest is not None: + pytest.main([__file__, "-v"]) + else: + import inspect + import traceback as _tb + + passed = failed = 0 + for _name, _fn in sorted(globals().items()): + if not _name.startswith("test_") or not callable(_fn): + continue + try: + _sig = inspect.signature(_fn) + if "tmp_path" in _sig.parameters: + with tempfile.TemporaryDirectory() as _td: + _fn(Path(_td)) + else: + _fn() + print(f"PASS {_name}") + passed += 1 + except Exception: + print(f"FAIL {_name}") + _tb.print_exc() + failed += 1 + print(f"\n{passed} passed, {failed} failed") + sys.exit(0 if failed == 0 else 1) diff --git a/tests/test_event_hdf5.py b/tests/test_event_hdf5.py new file mode 100644 index 0000000..86c3336 --- /dev/null +++ b/tests/test_event_hdf5.py @@ -0,0 +1,296 @@ +""" +test_event_hdf5.py — HDF5 codec round-trip + plot.v1 JSON shape sanity. 
+ +Run: + python tests/test_event_hdf5.py +""" + +from __future__ import annotations + +import os +import sys +import tempfile +from pathlib import Path + +try: + import pytest +except ImportError: + pytest = None # type: ignore + +sys.path.insert(0, os.path.dirname(os.path.dirname(os.path.abspath(__file__)))) + +from minimateplus.framing import S3Frame +from minimateplus.models import Event, PeakValues, ProjectInfo, Timestamp +from sfm import event_hdf5 + + +# ── Fixtures ────────────────────────────────────────────────────────────────── + + +def _make_event_with_samples(n: int = 256) -> Event: + """An Event with synthetic int16 ADC samples on all four channels. + + Channel content: + - Tran: ramp from -16384 to +16383 (peak ≈ 5 in/s for Normal range) + - Vert: full-scale dirac at index n//2 (peak = 10 in/s) + - Long: zeros + - MicL: small ramp + Peak values are set on the event the way the device's 0C record + would supply them — used by the HDF5 writer for the mic per-count + factor. 
+ """ + tran = [int((i / max(n - 1, 1)) * 32767 - 16384) for i in range(n)] + vert = [0] * n + if n: + vert[n // 2] = 32767 + long_ = [0] * n + mic = [int((i / max(n - 1, 1)) * 5000) for i in range(n)] + + ev = Event(index=0) + ev._waveform_key = bytes.fromhex("01110000") + ev.timestamp = Timestamp( + raw=b"", flag=0x10, + year=2026, unknown_byte=0, month=5, day=7, + hour=10, minute=0, second=0, + ) + ev.record_type = "Waveform" + ev.sample_rate = 1024 + ev.pretrig_samples = n // 4 + ev.total_samples = n + ev.rectime_seconds = n / 1024.0 + ev.raw_samples = {"Tran": tran, "Vert": vert, "Long": long_, "MicL": mic} + ev.peak_values = PeakValues( + tran=5.0, vert=10.0, long=0.0, + peak_vector_sum=10.0, micl=0.001, + ) + ev.project_info = ProjectInfo( + project="TestProj", client="TestClient", + operator="brian", sensor_location="loc-A", + ) + return ev + + +# ── HDF5 round-trip ─────────────────────────────────────────────────────────── + + +def test_hdf5_round_trip_preserves_metadata(tmp_path: Path): + ev = _make_event_with_samples() + h5 = tmp_path / "test.h5" + event_hdf5.write_event_hdf5( + h5, ev, serial="BE11529", geo_range="normal", + ) + + data = event_hdf5.read_event_hdf5(h5) + a = data["attrs"] + assert a["schema_version"] == event_hdf5.SCHEMA_VERSION + assert a["kind"] == event_hdf5.HDF5_KIND + assert a["serial"] == "BE11529" + assert a["waveform_key"] == "01110000" + assert a["sample_rate"] == 1024 + assert a["pretrig_samples"] == 64 + assert a["geo_range"] == "normal" + assert a["geo_full_scale_ips"] == 10.0 + assert a["project"] == "TestProj" + assert a["client"] == "TestClient" + assert a["operator"] == "brian" + # Float attrs may round-trip with tiny precision noise. 
+ assert abs(a["peak_tran_ips"] - 5.0) < 1e-6 + assert abs(a["peak_vert_ips"] - 10.0) < 1e-6 + + +def test_hdf5_samples_in_physical_units_normal_range(tmp_path: Path): + """Vert hits ADC full-scale (32767) → with Normal range FS=10 in/s, + the HDF5 sample value should be ≈ 10 * 32767/32768 in/s.""" + ev = _make_event_with_samples() + h5 = tmp_path / "n.h5" + event_hdf5.write_event_hdf5(h5, ev, serial="BE11529", geo_range="normal") + data = event_hdf5.read_event_hdf5(h5) + + vert = data["samples"]["Vert"] + assert vert.dtype.name == "float32" + assert max(abs(v) for v in vert) > 9.99 # full-scale ≈ 10.0 + # The dirac was at n//2 → 32767 ADC counts. + expected_peak = 10.0 * 32767 / 32768 + assert abs(max(vert) - expected_peak) < 1e-3 + + +def test_hdf5_samples_in_physical_units_sensitive_range(tmp_path: Path): + """Same fixture but Sensitive range → full-scale 1.250 in/s.""" + ev = _make_event_with_samples() + h5 = tmp_path / "s.h5" + event_hdf5.write_event_hdf5(h5, ev, serial="BE11529", geo_range="sensitive") + data = event_hdf5.read_event_hdf5(h5) + + vert = data["samples"]["Vert"] + expected_peak = 1.250 * 32767 / 32768 + assert abs(max(vert) - expected_peak) < 1e-4 + + +def test_hdf5_includes_int16_samples(tmp_path: Path): + ev = _make_event_with_samples() + h5 = tmp_path / "i.h5" + event_hdf5.write_event_hdf5(h5, ev, serial="BE11529") + data = event_hdf5.read_event_hdf5(h5) + assert data["samples_int16"] is not None + assert "Tran" in data["samples_int16"] + assert data["samples_int16"]["Vert"].dtype.name == "int16" + + +def test_hdf5_rejects_unsupported_schema(tmp_path: Path): + """Round-tripping with a tampered schema_version raises ValueError.""" + import h5py + h5 = tmp_path / "future.h5" + with h5py.File(h5, "w") as f: + f.attrs["schema_version"] = 99 + f.attrs["kind"] = event_hdf5.HDF5_KIND + try: + event_hdf5.read_event_hdf5(h5) + except ValueError as exc: + assert "schema_version" in str(exc) + return + raise AssertionError("read_event_hdf5 should reject 
unsupported schema_version") + + +# ── plot.v1 JSON shape ──────────────────────────────────────────────────────── + + +def test_event_to_plot_json_shape(): + ev = _make_event_with_samples() + j = event_hdf5.event_to_plot_json(ev, serial="BE11529", geo_range="normal") + assert j["schema"] == "sfm.plot.v1" + assert j["serial"] == "BE11529" + assert j["geo_range"] == "normal" + assert j["geo_full_scale_ips"] == 10.0 + assert j["trigger_ms"] == 0.0 + + t = j["time_axis"] + assert t["sample_rate"] == 1024 + assert t["pretrig_samples"] == 64 + assert t["n_samples"] == 256 + # t0_ms = -pretrig * dt_ms = -64 * (1000/1024) ≈ -62.5 + assert abs(t["t0_ms"] - (-64 * 1000 / 1024)) < 1e-3 + assert abs(t["dt_ms"] - (1000 / 1024)) < 1e-6 + + chans = j["channels"] + for name in ("Tran", "Vert", "Long", "MicL"): + assert name in chans, f"missing channel: {name}" + assert chans[name]["unit"] in ("in/s", "psi") + assert "values" in chans[name] + assert "peak" in chans[name] + assert "peak_t_ms" in chans[name] + + # Values are in physical units: Vert peak ≈ 10 in/s. 
+ assert max(chans["Vert"]["values"]) > 9.99 + + +def test_event_to_plot_json_peak_t_ms_locates_dirac(): + """The Vert channel's full-scale dirac at sample n//2 should produce + peak_t_ms = (n//2 - pretrig) * dt_ms.""" + ev = _make_event_with_samples(n=256) + j = event_hdf5.event_to_plot_json(ev, serial="BE11529") + expected = (128 - 64) * (1000 / 1024) # = 62.5 ms + assert abs(j["channels"]["Vert"]["peak_t_ms"] - expected) < 1e-2 + + +def test_plot_json_from_hdf5_round_trip(tmp_path: Path): + """plot_json_from_hdf5 produces the same shape as event_to_plot_json.""" + ev = _make_event_with_samples() + h5 = tmp_path / "rt.h5" + event_hdf5.write_event_hdf5(h5, ev, serial="BE11529", geo_range="normal") + + j_disk = event_hdf5.plot_json_from_hdf5(h5, event_id="abc-123") + j_mem = event_hdf5.event_to_plot_json(ev, serial="BE11529", geo_range="normal", event_id="abc-123") + + # Top-level shape parity + for k in ("schema", "serial", "geo_range", "geo_full_scale_ips", + "trigger_ms", "record_type", "waveform_key", "event_id"): + assert j_disk.get(k) == j_mem.get(k), f"mismatch on {k}" + assert j_disk["time_axis"]["sample_rate"] == j_mem["time_axis"]["sample_rate"] + assert j_disk["time_axis"]["n_samples"] == j_mem["time_axis"]["n_samples"] + + # Sample values must match within float32 precision. 
+ for ch in ("Tran", "Vert", "Long", "MicL"): + a = j_disk["channels"][ch]["values"] + b = j_mem["channels"][ch]["values"] + assert len(a) == len(b) + if a: + mx = max(abs(x - y) for x, y in zip(a, b)) + assert mx < 1e-3, f"{ch}: max diff {mx}" + + +# ── WaveformStore integration with HDF5 ─────────────────────────────────────── + + +def _make_synthetic_event_for_save() -> tuple[Event, list[S3Frame]]: + """Same flavour as test_event_file_io.py but ensures _make_event_with_samples + is also wired into the BW write path so we can exercise WaveformStore.save.""" + ev = _make_event_with_samples(n=128) + # Build a minimum 3-frame A5 stream (probe + sample + term) — same + # shape used in the other test files. The encoder only really needs + # the STRT in the probe + a non-zero body and a footer in the term. + key4 = ev._waveform_key + rectime = int(ev.rectime_seconds or 0) or 1 + strt = bytearray(21) + strt[0:4] = b"STRT" + strt[4:6] = b"\xff\xfe" + strt[6:10] = key4 + strt[10:14] = key4 + strt[18] = rectime + probe = S3Frame(sub=0xA5, page_hi=0x10, page_lo=0x00, + data=bytes(7) + bytes(strt) + bytes(32), + checksum_valid=True, chk_byte=0x00) + sample = S3Frame(sub=0xA5, page_hi=0x00, page_lo=0x10, + data=bytes(7) + bytes(0x0200), checksum_valid=True, chk_byte=0x00) + footer = ( + b"\x0e\x08" + + bytes([7, 5, 0x07, 0xea, 0, 10, 0, 0]) + + bytes([7, 5, 0x07, 0xea, 0, 10, 0, 1]) + + b"\x00\x01\x00\x02\x00\x00\x00\x00" + ) + term = S3Frame(sub=0xA5, page_hi=0x00, page_lo=0x00, + data=bytes(11) + bytes(38) + footer, checksum_valid=True, chk_byte=0x00) + ev._a5_frames = [probe, sample, term] + return ev, [probe, sample, term] + + +def test_waveform_store_save_emits_hdf5(tmp_path: Path): + from sfm.waveform_store import WaveformStore + store = WaveformStore(tmp_path / "waveforms") + ev, frames = _make_synthetic_event_for_save() + rec = store.save(ev, serial="BE11529", a5_frames=frames, geo_range="normal") + + assert rec["hdf5_filename"], "hdf5_filename should be present in 
save() record" + h5 = store.hdf5_path_for("BE11529", rec["filename"]) + assert h5.exists(), "WaveformStore.save should produce a .h5 file" + # The HDF5 round-trip should match the event's metadata. + data = event_hdf5.read_event_hdf5(h5) + assert data["attrs"]["serial"] == "BE11529" + assert data["attrs"]["geo_range"] == "normal" + + +if __name__ == "__main__": + if pytest is not None: + pytest.main([__file__, "-v"]) + else: + import inspect + import traceback as _tb + + passed = failed = 0 + for _name, _fn in sorted(globals().items()): + if not _name.startswith("test_") or not callable(_fn): + continue + try: + _sig = inspect.signature(_fn) + if "tmp_path" in _sig.parameters: + with tempfile.TemporaryDirectory() as _td: + _fn(Path(_td)) + else: + _fn() + print(f"PASS {_name}") + passed += 1 + except Exception: + print(f"FAIL {_name}") + _tb.print_exc() + failed += 1 + print(f"\n{passed} passed, {failed} failed") + sys.exit(0 if failed == 0 else 1) From bbed85f7e26d149e98a08488891190bff33bfb51 Mon Sep 17 00:00:00 2001 From: serversdown Date: Fri, 8 May 2026 18:48:06 +0000 Subject: [PATCH 5/5] fix: update channel keys to include 'MicL' in device_event_waveform documentation --- minimateplus/client.py | 111 +++++++++++++++++++++++++++++------------ sfm/server.py | 2 +- 2 files changed, 79 insertions(+), 34 deletions(-) diff --git a/minimateplus/client.py b/minimateplus/client.py index 43eeb57..048ddac 100644 --- a/minimateplus/client.py +++ b/minimateplus/client.py @@ -1572,59 +1572,106 @@ def _decode_a5_waveform( # STRT byte layout (21 bytes; verified against M529LIY6 reference files # and re-confirmed against live BE11529 captures, 2026-05-08): # [0:4] b'STRT' - # [4:6] 0xff 0xfe fixed - # [6:10] end_key (4-byte device flash address where event ends) - # [10:14] start_key (4-byte device flash address where event starts) - # [14:18] device-specific (4 bytes; semantics not pinned) - # [18] 0x46 record-type marker (= 70 in decimal — NOT rectime!) 
+ # [4:6] 0xff 0xfe sentinel + # [6:10] end_key 4-byte BE flash address where event ends + # [10:14] start_key 4-byte BE flash address where event starts + # [14:18] device-specific (semantics not pinned; values vary across events + # and don't hold authoritative total_samples / pretrig) + # [18] 0x46 record-type marker (NOT rectime) # [19] device-specific - # [20] rectime (uint8 seconds, user-set Record Time) + # [20] sometimes rectime, sometimes 0 — not reliable # - # The earlier reading of `rectime_seconds = strt[18]` always returned - # 70 for a real waveform event because it was reading the 0x46 marker. - # Caller should prefer compliance_config.record_time when available - # (that's the authoritative user-set value) and fall back to this. - total_samples = struct.unpack_from(">H", strt, 8)[0] - pretrig_samples = struct.unpack_from(">H", strt, 16)[0] - rectime_seconds = strt[20] + # AUTHORITATIVE values must come from compliance_config (sample_rate, + # record_time) and from end_offset - start_offset arithmetic (event size). + # Earlier code claimed STRT[8:10]=total_samples and STRT[16:18]=pretrig; + # those positions actually overlap end_key low-word and dev-specific bytes + # respectively. We surface the address-derived event size so consumers + # can sanity-check chunk-loop bounds, but `total_samples` per channel must + # be derived externally (sample_rate × record_time, or computed from the + # decoded sample count below). + end_key = strt[6:10] + start_key = strt[10:14] + end_offset_in_strt = (end_key[2] << 8) | end_key[3] + start_offset_in_strt = (start_key[2] << 8) | start_key[3] + is_event_1 = (start_offset_in_strt == 0x0000) - event.total_samples = total_samples - event.pretrig_samples = pretrig_samples - event.rectime_seconds = rectime_seconds + # Don't trust STRT for these — leave them as None so the caller can + # backfill from compliance_config (the authoritative source). 
+ event.total_samples = None + event.pretrig_samples = None + event.rectime_seconds = None log.debug( - "_decode_a5_waveform: STRT total_samples=%d pretrig=%d rectime=%ds " - "(strt[18]=0x%02X record-type marker, strt[20]=0x%02X rectime)", - total_samples, pretrig_samples, rectime_seconds, strt[18], strt[20], + "_decode_a5_waveform: STRT start_key=%s end_key=%s " + "start_off=0x%04X end_off=0x%04X is_event_1=%s " + "dev-specific[14:18]=%s strt[20]=0x%02X", + start_key.hex(), end_key.hex(), + start_offset_in_strt, end_offset_in_strt, is_event_1, + strt[14:18].hex(), strt[20], ) # ── Collect per-frame waveform bytes with global offset tracking ───────── # global_offset is the cumulative byte count across all frames, used to # compute the channel alignment at each frame boundary. + # + # Frame layout under the v0.14.0+ walk: + # frames_data[0] = probe response (page_addr 0x0000; + # contains STRT + post-STRT data) + # frames_data[1..2] = (event 1 only) metadata pages + # page_addr = 0x1002 / 0x1004 + # frames_data[mid] = sample chunks at flash addresses + # 0x0600, 0x0800, … (page_addr in + # {0x0600..0x1FFE}) + # frames_data[last] = TERM response (page_key=0x0000) + # + # We identify metadata pages by their PAGE ADDRESS at db.data[4:6] (the + # 2-byte counter the device echoes back), NOT by content scan. An earlier + # needle-based detection (b"Project:", b"Client:", etc.) was the wrong + # layer of abstraction: + # • The actual metadata pages 0x1002 / 0x1004 do NOT contain ASCII + # project strings on this firmware (S338.17 / BE11529). + # • The strings physically live at flash address 0x1600 — which falls + # inside the sample-chunk address range. Skipping that frame would + # drop a real sample chunk. + # BW handles the "samples region happens to contain string bytes" case + # by just rendering the bytes verbatim; we do the same. 
+ _METADATA_PAGES = (b"\x10\x02", b"\x10\x04") + chunks: list[tuple[int, bytes]] = [] # (frame_idx, waveform_bytes) global_offset = 0 for fi, db in enumerate(frames_data): + page_addr = db.data[4:6] if len(db.data) >= 6 else b"" w = db.data[7:] # frame.data[7:] - # A5[0]: waveform begins after the 21-byte STRT record and 6-byte preamble. - # Layout: STRT(21B) + null-pad(2B) + 0xFF sentinel(4B) = 27 bytes total. + # A5[0]: probe response. Two cases: + # - Event 1 (start_offset_in_strt == 0x0000): the bytes after STRT + # are the device's *pre-event reserved area* (flash 0x0046 to + # 0x0600), NOT samples. We must skip them; samples begin at + # the first dedicated chunk frame at counter=0x0600. + # - Event N (continuation, start_offset != 0x0000): the bytes after + # the STRT record ARE the first slice of real samples for the + # event (BW's chunk loop addresses the probe as a sample chunk). if fi == 0: sp = w.find(b"STRT") if sp < 0: continue + if is_event_1: + # No usable samples in the probe — pre-event reserved bytes. + continue + # Layout: STRT(21B) + null-pad(2B) + 0xFF sentinel(4B) = 27 bytes total. wave = w[sp + 27 :] - # Frame 7 carries event-time metadata strings ("Project:", "Client:", …) - # and no waveform ADC data. - elif fi == 7: + # Skip the dedicated metadata pages (event 1 only): page_addr 0x1002 / 0x1004. + elif page_addr in _METADATA_PAGES: + log.debug( + "_decode_a5_waveform: skipping metadata page fi=%d page_addr=%s", + fi, page_addr.hex(), + ) continue - # Terminator frames have page_key=0x0000 and are excluded upstream - # (read_bulk_waveform_stream returns early on page_key==0). - # No hardcoded frame-index skip here — all non-metadata frames are data. + # Sample chunk (or TERM): strip the 8-byte per-frame header. 
else: - # Strip the 8-byte per-frame header (ctr + 6 zero bytes) if len(w) < 8: continue wave = w[8:] @@ -1638,10 +1685,8 @@ def _decode_a5_waveform( total_bytes = global_offset n_sets = total_bytes // 8 log.debug( - "_decode_a5_waveform: %d chunks, %dB total → %d complete sample-sets " - "(%d of %d expected; %.0f%%)", - len(chunks), total_bytes, n_sets, n_sets, total_samples, - 100.0 * n_sets / total_samples if total_samples else 0, + "_decode_a5_waveform: %d chunks, %dB total → %d complete sample-sets", + len(chunks), total_bytes, n_sets, ) if n_sets == 0: @@ -1699,7 +1744,7 @@ def _decode_a5_waveform( "Tran": tran, "Vert": vert, "Long": long_, - "Mic": mic, + "MicL": mic, } diff --git a/sfm/server.py b/sfm/server.py index d7073ca..1f9988c 100644 --- a/sfm/server.py +++ b/sfm/server.py @@ -690,7 +690,7 @@ def device_event_waveform( if the device is not storing all frames yet, or the capture was partial) - **sample_rate**: samples per second (from compliance config) - **channels**: dict of channel name → list of signed int16 ADC counts - (keys: "Tran", "Vert", "Long", "Mic") + (keys: "Tran", "Vert", "Long", "MicL") **Caching**: full waveforms are cached permanently after the first download — they are immutable once recorded on the device. Subsequent requests for the