""" sfm/server.py — Seismograph Field Module REST API Wraps the minimateplus library in a small FastAPI service. Terra-view proxies /api/sfm/* to this service (same pattern as SLMM at :8100). Default port: 8200 Endpoints --------- GET /health Service heartbeat — no device I/O GET /device/info POLL + serial number + full config read GET /device/events Download all stored events (headers + peak values) POST /device/connect Explicit connect/identify (same as /device/info) GET /device/event/{idx} Single event by index (header + waveform record) Transport query params (supply one set): Serial (direct RS-232 cable): port — serial port name (e.g. COM5, /dev/ttyUSB0) baud — baud rate (default 38400) TCP (modem / ACH Auto Call Home): host — IP address or hostname of the modem or ACH relay tcp_port — TCP port number (default 12345, Blastware default) Each call opens the connection, does its work, then closes it. (Stateless / reconnect-per-call, matching Blastware's observed behaviour.) Run with: python -m uvicorn sfm.server:app --host 0.0.0.0 --port 8200 --reload or: python sfm/server.py """ from __future__ import annotations import datetime import logging import sys import tempfile import threading import time from pathlib import Path from typing import Optional # FastAPI / Pydantic try: from fastapi import Body, FastAPI, File, HTTPException, Query, UploadFile from fastapi.middleware.cors import CORSMiddleware from fastapi.responses import FileResponse, JSONResponse, StreamingResponse from pydantic import BaseModel import uvicorn except ImportError: print( "fastapi and uvicorn are required for the SFM server.\n" "Install them with: pip install fastapi uvicorn", file=sys.stderr, ) sys.exit(1) from minimateplus import MiniMateClient from minimateplus.protocol import ProtocolError from minimateplus.models import CallHomeConfig, ComplianceConfig, DeviceInfo, Event, PeakValues, ProjectInfo, Timestamp from minimateplus.transport import TcpTransport, DEFAULT_TCP_PORT from minimateplus.blastware_file import write_blastware_file, blastware_filename from minimateplus.client import _decode_a5_metadata_into, _decode_a5_waveform from sfm import event_hdf5 from sfm.cache import SFMCache, get_cache from sfm.database import SeismoDb from sfm.live_cache import LiveCache as _LiveCache from sfm.waveform_store import WaveformStore logging.basicConfig( level=logging.INFO, format="%(asctime)s %(levelname)-7s %(name)s %(message)s", datefmt="%H:%M:%S", ) log = logging.getLogger("sfm.server") # ── FastAPI app ──────────────────────────────────────────────────────────────── app = FastAPI( title="Seismograph Field Module (SFM)", description=( "REST API for Instantel MiniMate Plus seismographs.\n" "Implements the minimateplus RS-232 protocol library.\n" "Proxied by terra-view at /api/sfm/*." ), version="0.1.0", ) # Allow requests from the waveform viewer opened as a local file (file://) # and from any dev server or terra-view proxy. app.add_middleware( CORSMiddleware, allow_origins=["*"], allow_methods=["GET", "POST"], allow_headers=["*"], ) # ── DB ──────────────────────────────────────────────────────────────────────── # Shared SeismoDb instance. Path can be overridden by --db-path at startup, # or defaults to bridges/captures/seismo_relay.db relative to the repo root. _DEFAULT_DB_PATH = Path(__file__).parent.parent / "bridges" / "captures" / "seismo_relay.db" _db: Optional[SeismoDb] = None _store: Optional[WaveformStore] = None def _get_db() -> SeismoDb: global _db if _db is None: _db = SeismoDb(_DEFAULT_DB_PATH) return _db def _get_store() -> WaveformStore: """ Persistent event-file + A5-sidecar store, rooted at /waveforms/. Mirrors the layout used by bridges/ach_server.py so files saved by ACH ingestion and by live SFM downloads share one canonical location. """ global _store if _store is None: _store = WaveformStore(_get_db().db_path.parent / "waveforms") return _store # ── Live device cache ───────────────────────────────────────────────────────── # In-memory cache for live device data. Avoids re-dialing the device on every # request when the data hasn't changed. # # Keyed by conn_key ("tcp:host:port" or "serial:port:baud"). # Does NOT persist across server restarts — this is purely an in-process cache # to reduce TCP round-trips and cellular data usage. # # Invalidation rules: # device_info — cached until POST /device/config marks it dirty # events — cached by (conn_key, device_event_count); re-fetched when # a quick count_events() probe shows new events on the device # monitor_status — 30-second TTL (changes frequently during monitoring) # waveforms — permanent (immutable once recorded; indexed by conn_key+idx) # # All endpoints accept ?force=true to bypass the cache and re-read from device. _live_cache = _LiveCache() # ── Serialisers ──────────────────────────────────────────────────────────────── # Plain dict helpers — avoids a Pydantic dependency in the library layer. def _serialise_timestamp(ts: Optional[Timestamp]) -> Optional[dict]: if ts is None: return None return { "year": ts.year, "month": ts.month, "day": ts.day, "hour": ts.hour, "minute": ts.minute, "second": ts.second, "clock_set": ts.clock_set, "display": str(ts), } def _serialise_peak_values(pv: Optional[PeakValues]) -> Optional[dict]: if pv is None: return None return { "tran_in_s": pv.tran, "vert_in_s": pv.vert, "long_in_s": pv.long, "micl_psi": pv.micl, "peak_vector_sum": pv.peak_vector_sum, } def _serialise_project_info(pi: Optional[ProjectInfo]) -> Optional[dict]: if pi is None: return None return { "setup_name": pi.setup_name, "project": pi.project, "client": pi.client, "operator": pi.operator, "sensor_location": pi.sensor_location, "notes": pi.notes, } def _serialise_compliance_config(cc: Optional["ComplianceConfig"]) -> Optional[dict]: if cc is None: return None return { "recording_mode": cc.recording_mode, # 0x00=Single Shot, 0x01=Continuous, 0x03=Histogram, 0x04=Histogram+Continuous "sample_rate": cc.sample_rate, "histogram_interval_sec": cc.histogram_interval_sec, # seconds; None if not Histogram mode "record_time": cc.record_time, "trigger_level_geo": cc.trigger_level_geo, "alarm_level_geo": cc.alarm_level_geo, "geo_adc_scale": cc.geo_adc_scale, # hw scale factor (in/s)/V — informational only, do not write "geo_range": cc.geo_range, # CONFIRMED 2026-04-20: 0x00=Normal 10in/s, 0x01=Sensitive 1.25in/s "setup_name": cc.setup_name, "project": cc.project, "client": cc.client, "operator": cc.operator, "sensor_location": cc.sensor_location, "notes": cc.notes, } def _serialise_call_home_config(ch: Optional["CallHomeConfig"]) -> Optional[dict]: if ch is None: return None return { "auto_call_home_enabled": ch.auto_call_home_enabled, "dial_string": ch.dial_string, "after_event_recorded": ch.after_event_recorded, "at_specified_times": ch.at_specified_times, "time1_enabled": ch.time1_enabled, "time1_hour": ch.time1_hour, "time1_min": ch.time1_min, "time2_enabled": ch.time2_enabled, "time2_hour": ch.time2_hour, "time2_min": ch.time2_min, "num_retries": ch.num_retries, "time_between_retries_sec": ch.time_between_retries_sec, "wait_for_connection_sec": ch.wait_for_connection_sec, "warm_up_time_sec": ch.warm_up_time_sec, } def _serialise_device_info(info: DeviceInfo) -> dict: return { "serial": info.serial, "firmware_version": info.firmware_version, "firmware_minor": info.firmware_minor, "dsp_version": info.dsp_version, "manufacturer": info.manufacturer, "model": info.model, "event_count_sub08": info.event_count, # unreliable — SUB 08 always returns 1 "compliance_config": _serialise_compliance_config(info.compliance_config), } def _serialise_event(ev: Event, debug: bool = False) -> dict: d: dict = { "index": ev.index, "timestamp": _serialise_timestamp(ev.timestamp), "sample_rate": ev.sample_rate, "record_type": ev.record_type, "peak_values": _serialise_peak_values(ev.peak_values), "project_info": _serialise_project_info(ev.project_info), } if debug: raw = getattr(ev, "_raw_record", None) d["raw_record_hex"] = raw.hex() if raw else None d["raw_record_len"] = len(raw) if raw else 0 return d # ── Transport factory ───────────────────────────────────────────────────────── def _build_client( port: Optional[str], baud: int, host: Optional[str], tcp_port: int, timeout: float = 30.0, ) -> MiniMateClient: """ Return a MiniMateClient configured for either serial or TCP transport. TCP takes priority if *host* is supplied; otherwise *port* (serial) is used. Raises HTTPException(422) if neither is provided. Use timeout=120.0 (or higher) for endpoints that perform a full 5A waveform download — a 70-second event at 1024 sps takes 2-3 minutes to transfer over cellular and each individual recv must complete within the timeout window. """ if host: transport = TcpTransport(host, port=tcp_port) log.debug("TCP transport: %s:%d timeout=%.0fs", host, tcp_port, timeout) return MiniMateClient(transport=transport, timeout=timeout) elif port: log.debug("Serial transport: %s baud=%d", port, baud) return MiniMateClient(port, baud) else: raise HTTPException( status_code=422, detail=( "Specify either 'port' (serial, e.g. ?port=COM5) " "or 'host' (TCP, e.g. ?host=192.168.1.50&tcp_port=12345)" ), ) def _is_tcp(host: Optional[str]) -> bool: return bool(host) def _run_with_retry(fn, *, is_tcp: bool): """ Call fn() and, for TCP connections only, retry once on ProtocolError. Rationale: when a MiniMate Plus is cold (just had its serial lines asserted by the modem or a local bridge), it takes 5-10 seconds to boot before it will respond to POLL_PROBE. The first request may time out during that boot window; a single automatic retry is enough to recover once the unit is up. Serial connections are NOT retried — a timeout there usually means a real problem (wrong port, wrong baud, cable unplugged). """ try: return fn() except ProtocolError as exc: if not is_tcp: raise log.info("TCP poll timed out (unit may have been cold) — retrying once") return fn() # let any second failure propagate normally # ── Helpers ──────────────────────────────────────────────────────────────────── def _backfill_events(events: list, info: "DeviceInfo") -> None: """ Fill in sample_rate and project_info fields that the per-event waveform record doesn't carry — sourced from the device's compliance config. Extracted from device_events() so it can be called from both the full download path and the partial (new-events-only) path. """ if info.compliance_config and info.compliance_config.sample_rate: for ev in events: if ev.sample_rate is None: ev.sample_rate = info.compliance_config.sample_rate if info.compliance_config: cc = info.compliance_config for ev in events: if ev.project_info is None: ev.project_info = ProjectInfo() pi = ev.project_info if pi.client is None: pi.client = cc.client if pi.operator is None: pi.operator = cc.operator if pi.sensor_location is None: pi.sensor_location = cc.sensor_location if pi.notes is None: pi.notes = cc.notes # ── Endpoints ────────────────────────────────────────────────────────────────── @app.get("/health") def health() -> dict: """Service heartbeat. No device I/O.""" return {"status": "ok", "service": "sfm", "version": "0.1.0"} @app.get("/", response_class=FileResponse) def webapp(): """Serve the SFM web app.""" return str(Path(__file__).parent / "sfm_webapp.html") @app.get("/waveform", response_class=FileResponse) def waveform_viewer(): """Serve the standalone waveform viewer.""" return str(Path(__file__).parent / "waveform_viewer.html") @app.get("/device/info") def device_info( port: Optional[str] = Query(None, description="Serial port (e.g. COM5, /dev/ttyUSB0)"), baud: int = Query(38400, description="Serial baud rate (default 38400)"), host: Optional[str] = Query(None, description="TCP host — modem IP or ACH relay (e.g. 203.0.113.5)"), tcp_port: int = Query(DEFAULT_TCP_PORT, description=f"TCP port (default {DEFAULT_TCP_PORT})"), force: bool = Query(False, description="Bypass cache and re-read from device"), ) -> dict: """ Connect to the device, perform the POLL startup handshake, and return identity information (serial number, firmware version, model). Supply either *port* (serial) or *host* (TCP/modem). Equivalent to POST /device/connect — provided as GET for convenience. **Caching**: device identity and compliance config are cached after the first successful read (they rarely change). Pass *force=true* to bypass the cache and re-read directly from the device (e.g. after a config push). The cache is also automatically invalidated after POST /device/config. """ log.info("GET /device/info port=%s host=%s tcp_port=%d force=%s", port, host, tcp_port, force) cache = get_cache() conn_key = SFMCache.make_conn_key(host, tcp_port, port, baud) if not force: cached = cache.get_device_info(conn_key) if cached is not None: log.info("device info cache hit for %s", conn_key) cached["_cached"] = True return cached try: def _do(): with _build_client(port, baud, host, tcp_port) as client: info = client.connect() # SUB 08 event_count is unreliable (always returns 1 regardless of # actual storage). Count via 1E/1F chain instead. info.event_count = client.count_events() return info info = _run_with_retry(_do, is_tcp=_is_tcp(host)) except HTTPException: raise except ProtocolError as exc: raise HTTPException(status_code=502, detail=f"Protocol error: {exc}") from exc except OSError as exc: raise HTTPException(status_code=502, detail=f"Connection error: {exc}") from exc except Exception as exc: raise HTTPException(status_code=500, detail=f"Device error: {exc}") from exc result = _serialise_device_info(info) cache.set_device_info(conn_key, result) return result @app.post("/device/connect") def device_connect( port: Optional[str] = Query(None, description="Serial port (e.g. COM5)"), baud: int = Query(38400, description="Serial baud rate"), host: Optional[str] = Query(None, description="TCP host — modem IP or ACH relay"), tcp_port: int = Query(DEFAULT_TCP_PORT, description=f"TCP port (default {DEFAULT_TCP_PORT})"), ) -> dict: """ Connect to the device and return identity. POST variant for terra-view compatibility with the SLMM proxy pattern. """ return device_info(port=port, baud=baud, host=host, tcp_port=tcp_port) @app.get("/device/events") def device_events( port: Optional[str] = Query(None, description="Serial port (e.g. COM5)"), baud: int = Query(38400, description="Serial baud rate"), host: Optional[str] = Query(None, description="TCP host — modem IP or ACH relay"), tcp_port: int = Query(DEFAULT_TCP_PORT, description=f"TCP port (default {DEFAULT_TCP_PORT})"), debug: bool = Query(False, description="Include raw record hex for field-layout inspection"), force: bool = Query(False, description="Bypass cache and re-download all events from device"), ) -> dict: """ Connect to the device, read the event index, and download all stored events (event headers + full waveform records with peak values). Supply either *port* (serial) or *host* (TCP/modem). **Caching:** a quick count_events() probe (~2s) is performed first. If the device's event count matches the cached count, the cached response is returned immediately without a full download. Pass ?force=true to skip this and always re-download. Pass debug=true to include raw_record_hex in each event — useful for verifying field offsets against the protocol reference. This does NOT download raw ADC waveform samples — those are large and fetched separately via GET /device/event/{idx}/waveform. **Caching**: event headers are cached after the first download. On subsequent calls, the device is contacted only to check the event count (fast: ~2s). If the count matches the cache, all events are returned from cache instantly. If new events exist on the device, only the new ones are downloaded and merged. Pass *force=true* to bypass the cache entirely and re-download everything. """ log.info("GET /device/events port=%s host=%s debug=%s force=%s", port, host, debug, force) cache = get_cache() conn_key = SFMCache.make_conn_key(host, tcp_port, port, baud) # ── Smart cache path (skip when debug=True or force=True) ──────────────── # debug mode uses raw_record_hex which isn't stored in the cache, so we # must always go to the device when debug is requested. if not force and not debug: cached_events = cache.get_all_events(conn_key) cached_count = len(cached_events) if cached_events else 0 if cached_count > 0: # Quick device contact: just count events via the fast 1E/1F chain. # This takes ~2s instead of the full event download (~10-30s). try: def _count(): with _build_client(port, baud, host, tcp_port) as client: client.connect() return client.count_events() device_count = _run_with_retry(_count, is_tcp=_is_tcp(host)) except HTTPException: raise except (ProtocolError, OSError, Exception) as exc: # If we can't reach the device at all, serve stale cache rather # than returning an error — field units go offline regularly. log.warning("count_events failed (%s) — serving stale cache for %s", exc, conn_key) cached_info = cache.get_device_info(conn_key) or {} return { "device": cached_info, "event_count": cached_count, "events": cached_events, "_cached": True, "_stale": True, } if device_count == cached_count: # Nothing new — return cache immediately, no event download needed. log.info( "event cache hit for %s: %d events, device count matches", conn_key, cached_count, ) cached_info = cache.get_device_info(conn_key) or {} return { "device": cached_info, "event_count": cached_count, "events": cached_events, "_cached": True, } if device_count > cached_count: # New events on the device — download all events but only store/return # the new ones. Events are append-only; indices 0..(cached_count-1) # are already in the cache and don't need to be re-downloaded logically, # but the protocol requires iterating from event 0 to reach later ones. # The device download time is dominated by the number of events requested, # so we stop at the last known event index to avoid re-downloading everything. log.info( "new events on device %s: have %d, device has %d — fetching all up to %d", conn_key, cached_count, device_count, device_count - 1, ) try: def _fetch_new(): with _build_client(port, baud, host, tcp_port) as client: info = client.connect() all_evs = client.get_events(stop_after_index=device_count - 1) return info, all_evs info, all_events = _run_with_retry(_fetch_new, is_tcp=_is_tcp(host)) except HTTPException: raise except ProtocolError as exc: raise HTTPException(status_code=502, detail=f"Protocol error: {exc}") from exc except OSError as exc: raise HTTPException(status_code=502, detail=f"Connection error: {exc}") from exc except Exception as exc: raise HTTPException(status_code=500, detail=f"Device error: {exc}") from exc _backfill_events(all_events, info) # Only the new events (indices >= cached_count) are truly new. new_events = [ev for ev in all_events if ev.index >= cached_count] new_serialised = [_serialise_event(ev) for ev in new_events] cache.set_events(conn_key, new_serialised) cache.set_device_info(conn_key, _serialise_device_info(info)) merged_events = cache.get_all_events(conn_key) return { "device": _serialise_device_info(info), "event_count": len(merged_events), "events": merged_events, "_cached": True, "_new_events": len(new_events), } # ── Full download path (first call, force=True, or debug=True) ─────────── try: def _do(): with _build_client(port, baud, host, tcp_port) as client: return client.connect(), client.get_events(debug=debug) info, events = _run_with_retry(_do, is_tcp=_is_tcp(host)) except HTTPException: raise except ProtocolError as exc: raise HTTPException(status_code=502, detail=f"Protocol error: {exc}") from exc except OSError as exc: raise HTTPException(status_code=502, detail=f"Connection error: {exc}") from exc except Exception as exc: raise HTTPException(status_code=500, detail=f"Device error: {exc}") from exc _backfill_events(events, info) serialised = [_serialise_event(ev, debug=debug) for ev in events] if not debug: # Only cache when not in debug mode (debug adds raw_record_hex which # we don't want polluting the normal cache entries). cache.set_events(conn_key, serialised) cache.set_device_info(conn_key, _serialise_device_info(info)) serialised_info = _serialise_device_info(info) serialised_events = [_serialise_event(ev, debug=debug) for ev in events] # Update cache (skip if debug=True — raw hex blobs shouldn't pollute the cache) if not debug: _live_cache.set_device_info(conn_key, serialised_info) _live_cache.set_events(conn_key, len(events), serialised_events) return { "device": serialised_info, "event_count": len(events), "events": serialised, } @app.get("/device/event/{index}") def device_event( index: int, port: Optional[str] = Query(None, description="Serial port (e.g. COM5)"), baud: int = Query(38400, description="Serial baud rate"), host: Optional[str] = Query(None, description="TCP host — modem IP or ACH relay"), tcp_port: int = Query(DEFAULT_TCP_PORT, description=f"TCP port (default {DEFAULT_TCP_PORT})"), force: bool = Query(False, description="Bypass cache and re-download from device"), ) -> dict: """ Download a single event by index (0-based). Supply either *port* (serial) or *host* (TCP/modem). Performs: POLL startup → event index → event header → waveform record. **Caching**: if this event was already downloaded (e.g. via GET /device/events), it is returned instantly from cache with no device contact. """ log.info("GET /device/event/%d port=%s host=%s force=%s", index, port, host, force) cache = get_cache() conn_key = SFMCache.make_conn_key(host, tcp_port, port, baud) if not force: cached = cache.get_event(conn_key, index) if cached is not None: log.info("event cache hit for %s index %d", conn_key, index) cached["_cached"] = True return cached try: def _do(): with _build_client(port, baud, host, tcp_port) as client: info = client.connect() events = client.get_events(stop_after_index=index) return info, events info, events = _run_with_retry(_do, is_tcp=_is_tcp(host)) except HTTPException: raise except ProtocolError as exc: raise HTTPException(status_code=502, detail=f"Protocol error: {exc}") from exc except OSError as exc: raise HTTPException(status_code=502, detail=f"Connection error: {exc}") from exc except Exception as exc: raise HTTPException(status_code=500, detail=f"Device error: {exc}") from exc matching = [ev for ev in events if ev.index == index] if not matching: raise HTTPException( status_code=404, detail=f"Event index {index} not found on device", ) _backfill_events(matching, info) result = _serialise_event(matching[0]) # Store all downloaded events (we paid for them anyway — indices 0..index) all_serialised = [_serialise_event(ev) for ev in events] cache.set_events(conn_key, all_serialised) return result @app.get("/device/event/{index}/waveform") def device_event_waveform( index: int, port: Optional[str] = Query(None, description="Serial port (e.g. COM5)"), baud: int = Query(38400, description="Serial baud rate"), host: Optional[str] = Query(None, description="TCP host — modem IP or ACH relay"), tcp_port: int = Query(DEFAULT_TCP_PORT, description=f"TCP port (default {DEFAULT_TCP_PORT})"), force: bool = Query(False, description="Bypass cache and re-download from device"), ) -> dict: """ Download the full raw ADC waveform for a single event (0-based index). Supply either *port* (serial) or *host* (TCP/modem). Performs: POLL startup → get_events() (to locate the 4-byte waveform key) → download_waveform() (full SUB 5A stream, stop_after_metadata=False). Response includes: - **total_samples**: expected sample-sets from the STRT record - **pretrig_samples**: pre-trigger sample count - **rectime_seconds**: record duration - **samples_decoded**: actual sample-sets decoded (may be less than total_samples if the device is not storing all frames yet, or the capture was partial) - **sample_rate**: samples per second (from compliance config) - **channels**: dict of channel name → list of signed int16 ADC counts (keys: "Tran", "Vert", "Long", "MicL") **Caching**: full waveforms are cached permanently after the first download — they are immutable once recorded on the device. Subsequent requests for the same event return instantly from cache without any device contact. Pass *force=true* to force a fresh download (rarely needed). """ log.info("GET /device/event/%d/waveform port=%s host=%s force=%s", index, port, host, force) cache = get_cache() conn_key = SFMCache.make_conn_key(host, tcp_port, port, baud) if not force: cached = cache.get_waveform(conn_key, index) if cached is not None: log.info("waveform cache hit for %s event %d", conn_key, index) cached["_cached"] = True return cached try: def _do(): with _build_client(port, baud, host, tcp_port, timeout=120.0) as client: info = client.connect() # stop_after_index avoids downloading events beyond the one requested. events = client.get_events(full_waveform=True, stop_after_index=index) matching = [ev for ev in events if ev.index == index] return matching[0] if matching else None, info ev, info = _run_with_retry(_do, is_tcp=_is_tcp(host)) except HTTPException: raise except ProtocolError as exc: raise HTTPException(status_code=502, detail=f"Protocol error: {exc}") from exc except OSError as exc: raise HTTPException(status_code=502, detail=f"Connection error: {exc}") from exc except Exception as exc: raise HTTPException(status_code=500, detail=f"Device error: {exc}") from exc if ev is None: raise HTTPException( status_code=404, detail=f"Event index {index} not found on device", ) # Backfill from compliance_config: sample_rate, record_time, and # derived total_samples. These are user-set authoritative values; the # corresponding STRT-derived guesses in `_decode_a5_waveform` can be # off (e.g. rectime used to read the 0x46 record-type marker = 70s). cc = info.compliance_config if cc: if ev.sample_rate is None and cc.sample_rate: ev.sample_rate = cc.sample_rate if cc.record_time: ev.rectime_seconds = cc.record_time if ev.sample_rate and ev.rectime_seconds: derived = int(round(ev.sample_rate * ev.rectime_seconds)) if (ev.total_samples is None or ev.total_samples > derived * 2 or ev.total_samples < derived // 4): ev.total_samples = derived geo_range = getattr(cc, "geo_range", None) if cc else None # Build the plot.v1 JSON: samples in physical units (in/s for geo, psi # for mic), explicit time axis, peak markers — the shape clients should # consume directly without doing any ADC scaling. serial = getattr(info, "serial", None) or "" result = event_hdf5.event_to_plot_json( ev, serial=serial, geo_range=geo_range or "normal", index=index, ) cache.set_waveform(conn_key, index, result) return result @app.get("/device/event/{index}/blastware_file") def device_event_blastware_file( index: int, port: Optional[str] = Query(None, description="Serial port (e.g. COM5)"), baud: int = Query(38400, description="Serial baud rate"), host: Optional[str] = Query(None, description="TCP host — modem IP or ACH relay"), tcp_port: int = Query(DEFAULT_TCP_PORT, description=f"TCP port (default {DEFAULT_TCP_PORT})"), force: bool = Query(False, description="Bypass any cached/dedup'd state and re-download from device"), ) -> FileResponse: """ Download the waveform for a single event (0-based index) and return it as a Blastware-compatible binary file with a correct Blastware filename. Supply either *port* (serial) or *host* (TCP/modem). The file is written to the OS temp directory and streamed back as a binary download. Blastware can open it directly — filename encodes serial + timestamp. Filename format: 0 - prefix letter = chr(ord('B') + floor(serial_numeric / 1000)) - stem + AB = second-resolution timestamp since 1985-01-01 local - W / H = Full Waveform / Full Histogram (defaults to W for triggered events; histogram requires recording_mode to be populated from compliance config) Performs: POLL startup → get_events(full_waveform=True, stop_after_index=index) → write_blastware_file() → FileResponse + persistent store + DB upsert. """ log.info( "GET /device/event/%d/blastware_file port=%s host=%s force=%s", index, port, host, force, ) # `force` always re-downloads from the device. This endpoint already # never short-circuits via cache, so `force` is reserved for parity with # the other live endpoints. try: def _do(): with _build_client(port, baud, host, tcp_port, timeout=120.0) as client: info = client.connect() # full_waveform=True pulls the complete 5A stream so the # client populates STRT-derived fields (total_samples, # pretrig_samples, rectime_seconds) AND raw_samples on the # Event. Required for the .h5 + .sfm.json sidecar to be # filled in correctly — without it, those land as nulls. events = client.get_events( full_waveform=True, stop_after_index=index, ) matching = [ev for ev in events if ev.index == index] return matching[0] if matching else None, info ev, info = _run_with_retry(_do, is_tcp=_is_tcp(host)) except HTTPException: raise except ProtocolError as exc: log.error("blastware_file: protocol error: %s", exc, exc_info=True) raise HTTPException(status_code=502, detail=f"Protocol error: {exc}") from exc except OSError as exc: log.error("blastware_file: connection error: %s", exc, exc_info=True) raise HTTPException(status_code=502, detail=f"Connection error: {exc}") from exc except Exception as exc: log.error("blastware_file: unexpected error: %s", exc, exc_info=True) raise HTTPException(status_code=500, detail=f"Device error: {exc}") from exc if ev is None: raise HTTPException( status_code=404, detail=f"Event index {index} not found on device", ) a5_frames = getattr(ev, "_a5_frames", None) if not a5_frames: raise HTTPException( status_code=502, detail=f"No waveform data received for event index {index} — 5A download failed", ) # Determine serial number from device info serial = getattr(info, "serial", None) or "UNKNOWN" # Build filename using the same algorithm Blastware uses filename = blastware_filename(ev, serial) # Write to OS temp dir (cross-platform: /tmp on Linux/macOS, # %TEMP% on Windows) so FastAPI can stream it back via FileResponse. out_path = Path(tempfile.gettempdir()) / filename # Delete any stale file at this path before writing. On Windows we have # observed the new (smaller) file getting trailing zero-bytes from the # previous (larger) file when filesystem semantics around open(...,"wb") # don't truncate cleanly (e.g. through a synced folder). Explicit unlink # eliminates that ambiguity. try: out_path.unlink() except FileNotFoundError: pass write_blastware_file(ev, a5_frames, out_path) log.info( "blastware_file: wrote %s (%d A5 frames, serial=%s)", out_path, len(a5_frames), serial, ) # Promote to canonical persistent store + DB row so this event is # queryable via /db/events afterwards (matches the ACH ingestion path). if serial != "UNKNOWN" and ev._waveform_key is not None: try: cc = info.compliance_config # Backfill authoritative compliance-config values onto the # Event before persisting. These supersede whatever # _decode_a5_waveform read from the STRT bytes (some of which # have ambiguous semantics — e.g. STRT[20] is rectime but # STRT[8:10] / STRT[16:18] are device-specific scratch fields # that aren't reliable sample/pretrig counts). if cc: if ev.sample_rate is None and cc.sample_rate: ev.sample_rate = cc.sample_rate if cc.record_time: # record_time from compliance is authoritative — the # user-set value the device followed when recording. ev.rectime_seconds = cc.record_time # Derive total_samples from sample_rate × rectime when # we can; the STRT-derived value can land at a buffer- # offset rather than a sample count. if ev.sample_rate and ev.rectime_seconds: derived = int(round(ev.sample_rate * ev.rectime_seconds)) if (ev.total_samples is None or ev.total_samples > derived * 2 or ev.total_samples < derived // 4): ev.total_samples = derived geo_range = getattr(cc, "geo_range", None) if cc else None rec = _get_store().save( ev, serial=serial, a5_frames=a5_frames, geo_range=geo_range if geo_range is not None else "normal", ) _get_db().insert_events( [ev], serial=serial, waveform_records={ev._waveform_key.hex(): rec}, ) log.info( "blastware_file: persisted to store (%s, %d bytes)", rec["filename"], rec["filesize"], ) except Exception as exc: log.warning( "blastware_file: persistent store save failed: %s " "— temp file still served", exc, ) return FileResponse( path=str(out_path), filename=filename, media_type="application/octet-stream", ) # ── Write endpoints ─────────────────────────────────────────────────────────── class DeviceConfigBody(BaseModel): """ Request body for POST /device/config. All fields are optional — only supplied (non-null) fields are written to the device. All other config bytes are round-tripped verbatim. Recording parameters -------------------- recording_mode : Recording mode enum. Values: 0=Single Shot, 1=Continuous, 3=Histogram, 4=Histogram+Continuous. sample_rate : Samples per second. Valid values: 1024, 2048, 4096. record_time : Record duration in seconds (e.g. 1.0, 2.0, 3.0). Trigger / alarm thresholds and range (geo channels) ---------------------------------------------------- trigger_level_geo : Trigger threshold in in/s (e.g. 0.5). alarm_level_geo : Alarm threshold in in/s (e.g. 1.0). geo_range : Geophone range/sensitivity. 0=Normal 10.000 in/s, 1=Sensitive 1.250 in/s. Project / operator strings (max 41 ASCII characters each) ---------------------------- project : Project description. client_name : Client / company name. operator : Operator / technician name. seis_loc : Sensor location description. notes : Extended notes. """ # Recording parameters recording_mode: Optional[int] = None sample_rate: Optional[int] = None record_time: Optional[float] = None histogram_interval_sec: Optional[int] = None # seconds: 2, 5, 15, 60, 300, 900 (mode-gated) # Threshold parameters / geo range trigger_level_geo: Optional[float] = None alarm_level_geo: Optional[float] = None geo_range: Optional[int] = None # 0=Normal 10.000 in/s, 1=Sensitive 1.250 in/s # Project / operator strings project: Optional[str] = None client_name: Optional[str] = None operator: Optional[str] = None seis_loc: Optional[str] = None notes: Optional[str] = None @app.post("/device/config") def device_config( body: DeviceConfigBody, port: Optional[str] = Query(None, description="Serial port (e.g. COM5)"), baud: int = Query(38400, description="Serial baud rate"), host: Optional[str] = Query(None, description="TCP host — modem IP or ACH relay"), tcp_port: int = Query(DEFAULT_TCP_PORT, description=f"TCP port (default {DEFAULT_TCP_PORT})"), ) -> dict: """ Read the current device config, apply any supplied changes to the compliance block, and write the full config back. Only non-null fields in the JSON body are modified. All other config bytes are round-tripped verbatim from the device. Supply either *port* (serial) or *host* (TCP/modem). Example body (all fields optional — include only what you want to change): { "recording_mode": 1, "sample_rate": 1024, "record_time": 3.0, "trigger_level_geo": 0.5, "alarm_level_geo": 1.0, "project": "Bridge Inspection 2026", "client_name": "City of Portland", "operator": "Brian Harrison", "seis_loc": "South Abutment", "notes": "Pre-blast baseline" } Returns: {"status": "ok", "updated_fields": {...}} on success. Raises: 502 on protocol errors (timeout, bad ack, etc.). 422 if neither port nor host is provided. """ changed = body.model_dump(exclude_none=True) conn_key = _live_cache.make_conn_key(host, tcp_port, port, baud) log.info("POST /device/config port=%s host=%s fields=%s", port, host, list(changed.keys())) try: def _do(): with _build_client(port, baud, host, tcp_port) as client: client.connect() client.apply_config( recording_mode=body.recording_mode, sample_rate=body.sample_rate, record_time=body.record_time, histogram_interval_sec=body.histogram_interval_sec, trigger_level_geo=body.trigger_level_geo, alarm_level_geo=body.alarm_level_geo, geo_range=body.geo_range, project=body.project, client_name=body.client_name, operator=body.operator, seis_loc=body.seis_loc, notes=body.notes, ) _run_with_retry(_do, is_tcp=_is_tcp(host)) except HTTPException: raise except ProtocolError as exc: raise HTTPException(status_code=502, detail=f"Protocol error: {exc}") from exc except OSError as exc: raise HTTPException(status_code=502, detail=f"Connection error: {exc}") from exc except ValueError as exc: raise HTTPException(status_code=422, detail=str(exc)) from exc except Exception as exc: raise HTTPException(status_code=500, detail=f"Device error: {exc}") from exc # Config was written to the device — the cached compliance config is now stale. conn_key = SFMCache.make_conn_key(host, tcp_port, port, baud) get_cache().mark_config_dirty(conn_key) return { "status": "ok", "updated_fields": changed, } # Keep the old endpoint alive under its old URL for anything already calling it @app.post("/device/config/project") def device_config_project( body: DeviceConfigBody, port: Optional[str] = Query(None, description="Serial port (e.g. COM5)"), baud: int = Query(38400, description="Serial baud rate"), host: Optional[str] = Query(None, description="TCP host — modem IP or ACH relay"), tcp_port: int = Query(DEFAULT_TCP_PORT, description=f"TCP port (default {DEFAULT_TCP_PORT})"), ) -> dict: """Deprecated alias for POST /device/config — use that instead.""" return device_config(body=body, port=port, baud=baud, host=host, tcp_port=tcp_port) # ── Monitoring endpoints ─────────────────────────────────────────────────────── @app.get("/device/monitor/status") def device_monitor_status( port: Optional[str] = Query(None, description="Serial port (e.g. COM5)"), baud: int = Query(38400, description="Serial baud rate"), host: Optional[str] = Query(None, description="TCP host — modem IP or ACH relay"), tcp_port: int = Query(DEFAULT_TCP_PORT, description=f"TCP port (default {DEFAULT_TCP_PORT})"), force: bool = Query(False, description="Bypass cache and re-read from device"), ) -> dict: """ Read monitoring status from the device. Uses poll() (POLL handshake only — no config/compliance reads) so the request completes in ~2 seconds instead of ~15. The full connect() was causing false "idle" readings because the compliance+event sequence was interacting with the device state before the 0x1C read. Returns is_monitoring bool, battery voltage, and memory usage (total + free bytes). Battery and memory are only present when the unit is idle. **Caching**: status is cached for 30 seconds to reduce cellular polling overhead. Pass *force=true* to bypass the cache for an immediate fresh read. """ cache = get_cache() conn_key = SFMCache.make_conn_key(host, tcp_port, port, baud) if not force: cached = cache.get_monitor_status(conn_key) if cached is not None: log.debug("monitor status cache hit for %s", conn_key) cached["_cached"] = True return cached with _build_client(port=port, baud=baud, host=host, tcp_port=tcp_port) as client: try: client.poll() except Exception as exc: log.warning("monitor status poll retry: %s", exc) client.poll() status = client.get_monitor_status() result: dict = {"is_monitoring": status.is_monitoring} if status.battery_v is not None: result["battery_v"] = round(status.battery_v, 2) if status.memory_total is not None: result["memory_total_bytes"] = status.memory_total result["memory_total_kb"] = round(status.memory_total / 1024, 1) if status.memory_free is not None: result["memory_free_bytes"] = status.memory_free result["memory_free_kb"] = round(status.memory_free / 1024, 1) cache.set_monitor_status(conn_key, result) return result @app.post("/device/monitor/start") def device_monitor_start( port: Optional[str] = Query(None, description="Serial port (e.g. COM5)"), baud: int = Query(38400, description="Serial baud rate"), host: Optional[str] = Query(None, description="TCP host — modem IP or ACH relay"), tcp_port: int = Query(DEFAULT_TCP_PORT, description=f"TCP port (default {DEFAULT_TCP_PORT})"), ) -> dict: """ Command the device to start monitoring (recording triggered events). Sends SUB 0x96 and waits for ack SUB 0x69. """ conn_key = _live_cache.make_conn_key(host, tcp_port, port, baud) with _build_client(port=port, baud=baud, host=host, tcp_port=tcp_port) as client: try: client.poll() except Exception as exc: log.warning("start monitoring poll retry: %s", exc) client.poll() client.start_monitoring() conn_key = SFMCache.make_conn_key(host, tcp_port, port, baud) get_cache().invalidate_monitor_status(conn_key) return {"status": "started"} @app.post("/device/monitor/stop") def device_monitor_stop( port: Optional[str] = Query(None, description="Serial port (e.g. COM5)"), baud: int = Query(38400, description="Serial baud rate"), host: Optional[str] = Query(None, description="TCP host — modem IP or ACH relay"), tcp_port: int = Query(DEFAULT_TCP_PORT, description=f"TCP port (default {DEFAULT_TCP_PORT})"), ) -> dict: """ Command the device to stop monitoring. Sends SUB 0x97 and waits for ack SUB 0x68. """ conn_key = _live_cache.make_conn_key(host, tcp_port, port, baud) with _build_client(port=port, baud=baud, host=host, tcp_port=tcp_port) as client: try: client.poll() except Exception as exc: log.warning("stop monitoring poll retry: %s", exc) client.poll() client.stop_monitoring() conn_key = SFMCache.make_conn_key(host, tcp_port, port, baud) get_cache().invalidate_monitor_status(conn_key) return {"status": "stopped"} # ── Call home config endpoints ─────────────────────────────────────────────── @app.get("/device/call_home") def device_call_home_get( port: Optional[str] = Query(None, description="Serial port (e.g. COM5)"), baud: int = Query(38400, description="Serial baud rate"), host: Optional[str] = Query(None, description="TCP host — modem IP or ACH relay"), tcp_port: int = Query(DEFAULT_TCP_PORT, description=f"TCP port (default {DEFAULT_TCP_PORT})"), ) -> dict: """ Read the Auto Call Home (ACH) configuration from the device. Sends SUB 0x2C (two-step read) and returns the decoded call home config. Confirmed from 4-20-26 call home settings captures (BE11529). Returns: { "auto_call_home_enabled": true/false, "dial_string": "RADIO RING", "after_event_recorded": true/false, "at_specified_times": true/false, "time1_enabled": true/false, "time1_hour": 19, "time1_min": 55, "time2_enabled": false, "time2_hour": 0, "time2_min": 0, "num_retries": 3, "time_between_retries_sec": 15, "wait_for_connection_sec": 60, "warm_up_time_sec": 60 } """ try: def _do(): with _build_client(port, baud, host, tcp_port) as client: client.poll() return client.get_call_home_config() ch_config = _run_with_retry(_do, is_tcp=_is_tcp(host)) except HTTPException: raise except ProtocolError as exc: raise HTTPException(status_code=502, detail=f"Protocol error: {exc}") from exc except OSError as exc: raise HTTPException(status_code=502, detail=f"Connection error: {exc}") from exc except Exception as exc: raise HTTPException(status_code=500, detail=f"Device error: {exc}") from exc return _serialise_call_home_config(ch_config) or {} class CallHomeConfigBody(BaseModel): """ Request body for POST /device/call_home. All fields are optional — only supplied (non-null) fields are modified. All other call home config bytes are round-tripped verbatim from the device. Confirmed writable fields (4-20-26 captures): auto_call_home_enabled : bool — master enable for auto call home after_event_recorded : bool — call home after each triggered event at_specified_times : bool — enable time-based scheduled calls time1_enabled : bool — enable time slot 1 time1_hour : int — hour for slot 1 (0-23; avoid 3 — DLE escape limitation) time1_min : int — minute for slot 1 (0-59; avoid 3) time2_enabled : bool — enable time slot 2 time2_hour : int — hour for slot 2 (0-23; avoid 3) time2_min : int — minute for slot 2 (0-59; avoid 3) Read-only fields (not writable via this endpoint): dial_string, num_retries, time_between_retries_sec, wait_for_connection_sec, warm_up_time_sec """ auto_call_home_enabled: Optional[bool] = None after_event_recorded: Optional[bool] = None at_specified_times: Optional[bool] = None time1_enabled: Optional[bool] = None time1_hour: Optional[int] = None time1_min: Optional[int] = None time2_enabled: Optional[bool] = None time2_hour: Optional[int] = None time2_min: Optional[int] = None @app.post("/device/call_home") def device_call_home_set( body: CallHomeConfigBody, port: Optional[str] = Query(None, description="Serial port (e.g. COM5)"), baud: int = Query(38400, description="Serial baud rate"), host: Optional[str] = Query(None, description="TCP host — modem IP or ACH relay"), tcp_port: int = Query(DEFAULT_TCP_PORT, description=f"TCP port (default {DEFAULT_TCP_PORT})"), ) -> dict: """ Read the current call home config, apply supplied changes, and write back. Only non-null fields are modified. All other bytes round-trip verbatim. Write sequence (confirmed from 4-20-26 call home settings captures): SUB 0x2C (read 2-step) → 125-byte raw payload patch fields SUB 0x7E (write 127-byte payload) → ack 0x81 SUB 0x7F (confirm) → ack 0x80 Example body: { "auto_call_home_enabled": true, "after_event_recorded": true, "time1_enabled": true, "time1_hour": 20, "time1_min": 0 } """ changed = body.model_dump(exclude_none=True) log.info("POST /device/call_home port=%s host=%s fields=%s", port, host, list(changed.keys())) try: def _do(): with _build_client(port, baud, host, tcp_port) as client: client.poll() client.set_call_home_config( auto_call_home_enabled=body.auto_call_home_enabled, after_event_recorded=body.after_event_recorded, at_specified_times=body.at_specified_times, time1_enabled=body.time1_enabled, time1_hour=body.time1_hour, time1_min=body.time1_min, time2_enabled=body.time2_enabled, time2_hour=body.time2_hour, time2_min=body.time2_min, ) _run_with_retry(_do, is_tcp=_is_tcp(host)) except HTTPException: raise except ProtocolError as exc: raise HTTPException(status_code=502, detail=f"Protocol error: {exc}") from exc except OSError as exc: raise HTTPException(status_code=502, detail=f"Connection error: {exc}") from exc except ValueError as exc: raise HTTPException(status_code=422, detail=str(exc)) from exc except Exception as exc: raise HTTPException(status_code=500, detail=f"Device error: {exc}") from exc return {"status": "ok", "updated_fields": changed} # ── Cache management endpoints ──────────────────────────────────────────────── @app.get("/cache/stats") def cache_stats() -> dict: """ Return row counts for all cache tables. Useful for debugging and verifying that caching is working as expected. """ return get_cache().stats() @app.delete("/cache/device") def cache_clear_device( port: Optional[str] = Query(None, description="Serial port (e.g. COM5)"), baud: int = Query(38400, description="Serial baud rate"), host: Optional[str] = Query(None, description="TCP host — modem IP or ACH relay"), tcp_port: int = Query(DEFAULT_TCP_PORT, description=f"TCP port (default {DEFAULT_TCP_PORT})"), ) -> dict: """ Clear all cached data for a specific device (identified by its connection address). Clears: device info, all event headers, all waveforms, monitor status. The next request to any endpoint for this device will re-fetch from the device. Supply either *port* (serial) or *host* (TCP/modem) to identify the device. """ conn_key = SFMCache.make_conn_key(host, tcp_port, port, baud) counts = get_cache().clear_device(conn_key) return {"status": "cleared", "conn_key": conn_key, "deleted": counts} # ── DB read endpoints ───────────────────────────────────────────────────────── # # These endpoints expose the seismo-relay SQLite DB written by ach_server.py. # All queries are read-only. Terra-view calls these to build project event # views, unit history panels, and (eventually) vibration summary reports. @app.get("/db/units") def db_units() -> list[dict]: """ Return one row per known serial with summary stats: last_seen, total_events, total_monitor_entries, total_sessions. """ return _get_db().query_units() @app.get("/db/events") def db_events( serial: Optional[str] = Query(None, description="Filter by unit serial (e.g. BE11529)"), from_dt: Optional[str] = Query(None, description="ISO-8601 start datetime (inclusive)"), to_dt: Optional[str] = Query(None, description="ISO-8601 end datetime (inclusive)"), false_trigger: Optional[bool] = Query(None, description="Filter by false_trigger flag"), limit: int = Query(500, description="Max rows to return (default 500)"), offset: int = Query(0, description="Pagination offset"), ) -> dict: """ Query triggered events from the DB. Returns events newest-first. All filter params are optional. Example: GET /db/events?serial=BE11529&from_dt=2026-04-01&limit=100 """ from_parsed = datetime.datetime.fromisoformat(from_dt) if from_dt else None to_parsed = datetime.datetime.fromisoformat(to_dt) if to_dt else None rows = _get_db().query_events( serial=serial, from_dt=from_parsed, to_dt=to_parsed, false_trigger=false_trigger, limit=limit, offset=offset, ) return {"count": len(rows), "events": rows} @app.patch("/db/events/{event_id}/false_trigger") def db_set_false_trigger( event_id: str, value: bool = Query(..., description="True to flag as false trigger, False to clear"), ) -> dict: """ Set or clear the false_trigger flag on a single event. Used by the terra-view event review UI. Returns 404 if the event_id is not found. """ found = _get_db().set_false_trigger(event_id, value) if not found: raise HTTPException(status_code=404, detail=f"Event {event_id} not found") return {"status": "ok", "event_id": event_id, "false_trigger": value} # ── /db/events/{id} — waveform file accessors ───────────────────────────────── # # These endpoints serve files from the persistent WaveformStore, so a Blastware # file or its decoded JSON for a previously-ingested ACH event can be fetched # without re-dialing the device. @app.get("/db/events/{event_id}/blastware_file") def db_event_blastware_file(event_id: str) -> FileResponse: """ Return the Blastware-format event file for a previously-ingested event. Filename extension is per-event (timestamp-encoded `AB0T` for ACH downloads, 3-char `AB0` for direct downloads). 404 if the event is unknown or has no event file in the store (events ingested before the store was wired will show this — re-download via the live endpoint to populate). """ row = _get_db().get_event(event_id) if row is None: raise HTTPException(status_code=404, detail=f"Event {event_id} not found") serial = row.get("serial") filename = row.get("blastware_filename") if not serial or not filename: raise HTTPException( status_code=404, detail=( f"Event {event_id} has no Blastware file in the store. " "Re-download via the live endpoint to populate." ), ) bw_path = _get_store().open_blastware(serial, filename) if bw_path is None: raise HTTPException( status_code=410, detail=f"Stored file missing on disk: {filename}", ) return FileResponse( path=str(bw_path), filename=filename, media_type="application/octet-stream", ) @app.get("/db/events/{event_id}/waveform.json") def db_event_waveform_json(event_id: str) -> dict: """ Return the plot-ready JSON (`sfm.plot.v1`) for a stored event. Resolution order (cheapest first): 1. If `.h5` exists, serve it via `plot_json_from_hdf5`. Samples are already in physical units; no decode work needed. 2. Else if `.a5.pkl` exists, replay the A5 decoders to rebuild an Event and serialise via `event_to_plot_json`. 3. Else 404 — the event has no waveform data on disk. The shape is identical regardless of source, so clients (the SFM webapp, Terra-View, etc.) consume the same `sfm.plot.v1` payload. """ row = _get_db().get_event(event_id) if row is None: raise HTTPException(status_code=404, detail=f"Event {event_id} not found") serial = row.get("serial") filename = row.get("blastware_filename") if not serial or not filename: raise HTTPException( status_code=404, detail=f"Event {event_id} has no event file in the store", ) store = _get_store() # Path 1: HDF5 (canonical clean format). h5_path = store.hdf5_path_for(serial, filename) if h5_path.exists(): try: return event_hdf5.plot_json_from_hdf5(h5_path, event_id=event_id) except Exception as exc: log.warning("HDF5 read failed (%s); falling back to A5 path", exc) # Path 2: A5 pickle replay. a5_frames = store.load_a5(serial, filename) if not a5_frames: raise HTTPException( status_code=404, detail=( f"Event {event_id} has no waveform data on disk " "(no .h5 and no .a5.pkl). Run the backfill script or " "re-download via the live endpoint to populate." ), ) ev = Event(index=-1) try: _decode_a5_metadata_into(a5_frames, ev) except Exception as exc: log.warning("db_event_waveform_json: metadata decode failed: %s", exc) try: _decode_a5_waveform(a5_frames, ev) except Exception as exc: log.error("db_event_waveform_json: waveform decode failed: %s", exc, exc_info=True) raise HTTPException(status_code=500, detail=f"Waveform decode failed: {exc}") from exc # Carry over fields from the DB row when the A5 replay didn't fill them. if ev.sample_rate is None and row.get("sample_rate"): ev.sample_rate = row.get("sample_rate") return event_hdf5.event_to_plot_json( ev, serial=serial, geo_range="normal", event_id=event_id, ) # ── /db/events/{id}/sidecar — modern .sfm.json review/metadata accessors ────── class SidecarPatchBody(BaseModel): """Body for PATCH /db/events/{id}/sidecar. JSON-merge-patch semantics: only the keys you include get updated. `review` is the editable block for monthly-summary workflows (false_trigger flag, reviewer notes, etc.); `extensions` is the forward-compat namespace for vendor / future fields. """ review: Optional[dict] = None extensions: Optional[dict] = None @app.get("/db/events/{event_id}/sidecar") def db_event_sidecar(event_id: str) -> dict: """ Return the .sfm.json sidecar for a stored event. 404 if the event is unknown or has no sidecar in the store (events ingested before the sidecar feature landed will show this until backfilled). """ row = _get_db().get_event(event_id) if row is None: raise HTTPException(status_code=404, detail=f"Event {event_id} not found") serial = row.get("serial") filename = row.get("blastware_filename") if not serial or not filename: raise HTTPException( status_code=404, detail=f"Event {event_id} has no event file in the store", ) sidecar = _get_store().load_sidecar(serial, filename) if sidecar is None: raise HTTPException( status_code=404, detail=( f"No .sfm.json sidecar on disk for {filename}. " "Run scripts/backfill_sidecars.py to generate one." ), ) return sidecar @app.patch("/db/events/{event_id}/sidecar") def db_event_sidecar_patch(event_id: str, body: SidecarPatchBody) -> dict: """ JSON-merge-patch the sidecar's `review` and/or `extensions` blocks. The sidecar JSON is the source of truth for review state. When `review.false_trigger` is updated, the SQL `events.false_trigger` column is kept in sync as a derived index for fast filtering. Returns the new full sidecar. 404 if the event or sidecar is missing. """ row = _get_db().get_event(event_id) if row is None: raise HTTPException(status_code=404, detail=f"Event {event_id} not found") serial = row.get("serial") filename = row.get("blastware_filename") if not serial or not filename: raise HTTPException( status_code=404, detail=f"Event {event_id} has no event file in the store", ) if not (body.review or body.extensions): raise HTTPException( status_code=400, detail="PATCH body must include `review` and/or `extensions`", ) new_sidecar = _get_store().patch_sidecar( serial, filename, review=body.review, extensions=body.extensions, ) if new_sidecar is None: raise HTTPException( status_code=404, detail=f"No .sfm.json sidecar on disk for {filename}", ) # Mirror false_trigger from review block into the SQL index column. if body.review is not None: _get_db().update_event_review(event_id, new_sidecar.get("review", {})) return new_sidecar # ── /db/import/blastware_file — ingest BW-only event files ──────────────────── @app.post("/db/import/blastware_file") async def db_import_blastware_file( files: list[UploadFile] = File(...), serial: Optional[str] = Query(None, description="Optional serial-number hint (e.g. BE11529); falls back to the BW filename's encoded prefix when omitted"), ) -> dict: """ Multipart upload of one or more Blastware event file binaries (typically produced by Blastware's own ACH). For each file: 1. Parse the bytes via WaveformStore.save_imported_bw — produces a parsed Event + copies the file into the persistent store + writes a .sfm.json sidecar with source.kind = "bw-import". 2. Upsert a row into `events` (dedup'd on serial+timestamp). **Paired BW ASCII reports.** When Blastware's ACH writes events, it also emits a per-event report alongside each binary as ``.TXT`` (e.g. ``M529LK44.AB0`` + ``M529LK44.AB0.TXT``). If a request includes ``.TXT`` files matching a binary's filename, the report is parsed and its decoded fields land in the sidecar's ``bw_report`` block — including device-authoritative peaks, ZC Freq, Peak Acceleration, Peak Displacement, Time of Peak, sensor self-check results, and monitor-log timestamps. The daemon- forwarded ACH workflow should always send both files together so the SFM database has the rich metadata for sort/filter/report. Pairing is by exact filename match (case-insensitive on the extension): a binary named ``foo.AB0`` is paired with a report named ``foo.AB0.TXT`` or ``foo.AB0.txt``. Response includes per-file outcomes so the caller can see which landed cleanly and which failed (e.g. malformed file, unknown serial, etc.). """ store = _get_store() db = _get_db() results: list[dict] = [] # Read every upload up front (UploadFile.read() is one-shot under # FastAPI's spooled-tempfile backing) and split into binaries vs # paired ASCII reports. binaries: list[tuple[str, bytes]] = [] reports: dict[str, bytes] = {} # keyed by lower-cased stem (without .txt) for upload in files: name = upload.filename or "" try: content = await upload.read() except Exception as exc: results.append({ "filename": name or "", "status": "error", "detail": f"read failed: {exc}", }) continue if name.lower().endswith(".txt"): # Strip the ".txt" suffix to get the binary's filename. reports[name[:-4].lower()] = content else: binaries.append((name, content)) for filename, content in binaries: report_bytes = reports.get(filename.lower()) try: ev, rec = store.save_imported_bw( content, source_path=Path(filename or "imported.bw"), serial_hint=serial, bw_report_text=report_bytes, ) inserted, skipped = db.insert_events( [ev], serial=(serial or _serial_from_event(ev) or "UNKNOWN"), waveform_records={ ev._waveform_key.hex(): rec if ev._waveform_key else None } if ev._waveform_key else None, ) results.append({ "filename": filename, "status": "ok", "stored_filename": rec["filename"], "filesize": rec["filesize"], "sha256": rec["sha256"], "report_attached": report_bytes is not None, "inserted": inserted, "skipped": skipped, }) except Exception as exc: log.error("import failed for %s: %s", filename, exc, exc_info=True) results.append({ "filename": filename, "status": "error", "detail": str(exc), }) # Surface unmatched .txt uploads so the daemon can detect mis-pairings. used_report_keys = {fn.lower() for fn, _ in binaries} for stem in reports.keys() - used_report_keys: results.append({ "filename": stem + ".txt", "status": "warning", "detail": "BW ASCII report supplied but no matching binary in this upload", }) return {"count": len(results), "results": results} def _serial_from_event(ev) -> Optional[str]: """Fallback serial resolver — currently relies on the BW filename decoder via WaveformStore.save_imported_bw, so this is just a placeholder for future enhancement (e.g. inferring from project_info).""" return None @app.get("/db/units/{serial}/waveforms.zip") def db_unit_waveforms_zip( serial: str, from_dt: Optional[str] = Query(None, description="ISO-8601 start datetime (inclusive)"), to_dt: Optional[str] = Query(None, description="ISO-8601 end datetime (inclusive)"), limit: int = Query(5000, description="Hard cap on events bundled (default 5000)"), ) -> StreamingResponse: """ Stream a ZIP of all event files for a serial in the optional date range. Events without a stored event file are silently skipped. """ import io import zipfile from_parsed = datetime.datetime.fromisoformat(from_dt) if from_dt else None to_parsed = datetime.datetime.fromisoformat(to_dt) if to_dt else None rows = _get_db().query_events( serial=serial, from_dt=from_parsed, to_dt=to_parsed, limit=limit, offset=0, ) store = _get_store() buf = io.BytesIO() written = 0 with zipfile.ZipFile(buf, "w", compression=zipfile.ZIP_DEFLATED) as zf: for row in rows: fn = row.get("blastware_filename") if not fn: continue bw_path = store.open_blastware(serial, fn) if bw_path is None: continue zf.write(bw_path, arcname=fn) written += 1 if written == 0: raise HTTPException( status_code=404, detail=f"No stored Blastware files found for serial {serial} in range", ) buf.seek(0) safe_serial = serial.replace("/", "_") headers = { "Content-Disposition": f'attachment; filename="{safe_serial}_waveforms.zip"', "X-Waveform-Count": str(written), } return StreamingResponse(buf, media_type="application/zip", headers=headers) @app.get("/db/monitor_log") def db_monitor_log( serial: Optional[str] = Query(None, description="Filter by unit serial"), from_dt: Optional[str] = Query(None, description="ISO-8601 start datetime (inclusive)"), to_dt: Optional[str] = Query(None, description="ISO-8601 end datetime (inclusive)"), limit: int = Query(500, description="Max rows to return"), offset: int = Query(0, description="Pagination offset"), ) -> dict: """ Query monitor log entries (continuous monitoring intervals) from the DB. Returns entries newest-first. """ from_parsed = datetime.datetime.fromisoformat(from_dt) if from_dt else None to_parsed = datetime.datetime.fromisoformat(to_dt) if to_dt else None rows = _get_db().query_monitor_log( serial=serial, from_dt=from_parsed, to_dt=to_parsed, limit=limit, offset=offset, ) return {"count": len(rows), "entries": rows} @app.get("/db/sessions") def db_sessions( serial: Optional[str] = Query(None, description="Filter by unit serial"), limit: int = Query(50, description="Max rows to return"), ) -> dict: """ Query ACH call-home sessions from the DB, newest first. """ rows = _get_db().get_sessions(serial=serial, limit=limit) return {"count": len(rows), "sessions": rows} # ── Entry point ──────────────────────────────────────────────────────────────── if __name__ == "__main__": import argparse ap = argparse.ArgumentParser(description="SFM — Seismograph Field Module API server") ap.add_argument("--host", default="0.0.0.0", help="Bind address (default: 0.0.0.0)") ap.add_argument("--port", type=int, default=8200, help="Port (default: 8200)") ap.add_argument("--reload", action="store_true", help="Enable auto-reload (dev mode)") args = ap.parse_args() log.info("Starting SFM server on %s:%d", args.host, args.port) uvicorn.run( "sfm.server:app", host=args.host, port=args.port, reload=args.reload, )