seismo-relay/sfm/server.py
serversdown c641d5fc10 feat: v0.15.0
### Added

- **Layered event storage architecture.**  Each event now lands as four
  files in the per-serial waveform store, each with a clear role:

  - `<filename>` — the Blastware-readable binary (BW file).  Untouched.
  - `<filename>.a5.pkl` — the raw 5A frames (regenerative source).
  - `<filename>.h5` — clean per-channel waveform arrays in physical
    units (in/s for geo, psi for mic) plus event metadata (HDF5 with
    gzip compression).  This is the canonical format for downstream
    analysis tools.
  - `<filename>.sfm.json` — the modern review/metadata sidecar (peaks,
    project, source provenance, review state, extensions).

  SQLite (`seismo_relay.db`) is the searchable index over all four.

- **Plot-ready waveform JSON (`sfm.plot.v1`).**  The `/device/event/{idx}/waveform`
  and `/db/events/{id}/waveform.json` endpoints now return samples in
  physical units with explicit time-axis metadata, peak markers, and
  per-channel unit hints — no more guessing the ADC-to-velocity scale
  client-side.  The webapp waveform viewer was rewritten to consume
  this shape.

- **In-app waveform viewer accuracy fix.**  The standalone SFM webapp
  viewer was scaling geophone amplitudes by `geoAdcScale / 32767`
  (≈ 6.206 / 32767), where `geoAdcScale = 6.206053` is the device's
  *in/s per V* hardware constant — not the ADC-counts-to-velocity
  factor.  This silently scaled every plot ~38% too low for Normal-range
  geophones (the correct full-scale is 10.0 in/s, or 1.25 in/s for
  Sensitive).  Conversion is now done server-side using the geo_range
  from compliance config; the client just plots.

- New `sfm/event_hdf5.py` module: `write_event_hdf5()`,
  `read_event_hdf5()`, plus a plot-JSON helper.
- Backfill script extended to also emit `.h5` for existing events.
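
To make the viewer scaling error described above concrete, here is a minimal arithmetic sketch. It assumes a linear mapping from signed 16-bit ADC counts to velocity, with 32767 counts as full scale (the divisor quoted above). The names `counts_to_in_s` and `FULL_SCALE_IN_S` are illustrative only and are not taken from the codebase.

```python
# Illustrative constants; only the numeric values come from the notes above.
FULL_SCALE_COUNTS = 32767      # signed 16-bit full scale (assumed linear ADC)
GEO_ADC_SCALE = 6.206053       # in/s per V hardware constant, NOT a counts factor
FULL_SCALE_IN_S = {"normal": 10.0, "sensitive": 1.25}


def counts_to_in_s(counts: int, geo_range: str = "normal") -> float:
    """Convert ADC counts to in/s using the full-scale value for the range."""
    return counts * FULL_SCALE_IN_S[geo_range] / FULL_SCALE_COUNTS


# What the old viewer plotted for a full-scale sample vs. the corrected value.
old = 32767 * GEO_ADC_SCALE / FULL_SCALE_COUNTS
new = counts_to_in_s(32767, "normal")
shortfall = 1 - old / new

print(f"old={old:.3f} in/s  new={new:.3f} in/s  shortfall={shortfall:.1%}")
```

For a Normal-range geophone the old factor yields about 6.2 in/s where 10.0 in/s is expected, which is the roughly 38% shortfall noted above.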

### Dependencies

- Added `h5py>=3.10` and `numpy>=1.24` for the HDF5 storage layer.
- Added `python-multipart>=0.0.7` (required by FastAPI for the
  `/db/import/blastware_file` endpoint introduced in this release).
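
As a rough illustration of the four-file layout listed under "Layered event storage architecture", the sketch below derives the sibling paths for one event. The helper name `event_file_set`, the `<root>/<serial>/<filename>` directory shape, and the sample serial and filename are assumptions made for illustration; the actual `WaveformStore` API may differ.

```python
from __future__ import annotations

from pathlib import Path


def event_file_set(store_root: Path, serial: str, filename: str) -> dict[str, Path]:
    """Return the four per-event paths for a Blastware filename (hypothetical helper)."""
    base = store_root / serial / filename
    return {
        "bw": base,                                           # Blastware-readable binary
        "a5": base.with_name(base.name + ".a5.pkl"),          # raw 5A frames
        "h5": base.with_name(base.name + ".h5"),              # physical-unit waveforms
        "sidecar": base.with_name(base.name + ".sfm.json"),   # review/metadata sidecar
    }


# Placeholder serial and filename, not real device data.
paths = event_file_set(Path("waveforms"), "BE12345", "EVT0001W")
for role, path in paths.items():
    print(f"{role:8s} {path}")
```

Appending the suffix to the full name (rather than using `Path.with_suffix`) keeps the original Blastware filename intact even if it already contains a dot.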
2026-05-08 04:39:51 +00:00

1790 lines
74 KiB
Python
"""
sfm/server.py — Seismograph Field Module REST API

Wraps the minimateplus library in a small FastAPI service.
Terra-view proxies /api/sfm/* to this service (same pattern as SLMM at :8100).

Default port: 8200

Endpoints
---------
    GET  /health                Service heartbeat — no device I/O
    GET  /device/info           POLL + serial number + full config read
    GET  /device/events         Download all stored events (headers + peak values)
    POST /device/connect        Explicit connect/identify (same as /device/info)
    GET  /device/event/{idx}    Single event by index (header + waveform record)

Transport query params (supply one set):
    Serial (direct RS-232 cable):
        port      — serial port name (e.g. COM5, /dev/ttyUSB0)
        baud      — baud rate (default 38400)
    TCP (modem / ACH Auto Call Home):
        host      — IP address or hostname of the modem or ACH relay
        tcp_port  — TCP port number (default 12345, Blastware default)

Each call opens the connection, does its work, then closes it.
(Stateless / reconnect-per-call, matching Blastware's observed behaviour.)

Run with:
    python -m uvicorn sfm.server:app --host 0.0.0.0 --port 8200 --reload
or:
    python sfm/server.py
"""
from __future__ import annotations
import datetime
import logging
import sys
import tempfile
import threading
import time
from pathlib import Path
from typing import Optional

# FastAPI / Pydantic
try:
    from fastapi import Body, FastAPI, File, HTTPException, Query, UploadFile
    from fastapi.middleware.cors import CORSMiddleware
    from fastapi.responses import FileResponse, JSONResponse, StreamingResponse
    from pydantic import BaseModel
    import uvicorn
except ImportError:
    print(
        "fastapi and uvicorn are required for the SFM server.\n"
        "Install them with: pip install fastapi uvicorn",
        file=sys.stderr,
    )
    sys.exit(1)

from minimateplus import MiniMateClient
from minimateplus.protocol import ProtocolError
from minimateplus.models import CallHomeConfig, ComplianceConfig, DeviceInfo, Event, PeakValues, ProjectInfo, Timestamp
from minimateplus.transport import TcpTransport, DEFAULT_TCP_PORT
from minimateplus.blastware_file import write_blastware_file, blastware_filename
from minimateplus.client import _decode_a5_metadata_into, _decode_a5_waveform
from sfm import event_hdf5
from sfm.cache import SFMCache, get_cache
from sfm.database import SeismoDb
from sfm.live_cache import LiveCache as _LiveCache
from sfm.waveform_store import WaveformStore

logging.basicConfig(
    level=logging.INFO,
    format="%(asctime)s %(levelname)-7s %(name)s %(message)s",
    datefmt="%H:%M:%S",
)
log = logging.getLogger("sfm.server")

# ── FastAPI app ────────────────────────────────────────────────────────────────
app = FastAPI(
    title="Seismograph Field Module (SFM)",
    description=(
        "REST API for Instantel MiniMate Plus seismographs.\n"
        "Implements the minimateplus RS-232 protocol library.\n"
        "Proxied by terra-view at /api/sfm/*."
    ),
    version="0.1.0",
)

# Allow requests from the waveform viewer opened as a local file (file://)
# and from any dev server or terra-view proxy.
app.add_middleware(
    CORSMiddleware,
    allow_origins=["*"],
    allow_methods=["GET", "POST"],
    allow_headers=["*"],
)

# ── DB ────────────────────────────────────────────────────────────────────────
# Shared SeismoDb instance. Path can be overridden by --db-path at startup,
# or defaults to bridges/captures/seismo_relay.db relative to the repo root.
_DEFAULT_DB_PATH = Path(__file__).parent.parent / "bridges" / "captures" / "seismo_relay.db"
_db: Optional[SeismoDb] = None
_store: Optional[WaveformStore] = None


def _get_db() -> SeismoDb:
    global _db
    if _db is None:
        _db = SeismoDb(_DEFAULT_DB_PATH)
    return _db


def _get_store() -> WaveformStore:
    """
    Persistent event-file + A5-sidecar store, rooted at <db_dir>/waveforms/.

    Mirrors the layout used by bridges/ach_server.py so files saved by ACH
    ingestion and by live SFM downloads share one canonical location.
    """
    global _store
    if _store is None:
        _store = WaveformStore(_get_db().db_path.parent / "waveforms")
    return _store

# ── Live device cache ─────────────────────────────────────────────────────────
# In-memory cache for live device data. Avoids re-dialing the device on every
# request when the data hasn't changed.
#
# Keyed by conn_key ("tcp:host:port" or "serial:port:baud").
# Does NOT persist across server restarts — this is purely an in-process cache
# to reduce TCP round-trips and cellular data usage.
#
# Invalidation rules:
# device_info — cached until POST /device/config marks it dirty
# events — cached by (conn_key, device_event_count); re-fetched when
# a quick count_events() probe shows new events on the device
# monitor_status — 30-second TTL (changes frequently during monitoring)
# waveforms — permanent (immutable once recorded; indexed by conn_key+idx)
#
# All endpoints accept ?force=true to bypass the cache and re-read from device.
_live_cache = _LiveCache()
# ── Serialisers ────────────────────────────────────────────────────────────────
# Plain dict helpers — avoids a Pydantic dependency in the library layer.

def _serialise_timestamp(ts: Optional[Timestamp]) -> Optional[dict]:
    if ts is None:
        return None
    return {
        "year": ts.year,
        "month": ts.month,
        "day": ts.day,
        "hour": ts.hour,
        "minute": ts.minute,
        "second": ts.second,
        "clock_set": ts.clock_set,
        "display": str(ts),
    }


def _serialise_peak_values(pv: Optional[PeakValues]) -> Optional[dict]:
    if pv is None:
        return None
    return {
        "tran_in_s": pv.tran,
        "vert_in_s": pv.vert,
        "long_in_s": pv.long,
        "micl_psi": pv.micl,
        "peak_vector_sum": pv.peak_vector_sum,
    }


def _serialise_project_info(pi: Optional[ProjectInfo]) -> Optional[dict]:
    if pi is None:
        return None
    return {
        "setup_name": pi.setup_name,
        "project": pi.project,
        "client": pi.client,
        "operator": pi.operator,
        "sensor_location": pi.sensor_location,
        "notes": pi.notes,
    }


def _serialise_compliance_config(cc: Optional["ComplianceConfig"]) -> Optional[dict]:
    if cc is None:
        return None
    return {
        "recording_mode": cc.recording_mode,  # 0x00=Single Shot, 0x01=Continuous, 0x03=Histogram, 0x04=Histogram+Continuous
        "sample_rate": cc.sample_rate,
        "histogram_interval_sec": cc.histogram_interval_sec,  # seconds; None if not Histogram mode
        "record_time": cc.record_time,
        "trigger_level_geo": cc.trigger_level_geo,
        "alarm_level_geo": cc.alarm_level_geo,
        "geo_adc_scale": cc.geo_adc_scale,  # hw scale factor (in/s)/V — informational only, do not write
        "geo_range": cc.geo_range,  # CONFIRMED 2026-04-20: 0x00=Normal 10in/s, 0x01=Sensitive 1.25in/s
        "setup_name": cc.setup_name,
        "project": cc.project,
        "client": cc.client,
        "operator": cc.operator,
        "sensor_location": cc.sensor_location,
        "notes": cc.notes,
    }


def _serialise_call_home_config(ch: Optional["CallHomeConfig"]) -> Optional[dict]:
    if ch is None:
        return None
    return {
        "auto_call_home_enabled": ch.auto_call_home_enabled,
        "dial_string": ch.dial_string,
        "after_event_recorded": ch.after_event_recorded,
        "at_specified_times": ch.at_specified_times,
        "time1_enabled": ch.time1_enabled,
        "time1_hour": ch.time1_hour,
        "time1_min": ch.time1_min,
        "time2_enabled": ch.time2_enabled,
        "time2_hour": ch.time2_hour,
        "time2_min": ch.time2_min,
        "num_retries": ch.num_retries,
        "time_between_retries_sec": ch.time_between_retries_sec,
        "wait_for_connection_sec": ch.wait_for_connection_sec,
        "warm_up_time_sec": ch.warm_up_time_sec,
    }


def _serialise_device_info(info: DeviceInfo) -> dict:
    return {
        "serial": info.serial,
        "firmware_version": info.firmware_version,
        "firmware_minor": info.firmware_minor,
        "dsp_version": info.dsp_version,
        "manufacturer": info.manufacturer,
        "model": info.model,
        "event_count_sub08": info.event_count,  # unreliable — SUB 08 always returns 1
        "compliance_config": _serialise_compliance_config(info.compliance_config),
    }


def _serialise_event(ev: Event, debug: bool = False) -> dict:
    d: dict = {
        "index": ev.index,
        "timestamp": _serialise_timestamp(ev.timestamp),
        "sample_rate": ev.sample_rate,
        "record_type": ev.record_type,
        "peak_values": _serialise_peak_values(ev.peak_values),
        "project_info": _serialise_project_info(ev.project_info),
    }
    if debug:
        raw = getattr(ev, "_raw_record", None)
        d["raw_record_hex"] = raw.hex() if raw else None
        d["raw_record_len"] = len(raw) if raw else 0
    return d

# ── Transport factory ─────────────────────────────────────────────────────────

def _build_client(
    port: Optional[str],
    baud: int,
    host: Optional[str],
    tcp_port: int,
    timeout: float = 30.0,
) -> MiniMateClient:
    """
    Return a MiniMateClient configured for either serial or TCP transport.

    TCP takes priority if *host* is supplied; otherwise *port* (serial) is used.
    Raises HTTPException(422) if neither is provided.

    Use timeout=120.0 (or higher) for endpoints that perform a full 5A waveform
    download — a 70-second event at 1024 sps takes 2-3 minutes to transfer over
    cellular and each individual recv must complete within the timeout window.
    """
    if host:
        transport = TcpTransport(host, port=tcp_port)
        log.debug("TCP transport: %s:%d timeout=%.0fs", host, tcp_port, timeout)
        return MiniMateClient(transport=transport, timeout=timeout)
    elif port:
        log.debug("Serial transport: %s baud=%d", port, baud)
        return MiniMateClient(port, baud)
    else:
        raise HTTPException(
            status_code=422,
            detail=(
                "Specify either 'port' (serial, e.g. ?port=COM5) "
                "or 'host' (TCP, e.g. ?host=192.168.1.50&tcp_port=12345)"
            ),
        )


def _is_tcp(host: Optional[str]) -> bool:
    return bool(host)


def _run_with_retry(fn, *, is_tcp: bool):
    """
    Call fn() and, for TCP connections only, retry once on ProtocolError.

    Rationale: when a MiniMate Plus is cold (just had its serial lines asserted
    by the modem or a local bridge), it takes 5-10 seconds to boot before it
    will respond to POLL_PROBE. The first request may time out during that boot
    window; a single automatic retry is enough to recover once the unit is up.

    Serial connections are NOT retried — a timeout there usually means a real
    problem (wrong port, wrong baud, cable unplugged).
    """
    try:
        return fn()
    except ProtocolError as exc:
        if not is_tcp:
            raise
        log.info("TCP poll timed out (unit may have been cold) — retrying once")
        return fn()  # let any second failure propagate normally

# ── Helpers ────────────────────────────────────────────────────────────────────

def _backfill_events(events: list, info: "DeviceInfo") -> None:
    """
    Fill in sample_rate and project_info fields that the per-event waveform
    record doesn't carry — sourced from the device's compliance config.

    Extracted from device_events() so it can be called from both the full
    download path and the partial (new-events-only) path.
    """
    if info.compliance_config and info.compliance_config.sample_rate:
        for ev in events:
            if ev.sample_rate is None:
                ev.sample_rate = info.compliance_config.sample_rate
    if info.compliance_config:
        cc = info.compliance_config
        for ev in events:
            if ev.project_info is None:
                ev.project_info = ProjectInfo()
            pi = ev.project_info
            if pi.client is None:
                pi.client = cc.client
            if pi.operator is None:
                pi.operator = cc.operator
            if pi.sensor_location is None:
                pi.sensor_location = cc.sensor_location
            if pi.notes is None:
                pi.notes = cc.notes

# ── Endpoints ──────────────────────────────────────────────────────────────────

@app.get("/health")
def health() -> dict:
    """Service heartbeat. No device I/O."""
    return {"status": "ok", "service": "sfm", "version": "0.1.0"}


@app.get("/", response_class=FileResponse)
def webapp():
    """Serve the SFM web app."""
    return str(Path(__file__).parent / "sfm_webapp.html")


@app.get("/waveform", response_class=FileResponse)
def waveform_viewer():
    """Serve the standalone waveform viewer."""
    return str(Path(__file__).parent / "waveform_viewer.html")


@app.get("/device/info")
def device_info(
    port: Optional[str] = Query(None, description="Serial port (e.g. COM5, /dev/ttyUSB0)"),
    baud: int = Query(38400, description="Serial baud rate (default 38400)"),
    host: Optional[str] = Query(None, description="TCP host — modem IP or ACH relay (e.g. 203.0.113.5)"),
    tcp_port: int = Query(DEFAULT_TCP_PORT, description=f"TCP port (default {DEFAULT_TCP_PORT})"),
    force: bool = Query(False, description="Bypass cache and re-read from device"),
) -> dict:
    """
    Connect to the device, perform the POLL startup handshake, and return
    identity information (serial number, firmware version, model).

    Supply either *port* (serial) or *host* (TCP/modem).
    Equivalent to POST /device/connect — provided as GET for convenience.

    **Caching**: device identity and compliance config are cached after the first
    successful read (they rarely change). Pass *force=true* to bypass the cache
    and re-read directly from the device (e.g. after a config push).
    The cache is also automatically invalidated after POST /device/config.
    """
    log.info("GET /device/info port=%s host=%s tcp_port=%d force=%s", port, host, tcp_port, force)
    cache = get_cache()
    conn_key = SFMCache.make_conn_key(host, tcp_port, port, baud)

    if not force:
        cached = cache.get_device_info(conn_key)
        if cached is not None:
            log.info("device info cache hit for %s", conn_key)
            cached["_cached"] = True
            return cached

    try:
        def _do():
            with _build_client(port, baud, host, tcp_port) as client:
                info = client.connect()
                # SUB 08 event_count is unreliable (always returns 1 regardless of
                # actual storage). Count via 1E/1F chain instead.
                info.event_count = client.count_events()
                return info
        info = _run_with_retry(_do, is_tcp=_is_tcp(host))
    except HTTPException:
        raise
    except ProtocolError as exc:
        raise HTTPException(status_code=502, detail=f"Protocol error: {exc}") from exc
    except OSError as exc:
        raise HTTPException(status_code=502, detail=f"Connection error: {exc}") from exc
    except Exception as exc:
        raise HTTPException(status_code=500, detail=f"Device error: {exc}") from exc

    result = _serialise_device_info(info)
    cache.set_device_info(conn_key, result)
    return result

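
@app.post("/device/connect")
def device_connect(
    port: Optional[str] = Query(None, description="Serial port (e.g. COM5)"),
    baud: int = Query(38400, description="Serial baud rate"),
    host: Optional[str] = Query(None, description="TCP host — modem IP or ACH relay"),
    tcp_port: int = Query(DEFAULT_TCP_PORT, description=f"TCP port (default {DEFAULT_TCP_PORT})"),
) -> dict:
    """
    Connect to the device and return identity. POST variant for terra-view
    compatibility with the SLMM proxy pattern.
    """
    # device_info() is called directly here, so FastAPI does not resolve its
    # Query(...) defaults. Leaving `force` out would pass the (truthy) Query
    # object itself, which already bypassed the cache; pass it explicitly so
    # an explicit connect always talks to the device and logs a real bool.
    return device_info(port=port, baud=baud, host=host, tcp_port=tcp_port, force=True)
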

@app.get("/device/events")
def device_events(
    port: Optional[str] = Query(None, description="Serial port (e.g. COM5)"),
    baud: int = Query(38400, description="Serial baud rate"),
    host: Optional[str] = Query(None, description="TCP host — modem IP or ACH relay"),
    tcp_port: int = Query(DEFAULT_TCP_PORT, description=f"TCP port (default {DEFAULT_TCP_PORT})"),
    debug: bool = Query(False, description="Include raw record hex for field-layout inspection"),
    force: bool = Query(False, description="Bypass cache and re-download all events from device"),
) -> dict:
    """
    Connect to the device, read the event index, and download all stored
    events (event headers + full waveform records with peak values).

    Supply either *port* (serial) or *host* (TCP/modem).

    Pass debug=true to include raw_record_hex in each event — useful for
    verifying field offsets against the protocol reference. Debug requests
    always go to the device and are never cached.

    This does NOT download raw ADC waveform samples — those are large and
    fetched separately via GET /device/event/{idx}/waveform.

    **Caching**: event headers are cached after the first download. On subsequent
    calls, the device is contacted only to check the event count (fast: ~2s).
    If the count matches the cache, all events are returned from cache instantly.
    If new events exist on the device, the event list is re-read and only the
    new entries are added to the cache. If the device cannot be reached, the
    cached events are returned with "_stale": true.
    Pass *force=true* to bypass the cache entirely and re-download everything.
    """
    log.info("GET /device/events port=%s host=%s debug=%s force=%s", port, host, debug, force)
    cache = get_cache()
    conn_key = SFMCache.make_conn_key(host, tcp_port, port, baud)

    # ── Smart cache path (skip when debug=True or force=True) ────────────────
    # debug mode uses raw_record_hex which isn't stored in the cache, so we
    # must always go to the device when debug is requested.
    if not force and not debug:
        cached_events = cache.get_all_events(conn_key)
        cached_count = len(cached_events) if cached_events else 0

        if cached_count > 0:
            # Quick device contact: just count events via the fast 1E/1F chain.
            # This takes ~2s instead of the full event download (~10-30s).
            try:
                def _count():
                    with _build_client(port, baud, host, tcp_port) as client:
                        client.connect()
                        return client.count_events()
                device_count = _run_with_retry(_count, is_tcp=_is_tcp(host))
            except HTTPException:
                raise
            except Exception as exc:
                # If we can't reach the device at all, serve stale cache rather
                # than returning an error — field units go offline regularly.
                log.warning("count_events failed (%s) — serving stale cache for %s", exc, conn_key)
                cached_info = cache.get_device_info(conn_key) or {}
                return {
                    "device": cached_info,
                    "event_count": cached_count,
                    "events": cached_events,
                    "_cached": True,
                    "_stale": True,
                }

            if device_count == cached_count:
                # Nothing new — return cache immediately, no event download needed.
                log.info(
                    "event cache hit for %s: %d events, device count matches",
                    conn_key, cached_count,
                )
                cached_info = cache.get_device_info(conn_key) or {}
                return {
                    "device": cached_info,
                    "event_count": cached_count,
                    "events": cached_events,
                    "_cached": True,
                }

            if device_count > cached_count:
                # New events on the device. Events are append-only, so cached
                # indices 0..(cached_count-1) are still valid, but the protocol
                # has to walk the event chain from event 0 to reach later ones.
                # We therefore re-read up to the newest index (device_count - 1)
                # and add only the entries with index >= cached_count to the cache.
                log.info(
                    "new events on device %s: have %d, device has %d — fetching all up to %d",
                    conn_key, cached_count, device_count, device_count - 1,
                )
                try:
                    def _fetch_new():
                        with _build_client(port, baud, host, tcp_port) as client:
                            info = client.connect()
                            all_evs = client.get_events(stop_after_index=device_count - 1)
                            return info, all_evs
                    info, all_events = _run_with_retry(_fetch_new, is_tcp=_is_tcp(host))
                except HTTPException:
                    raise
                except ProtocolError as exc:
                    raise HTTPException(status_code=502, detail=f"Protocol error: {exc}") from exc
                except OSError as exc:
                    raise HTTPException(status_code=502, detail=f"Connection error: {exc}") from exc
                except Exception as exc:
                    raise HTTPException(status_code=500, detail=f"Device error: {exc}") from exc

                _backfill_events(all_events, info)
                # Only the new events (indices >= cached_count) are truly new.
                new_events = [ev for ev in all_events if ev.index >= cached_count]
                new_serialised = [_serialise_event(ev) for ev in new_events]
                serialised_info = _serialise_device_info(info)
                cache.set_events(conn_key, new_serialised)
                cache.set_device_info(conn_key, serialised_info)
                merged_events = cache.get_all_events(conn_key)
                return {
                    "device": serialised_info,
                    "event_count": len(merged_events),
                    "events": merged_events,
                    "_cached": True,
                    "_new_events": len(new_events),
                }

            # device_count < cached_count (e.g. events were erased on the
            # device): fall through to the full download path below.

    # ── Full download path (first call, force=True, or debug=True) ───────────
    try:
        def _do():
            with _build_client(port, baud, host, tcp_port) as client:
                return client.connect(), client.get_events(debug=debug)
        info, events = _run_with_retry(_do, is_tcp=_is_tcp(host))
    except HTTPException:
        raise
    except ProtocolError as exc:
        raise HTTPException(status_code=502, detail=f"Protocol error: {exc}") from exc
    except OSError as exc:
        raise HTTPException(status_code=502, detail=f"Connection error: {exc}") from exc
    except Exception as exc:
        raise HTTPException(status_code=500, detail=f"Device error: {exc}") from exc

    _backfill_events(events, info)
    # Serialise once and reuse the result for both caches and the response.
    serialised_info = _serialise_device_info(info)
    serialised_events = [_serialise_event(ev, debug=debug) for ev in events]

    # Update the caches (skip if debug=True: raw hex blobs shouldn't pollute
    # the normal cache entries).
    if not debug:
        cache.set_events(conn_key, serialised_events)
        cache.set_device_info(conn_key, serialised_info)
        _live_cache.set_device_info(conn_key, serialised_info)
        _live_cache.set_events(conn_key, len(events), serialised_events)

    return {
        "device": serialised_info,
        "event_count": len(events),
        "events": serialised_events,
    }


@app.get("/device/event/{index}")
def device_event(
    index: int,
    port: Optional[str] = Query(None, description="Serial port (e.g. COM5)"),
    baud: int = Query(38400, description="Serial baud rate"),
    host: Optional[str] = Query(None, description="TCP host — modem IP or ACH relay"),
    tcp_port: int = Query(DEFAULT_TCP_PORT, description=f"TCP port (default {DEFAULT_TCP_PORT})"),
    force: bool = Query(False, description="Bypass cache and re-download from device"),
) -> dict:
    """
    Download a single event by index (0-based).

    Supply either *port* (serial) or *host* (TCP/modem).
    Performs: POLL startup → event index → event header → waveform record.

    **Caching**: if this event was already downloaded (e.g. via GET /device/events),
    it is returned instantly from cache with no device contact.
    Pass *force=true* to re-download from the device.
    """
    log.info("GET /device/event/%d port=%s host=%s force=%s", index, port, host, force)
    cache = get_cache()
    conn_key = SFMCache.make_conn_key(host, tcp_port, port, baud)

    if not force:
        cached = cache.get_event(conn_key, index)
        if cached is not None:
            log.info("event cache hit for %s index %d", conn_key, index)
            cached["_cached"] = True
            return cached

    try:
        def _do():
            with _build_client(port, baud, host, tcp_port) as client:
                info = client.connect()
                events = client.get_events(stop_after_index=index)
                return info, events
        info, events = _run_with_retry(_do, is_tcp=_is_tcp(host))
    except HTTPException:
        raise
    except ProtocolError as exc:
        raise HTTPException(status_code=502, detail=f"Protocol error: {exc}") from exc
    except OSError as exc:
        raise HTTPException(status_code=502, detail=f"Connection error: {exc}") from exc
    except Exception as exc:
        raise HTTPException(status_code=500, detail=f"Device error: {exc}") from exc

    matching = [ev for ev in events if ev.index == index]
    if not matching:
        raise HTTPException(
            status_code=404,
            detail=f"Event index {index} not found on device",
        )

    # Backfill every downloaded event, not just the requested one: all of them
    # are written to the cache below and should carry the same fields.
    _backfill_events(events, info)
    result = _serialise_event(matching[0])

    # Store all downloaded events (we paid for them anyway: indices 0..index)
    all_serialised = [_serialise_event(ev) for ev in events]
    cache.set_events(conn_key, all_serialised)
    return result


@app.get("/device/event/{index}/waveform")
def device_event_waveform(
    index: int,
    port: Optional[str] = Query(None, description="Serial port (e.g. COM5)"),
    baud: int = Query(38400, description="Serial baud rate"),
    host: Optional[str] = Query(None, description="TCP host — modem IP or ACH relay"),
    tcp_port: int = Query(DEFAULT_TCP_PORT, description=f"TCP port (default {DEFAULT_TCP_PORT})"),
    force: bool = Query(False, description="Bypass cache and re-download from device"),
) -> dict:
    """
    Download the full waveform for a single event (0-based index) and return
    it as plot-ready JSON (sfm.plot.v1).

    Supply either *port* (serial) or *host* (TCP/modem).
    Performs: POLL startup → get_events() (to locate the 4-byte waveform key) →
    download_waveform() (full SUB 5A stream, stop_after_metadata=False).

    Samples are converted server-side to physical units using the geo_range
    from compliance config, so clients plot them directly without any ADC
    scaling. The response is built by event_hdf5.event_to_plot_json().

    Response includes:
    - **total_samples**: expected sample-sets from the STRT record
    - **pretrig_samples**: pre-trigger sample count
    - **rectime_seconds**: record duration
    - **samples_decoded**: actual sample-sets decoded (may be less than total_samples
      if the device is not storing all frames yet, or the capture was partial)
    - **sample_rate**: samples per second (from compliance config)
    - **channels**: per-channel samples in physical units (in/s for "Tran",
      "Vert", "Long"; psi for "Mic")

    **Caching**: full waveforms are cached permanently after the first download —
    they are immutable once recorded on the device. Subsequent requests for the
    same event return instantly from cache without any device contact.
    Pass *force=true* to force a fresh download (rarely needed).
    """
    log.info("GET /device/event/%d/waveform port=%s host=%s force=%s", index, port, host, force)
    cache = get_cache()
    conn_key = SFMCache.make_conn_key(host, tcp_port, port, baud)

    if not force:
        cached = cache.get_waveform(conn_key, index)
        if cached is not None:
            log.info("waveform cache hit for %s event %d", conn_key, index)
            cached["_cached"] = True
            return cached

    try:
        def _do():
            with _build_client(port, baud, host, tcp_port, timeout=120.0) as client:
                info = client.connect()
                # stop_after_index avoids downloading events beyond the one requested.
                events = client.get_events(full_waveform=True, stop_after_index=index)
                matching = [ev for ev in events if ev.index == index]
                return (matching[0] if matching else None), info
        ev, info = _run_with_retry(_do, is_tcp=_is_tcp(host))
    except HTTPException:
        raise
    except ProtocolError as exc:
        raise HTTPException(status_code=502, detail=f"Protocol error: {exc}") from exc
    except OSError as exc:
        raise HTTPException(status_code=502, detail=f"Connection error: {exc}") from exc
    except Exception as exc:
        raise HTTPException(status_code=500, detail=f"Device error: {exc}") from exc

    if ev is None:
        raise HTTPException(
            status_code=404,
            detail=f"Event index {index} not found on device",
        )

    # Backfill from compliance_config: sample_rate, record_time, and
    # derived total_samples. These are user-set authoritative values; the
    # corresponding STRT-derived guesses in `_decode_a5_waveform` can be
    # off (e.g. rectime used to read the 0x46 record-type marker = 70s).
    cc = info.compliance_config
    if cc:
        if ev.sample_rate is None and cc.sample_rate:
            ev.sample_rate = cc.sample_rate
        if cc.record_time:
            ev.rectime_seconds = cc.record_time
        if ev.sample_rate and ev.rectime_seconds:
            derived = int(round(ev.sample_rate * ev.rectime_seconds))
            if (ev.total_samples is None
                    or ev.total_samples > derived * 2
                    or ev.total_samples < derived // 4):
                ev.total_samples = derived
    geo_range = getattr(cc, "geo_range", None) if cc else None

    # Build the plot.v1 JSON: samples in physical units (in/s for geo, psi
    # for mic), explicit time axis, peak markers — the shape clients should
    # consume directly without doing any ADC scaling.
    serial = getattr(info, "serial", None) or ""
    result = event_hdf5.event_to_plot_json(
        ev, serial=serial,
        geo_range=geo_range or "normal",
        index=index,
    )
    cache.set_waveform(conn_key, index, result)
    return result


@app.get("/device/event/{index}/blastware_file")
def device_event_blastware_file(
    index: int,
    port: Optional[str] = Query(None, description="Serial port (e.g. COM5)"),
    baud: int = Query(38400, description="Serial baud rate"),
    host: Optional[str] = Query(None, description="TCP host — modem IP or ACH relay"),
    tcp_port: int = Query(DEFAULT_TCP_PORT, description=f"TCP port (default {DEFAULT_TCP_PORT})"),
    force: bool = Query(False, description="Bypass any cached/dedup'd state and re-download from device"),
) -> FileResponse:
    """
    Download the waveform for a single event (0-based index) and return it
    as a Blastware-compatible binary file with a correct Blastware filename.

    Supply either *port* (serial) or *host* (TCP/modem).

    The file is written to the OS temp directory and streamed back as a binary
    download. Blastware can open it directly — filename encodes serial + timestamp.

    Filename format: <prefix><serial3><stem><AB>0<W|H>
      - prefix letter = chr(ord('B') + floor(serial_numeric / 1000))
      - stem + AB     = second-resolution timestamp since 1985-01-01 local
      - W / H         = Full Waveform / Full Histogram (defaults to W for
                        triggered events; histogram requires recording_mode
                        to be populated from compliance config)

    Performs: POLL startup → get_events(full_waveform=True,
    stop_after_index=index) → write_blastware_file() → FileResponse +
    persistent store + DB upsert.
    """
    log.info(
        "GET /device/event/%d/blastware_file port=%s host=%s force=%s",
        index, port, host, force,
    )
    # `force` is accepted only for parity with the other live endpoints.
    # This endpoint never short-circuits via a cache, so every call
    # re-downloads from the device whether or not `force` is set.
    try:
        def _do():
            with _build_client(port, baud, host, tcp_port, timeout=120.0) as client:
                info = client.connect()
                # full_waveform=True pulls the complete 5A stream so the
                # client populates STRT-derived fields (total_samples,
                # pretrig_samples, rectime_seconds) AND raw_samples on the
                # Event. Required for the .h5 + .sfm.json sidecar to be
                # filled in correctly — without it, those land as nulls.
                events = client.get_events(
                    full_waveform=True,
                    stop_after_index=index,
                )
                matching = [ev for ev in events if ev.index == index]
                return (matching[0] if matching else None), info
        ev, info = _run_with_retry(_do, is_tcp=_is_tcp(host))
    except HTTPException:
        raise
    except ProtocolError as exc:
        log.error("blastware_file: protocol error: %s", exc, exc_info=True)
        raise HTTPException(status_code=502, detail=f"Protocol error: {exc}") from exc
    except OSError as exc:
        log.error("blastware_file: connection error: %s", exc, exc_info=True)
        raise HTTPException(status_code=502, detail=f"Connection error: {exc}") from exc
    except Exception as exc:
        log.error("blastware_file: unexpected error: %s", exc, exc_info=True)
        raise HTTPException(status_code=500, detail=f"Device error: {exc}") from exc

    if ev is None:
        raise HTTPException(
            status_code=404,
            detail=f"Event index {index} not found on device",
        )

    a5_frames = getattr(ev, "_a5_frames", None)
    if not a5_frames:
        raise HTTPException(
            status_code=502,
            detail=f"No waveform data received for event index {index} — 5A download failed",
        )

    # Determine serial number from device info
    serial = getattr(info, "serial", None) or "UNKNOWN"

    # Build filename using the same algorithm Blastware uses
    filename = blastware_filename(ev, serial)

    # Write to OS temp dir (cross-platform: /tmp on Linux/macOS,
    # %TEMP% on Windows) so FastAPI can stream it back via FileResponse.
    out_path = Path(tempfile.gettempdir()) / filename

    # Delete any stale file at this path before writing. On Windows we have
    # observed the new (smaller) file getting trailing zero-bytes from the
    # previous (larger) file when filesystem semantics around open(...,"wb")
    # don't truncate cleanly (e.g. through a synced folder). Explicit unlink
    # eliminates that ambiguity.
    try:
        out_path.unlink()
    except FileNotFoundError:
        pass

    write_blastware_file(ev, a5_frames, out_path)
    log.info(
        "blastware_file: wrote %s (%d A5 frames, serial=%s)",
        out_path, len(a5_frames), serial,
    )

# Promote to canonical persistent store + DB row so this event is
# queryable via /db/events afterwards (matches the ACH ingestion path).
if serial != "UNKNOWN" and ev._waveform_key is not None:
try:
cc = info.compliance_config
# Backfill authoritative compliance-config values onto the
# Event before persisting. These supersede whatever
# _decode_a5_waveform read from the STRT bytes (some of which
# have ambiguous semantics — e.g. STRT[20] is rectime but
# STRT[8:10] / STRT[16:18] are device-specific scratch fields
# that aren't reliable sample/pretrig counts).
if cc:
if ev.sample_rate is None and cc.sample_rate:
ev.sample_rate = cc.sample_rate
if cc.record_time:
# record_time from compliance is authoritative — the
# user-set value the device followed when recording.
ev.rectime_seconds = cc.record_time
# Derive total_samples from sample_rate × rectime when
# we can; the STRT-derived value can land at a buffer-
# offset rather than a sample count.
if ev.sample_rate and ev.rectime_seconds:
derived = int(round(ev.sample_rate * ev.rectime_seconds))
if (ev.total_samples is None
or ev.total_samples > derived * 2
or ev.total_samples < derived // 4):
ev.total_samples = derived
geo_range = getattr(cc, "geo_range", None) if cc else None
rec = _get_store().save(
ev, serial=serial, a5_frames=a5_frames,
geo_range=geo_range if geo_range is not None else "normal",
)
_get_db().insert_events(
[ev],
serial=serial,
waveform_records={ev._waveform_key.hex(): rec},
)
log.info(
"blastware_file: persisted to store (%s, %d bytes)",
rec["filename"], rec["filesize"],
)
except Exception as exc:
log.warning(
"blastware_file: persistent store save failed: %s "
"— temp file still served",
exc,
)
return FileResponse(
path=str(out_path),
filename=filename,
media_type="application/octet-stream",
)
# ── Write endpoints ───────────────────────────────────────────────────────────
class DeviceConfigBody(BaseModel):
"""
Request body for POST /device/config.
All fields are optional — only supplied (non-null) fields are written to
the device. All other config bytes are round-tripped verbatim.
Recording parameters
--------------------
recording_mode : Recording mode enum. Values: 0=Single Shot, 1=Continuous, 3=Histogram, 4=Histogram+Continuous.
sample_rate : Samples per second. Valid values: 1024, 2048, 4096.
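record_time : Record duration in seconds (e.g. 1.0, 2.0, 3.0).
histogram_interval_sec : Histogram interval in seconds. Valid values: 2, 5, 15, 60, 300, 900 (mode-gated).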
Trigger / alarm thresholds and range (geo channels)
----------------------------------------------------
trigger_level_geo : Trigger threshold in in/s (e.g. 0.5).
alarm_level_geo : Alarm threshold in in/s (e.g. 1.0).
geo_range : Geophone range/sensitivity. 0=Normal 10.000 in/s, 1=Sensitive 1.250 in/s.
Project / operator strings (max 41 ASCII characters each)
---------------------------------------------------------
project : Project description.
client_name : Client / company name.
operator : Operator / technician name.
seis_loc : Sensor location description.
notes : Extended notes.
"""
# Recording parameters
recording_mode: Optional[int] = None
sample_rate: Optional[int] = None
record_time: Optional[float] = None
histogram_interval_sec: Optional[int] = None # seconds: 2, 5, 15, 60, 300, 900 (mode-gated)
# Threshold parameters / geo range
trigger_level_geo: Optional[float] = None
alarm_level_geo: Optional[float] = None
geo_range: Optional[int] = None # 0=Normal 10.000 in/s, 1=Sensitive 1.250 in/s
# Project / operator strings
project: Optional[str] = None
client_name: Optional[str] = None
operator: Optional[str] = None
seis_loc: Optional[str] = None
notes: Optional[str] = None
@app.post("/device/config")
def device_config(
body: DeviceConfigBody,
port: Optional[str] = Query(None, description="Serial port (e.g. COM5)"),
baud: int = Query(38400, description="Serial baud rate"),
host: Optional[str] = Query(None, description="TCP host — modem IP or ACH relay"),
tcp_port: int = Query(DEFAULT_TCP_PORT, description=f"TCP port (default {DEFAULT_TCP_PORT})"),
) -> dict:
"""
Read the current device config, apply any supplied changes to the compliance
block, and write the full config back.
Only non-null fields in the JSON body are modified. All other config bytes
are round-tripped verbatim from the device.
Supply either *port* (serial) or *host* (TCP/modem).
Example body (all fields optional — include only what you want to change):
{
"recording_mode": 1,
"sample_rate": 1024,
"record_time": 3.0,
"trigger_level_geo": 0.5,
"alarm_level_geo": 1.0,
"project": "Bridge Inspection 2026",
"client_name": "City of Portland",
"operator": "Brian Harrison",
"seis_loc": "South Abutment",
"notes": "Pre-blast baseline"
}
Returns:
{"status": "ok", "updated_fields": {...}} on success.
Raises:
502 on protocol errors (timeout, bad ack, etc.).
422 if neither port nor host is provided.
"""
changed = body.model_dump(exclude_none=True)
log.info("POST /device/config port=%s host=%s fields=%s", port, host, list(changed.keys()))
try:
def _do():
with _build_client(port, baud, host, tcp_port) as client:
client.connect()
client.apply_config(
recording_mode=body.recording_mode,
sample_rate=body.sample_rate,
record_time=body.record_time,
histogram_interval_sec=body.histogram_interval_sec,
trigger_level_geo=body.trigger_level_geo,
alarm_level_geo=body.alarm_level_geo,
geo_range=body.geo_range,
project=body.project,
client_name=body.client_name,
operator=body.operator,
seis_loc=body.seis_loc,
notes=body.notes,
)
_run_with_retry(_do, is_tcp=_is_tcp(host))
except HTTPException:
raise
except ProtocolError as exc:
raise HTTPException(status_code=502, detail=f"Protocol error: {exc}") from exc
except OSError as exc:
raise HTTPException(status_code=502, detail=f"Connection error: {exc}") from exc
except ValueError as exc:
raise HTTPException(status_code=422, detail=str(exc)) from exc
except Exception as exc:
raise HTTPException(status_code=500, detail=f"Device error: {exc}") from exc
# Config was written to the device — the cached compliance config is now stale.
conn_key = SFMCache.make_conn_key(host, tcp_port, port, baud)
get_cache().mark_config_dirty(conn_key)
return {
"status": "ok",
"updated_fields": changed,
}
# Keep the old endpoint alive under its old URL for anything already calling it
@app.post("/device/config/project")
def device_config_project(
body: DeviceConfigBody,
port: Optional[str] = Query(None, description="Serial port (e.g. COM5)"),
baud: int = Query(38400, description="Serial baud rate"),
host: Optional[str] = Query(None, description="TCP host — modem IP or ACH relay"),
tcp_port: int = Query(DEFAULT_TCP_PORT, description=f"TCP port (default {DEFAULT_TCP_PORT})"),
) -> dict:
"""Deprecated alias for POST /device/config — use that instead."""
return device_config(body=body, port=port, baud=baud, host=host, tcp_port=tcp_port)
# ── Monitoring endpoints ───────────────────────────────────────────────────────
@app.get("/device/monitor/status")
def device_monitor_status(
port: Optional[str] = Query(None, description="Serial port (e.g. COM5)"),
baud: int = Query(38400, description="Serial baud rate"),
host: Optional[str] = Query(None, description="TCP host — modem IP or ACH relay"),
tcp_port: int = Query(DEFAULT_TCP_PORT, description=f"TCP port (default {DEFAULT_TCP_PORT})"),
force: bool = Query(False, description="Bypass cache and re-read from device"),
) -> dict:
"""
Read monitoring status from the device.
Uses poll() (POLL handshake only — no config/compliance reads) so the
request completes in ~2 seconds instead of ~15. The full connect() was
causing false "idle" readings because the compliance+event sequence was
interacting with the device state before the 0x1C read.
Returns is_monitoring bool, battery voltage, and memory usage (total + free
bytes). Battery and memory are only present when the unit is idle.
**Caching**: status is cached for 30 seconds to reduce cellular polling overhead.
Pass *force=true* to bypass the cache for an immediate fresh read.
"""
cache = get_cache()
conn_key = SFMCache.make_conn_key(host, tcp_port, port, baud)
if not force:
cached = cache.get_monitor_status(conn_key)
if cached is not None:
log.debug("monitor status cache hit for %s", conn_key)
cached["_cached"] = True
return cached
with _build_client(port=port, baud=baud, host=host, tcp_port=tcp_port) as client:
try:
client.poll()
except Exception as exc:
log.warning("monitor status poll retry: %s", exc)
client.poll()
status = client.get_monitor_status()
result: dict = {"is_monitoring": status.is_monitoring}
if status.battery_v is not None:
result["battery_v"] = round(status.battery_v, 2)
if status.memory_total is not None:
result["memory_total_bytes"] = status.memory_total
result["memory_total_kb"] = round(status.memory_total / 1024, 1)
if status.memory_free is not None:
result["memory_free_bytes"] = status.memory_free
result["memory_free_kb"] = round(status.memory_free / 1024, 1)
cache.set_monitor_status(conn_key, result)
return result
@app.post("/device/monitor/start")
def device_monitor_start(
port: Optional[str] = Query(None, description="Serial port (e.g. COM5)"),
baud: int = Query(38400, description="Serial baud rate"),
host: Optional[str] = Query(None, description="TCP host — modem IP or ACH relay"),
tcp_port: int = Query(DEFAULT_TCP_PORT, description=f"TCP port (default {DEFAULT_TCP_PORT})"),
) -> dict:
"""
Command the device to start monitoring (recording triggered events).
Sends SUB 0x96 and waits for ack SUB 0x69.
"""
with _build_client(port=port, baud=baud, host=host, tcp_port=tcp_port) as client:
try:
client.poll()
except Exception as exc:
log.warning("start monitoring poll retry: %s", exc)
client.poll()
client.start_monitoring()
conn_key = SFMCache.make_conn_key(host, tcp_port, port, baud)
get_cache().invalidate_monitor_status(conn_key)
return {"status": "started"}
@app.post("/device/monitor/stop")
def device_monitor_stop(
port: Optional[str] = Query(None, description="Serial port (e.g. COM5)"),
baud: int = Query(38400, description="Serial baud rate"),
host: Optional[str] = Query(None, description="TCP host — modem IP or ACH relay"),
tcp_port: int = Query(DEFAULT_TCP_PORT, description=f"TCP port (default {DEFAULT_TCP_PORT})"),
) -> dict:
"""
Command the device to stop monitoring.
Sends SUB 0x97 and waits for ack SUB 0x68.
"""
with _build_client(port=port, baud=baud, host=host, tcp_port=tcp_port) as client:
try:
client.poll()
except Exception as exc:
log.warning("stop monitoring poll retry: %s", exc)
client.poll()
client.stop_monitoring()
conn_key = SFMCache.make_conn_key(host, tcp_port, port, baud)
get_cache().invalidate_monitor_status(conn_key)
return {"status": "stopped"}
# ── Call home config endpoints ───────────────────────────────────────────────
@app.get("/device/call_home")
def device_call_home_get(
port: Optional[str] = Query(None, description="Serial port (e.g. COM5)"),
baud: int = Query(38400, description="Serial baud rate"),
host: Optional[str] = Query(None, description="TCP host — modem IP or ACH relay"),
tcp_port: int = Query(DEFAULT_TCP_PORT, description=f"TCP port (default {DEFAULT_TCP_PORT})"),
) -> dict:
"""
Read the Auto Call Home (ACH) configuration from the device.
Sends SUB 0x2C (two-step read) and returns the decoded call home config.
Confirmed from 4-20-26 call home settings captures (BE11529).
Returns:
{
"auto_call_home_enabled": true/false,
"dial_string": "RADIO RING",
"after_event_recorded": true/false,
"at_specified_times": true/false,
"time1_enabled": true/false, "time1_hour": 19, "time1_min": 55,
"time2_enabled": false, "time2_hour": 0, "time2_min": 0,
"num_retries": 3,
"time_between_retries_sec": 15,
"wait_for_connection_sec": 60,
"warm_up_time_sec": 60
}
"""
try:
def _do():
with _build_client(port, baud, host, tcp_port) as client:
client.poll()
return client.get_call_home_config()
ch_config = _run_with_retry(_do, is_tcp=_is_tcp(host))
except HTTPException:
raise
except ProtocolError as exc:
raise HTTPException(status_code=502, detail=f"Protocol error: {exc}") from exc
except OSError as exc:
raise HTTPException(status_code=502, detail=f"Connection error: {exc}") from exc
except Exception as exc:
raise HTTPException(status_code=500, detail=f"Device error: {exc}") from exc
return _serialise_call_home_config(ch_config) or {}
class CallHomeConfigBody(BaseModel):
"""
Request body for POST /device/call_home.
All fields are optional — only supplied (non-null) fields are modified.
All other call home config bytes are round-tripped verbatim from the device.
Confirmed writable fields (4-20-26 captures):
auto_call_home_enabled : bool — master enable for auto call home
after_event_recorded : bool — call home after each triggered event
at_specified_times : bool — enable time-based scheduled calls
time1_enabled : bool — enable time slot 1
time1_hour : int — hour for slot 1 (0-23; avoid 3 — DLE escape limitation)
time1_min : int — minute for slot 1 (0-59; avoid 3)
time2_enabled : bool — enable time slot 2
time2_hour : int — hour for slot 2 (0-23; avoid 3)
time2_min : int — minute for slot 2 (0-59; avoid 3)
Read-only fields (not writable via this endpoint):
dial_string, num_retries, time_between_retries_sec,
wait_for_connection_sec, warm_up_time_sec
"""
auto_call_home_enabled: Optional[bool] = None
after_event_recorded: Optional[bool] = None
at_specified_times: Optional[bool] = None
time1_enabled: Optional[bool] = None
time1_hour: Optional[int] = None
time1_min: Optional[int] = None
time2_enabled: Optional[bool] = None
time2_hour: Optional[int] = None
time2_min: Optional[int] = None
@app.post("/device/call_home")
def device_call_home_set(
body: CallHomeConfigBody,
port: Optional[str] = Query(None, description="Serial port (e.g. COM5)"),
baud: int = Query(38400, description="Serial baud rate"),
host: Optional[str] = Query(None, description="TCP host — modem IP or ACH relay"),
tcp_port: int = Query(DEFAULT_TCP_PORT, description=f"TCP port (default {DEFAULT_TCP_PORT})"),
) -> dict:
"""
Read the current call home config, apply supplied changes, and write back.
Only non-null fields are modified. All other bytes round-trip verbatim.
Write sequence (confirmed from 4-20-26 call home settings captures):
SUB 0x2C (read 2-step) → 125-byte raw payload
patch fields
SUB 0x7E (write 127-byte payload) → ack 0x81
SUB 0x7F (confirm) → ack 0x80
Example body:
{ "auto_call_home_enabled": true, "after_event_recorded": true,
"time1_enabled": true, "time1_hour": 20, "time1_min": 0 }
"""
changed = body.model_dump(exclude_none=True)
log.info("POST /device/call_home port=%s host=%s fields=%s", port, host, list(changed.keys()))
try:
def _do():
with _build_client(port, baud, host, tcp_port) as client:
client.poll()
client.set_call_home_config(
auto_call_home_enabled=body.auto_call_home_enabled,
after_event_recorded=body.after_event_recorded,
at_specified_times=body.at_specified_times,
time1_enabled=body.time1_enabled,
time1_hour=body.time1_hour,
time1_min=body.time1_min,
time2_enabled=body.time2_enabled,
time2_hour=body.time2_hour,
time2_min=body.time2_min,
)
_run_with_retry(_do, is_tcp=_is_tcp(host))
except HTTPException:
raise
except ProtocolError as exc:
raise HTTPException(status_code=502, detail=f"Protocol error: {exc}") from exc
except OSError as exc:
raise HTTPException(status_code=502, detail=f"Connection error: {exc}") from exc
except ValueError as exc:
raise HTTPException(status_code=422, detail=str(exc)) from exc
except Exception as exc:
raise HTTPException(status_code=500, detail=f"Device error: {exc}") from exc
return {"status": "ok", "updated_fields": changed}
# ── Cache management endpoints ────────────────────────────────────────────────
@app.get("/cache/stats")
def cache_stats() -> dict:
"""
Return row counts for all cache tables.
Useful for debugging and verifying that caching is working as expected.
"""
return get_cache().stats()
@app.delete("/cache/device")
def cache_clear_device(
port: Optional[str] = Query(None, description="Serial port (e.g. COM5)"),
baud: int = Query(38400, description="Serial baud rate"),
host: Optional[str] = Query(None, description="TCP host — modem IP or ACH relay"),
tcp_port: int = Query(DEFAULT_TCP_PORT, description=f"TCP port (default {DEFAULT_TCP_PORT})"),
) -> dict:
"""
Clear all cached data for a specific device (identified by its connection address).
Clears: device info, all event headers, all waveforms, monitor status.
The next request to any endpoint for this device will re-fetch from the device.
Supply either *port* (serial) or *host* (TCP/modem) to identify the device.
"""
conn_key = SFMCache.make_conn_key(host, tcp_port, port, baud)
counts = get_cache().clear_device(conn_key)
return {"status": "cleared", "conn_key": conn_key, "deleted": counts}
# ── DB read endpoints ─────────────────────────────────────────────────────────
#
# These endpoints expose the seismo-relay SQLite DB written by ach_server.py.
# All queries are read-only. Terra-view calls these to build project event
# views, unit history panels, and (eventually) vibration summary reports.
@app.get("/db/units")
def db_units() -> list[dict]:
"""
Return one row per known serial with summary stats:
last_seen, total_events, total_monitor_entries, total_sessions.
"""
return _get_db().query_units()
@app.get("/db/events")
def db_events(
serial: Optional[str] = Query(None, description="Filter by unit serial (e.g. BE11529)"),
from_dt: Optional[str] = Query(None, description="ISO-8601 start datetime (inclusive)"),
to_dt: Optional[str] = Query(None, description="ISO-8601 end datetime (inclusive)"),
false_trigger: Optional[bool] = Query(None, description="Filter by false_trigger flag"),
limit: int = Query(500, description="Max rows to return (default 500)"),
offset: int = Query(0, description="Pagination offset"),
) -> dict:
"""
Query triggered events from the DB.
Returns events newest-first. All filter params are optional.
Example:
GET /db/events?serial=BE11529&from_dt=2026-04-01&limit=100
"""
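# Reject malformed datetimes with a 422 instead of an unhandled 500.
try:
from_parsed = datetime.datetime.fromisoformat(from_dt) if from_dt else None
to_parsed = datetime.datetime.fromisoformat(to_dt) if to_dt else None
except ValueError as exc:
raise HTTPException(status_code=422, detail=f"Invalid ISO-8601 datetime: {exc}") from exc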
rows = _get_db().query_events(
serial=serial,
from_dt=from_parsed,
to_dt=to_parsed,
false_trigger=false_trigger,
limit=limit,
offset=offset,
)
return {"count": len(rows), "events": rows}
@app.patch("/db/events/{event_id}/false_trigger")
def db_set_false_trigger(
event_id: str,
value: bool = Query(..., description="True to flag as false trigger, False to clear"),
) -> dict:
"""
Set or clear the false_trigger flag on a single event.
Used by the terra-view event review UI.
Returns 404 if the event_id is not found.
"""
found = _get_db().set_false_trigger(event_id, value)
if not found:
raise HTTPException(status_code=404, detail=f"Event {event_id} not found")
return {"status": "ok", "event_id": event_id, "false_trigger": value}
# ── /db/events/{id} — waveform file accessors ─────────────────────────────────
#
# These endpoints serve files from the persistent WaveformStore, so a Blastware
# file or its decoded JSON for a previously-ingested ACH event can be fetched
# without re-dialing the device.
@app.get("/db/events/{event_id}/blastware_file")
def db_event_blastware_file(event_id: str) -> FileResponse:
"""
Return the Blastware-format event file for a previously-ingested
event. Filename extension is per-event (timestamp-encoded
`AB0T` for ACH downloads, 3-char `AB0` for direct downloads).
404 if the event is unknown or has no event file in the store
(events ingested before the store was wired will show this —
re-download via the live endpoint to populate).
"""
row = _get_db().get_event(event_id)
if row is None:
raise HTTPException(status_code=404, detail=f"Event {event_id} not found")
serial = row.get("serial")
filename = row.get("blastware_filename")
if not serial or not filename:
raise HTTPException(
status_code=404,
detail=(
f"Event {event_id} has no Blastware file in the store. "
"Re-download via the live endpoint to populate."
),
)
bw_path = _get_store().open_blastware(serial, filename)
if bw_path is None:
raise HTTPException(
status_code=410,
detail=f"Stored file missing on disk: {filename}",
)
return FileResponse(
path=str(bw_path),
filename=filename,
media_type="application/octet-stream",
)
@app.get("/db/events/{event_id}/waveform.json")
def db_event_waveform_json(event_id: str) -> dict:
"""
Return the plot-ready JSON (`sfm.plot.v1`) for a stored event.
Resolution order (cheapest first):
1. If `<filename>.h5` exists, serve it via `plot_json_from_hdf5`.
Samples are already in physical units; no decode work needed.
2. Else if `<filename>.a5.pkl` exists, replay the A5 decoders to
rebuild an Event and serialise via `event_to_plot_json`.
3. Else 404 — the event has no waveform data on disk.
The shape is identical regardless of source, so clients (the SFM
webapp, Terra-View, etc.) consume the same `sfm.plot.v1` payload.
"""
row = _get_db().get_event(event_id)
if row is None:
raise HTTPException(status_code=404, detail=f"Event {event_id} not found")
serial = row.get("serial")
filename = row.get("blastware_filename")
if not serial or not filename:
raise HTTPException(
status_code=404,
detail=f"Event {event_id} has no event file in the store",
)
store = _get_store()
# Path 1: HDF5 (canonical clean format).
h5_path = store.hdf5_path_for(serial, filename)
if h5_path.exists():
try:
return event_hdf5.plot_json_from_hdf5(h5_path, event_id=event_id)
except Exception as exc:
log.warning("HDF5 read failed (%s); falling back to A5 path", exc)
# Path 2: A5 pickle replay.
a5_frames = store.load_a5(serial, filename)
if not a5_frames:
raise HTTPException(
status_code=404,
detail=(
f"Event {event_id} has no waveform data on disk "
"(no .h5 and no .a5.pkl). Run the backfill script or "
"re-download via the live endpoint to populate."
),
)
ev = Event(index=-1)
try:
_decode_a5_metadata_into(a5_frames, ev)
except Exception as exc:
log.warning("db_event_waveform_json: metadata decode failed: %s", exc)
try:
_decode_a5_waveform(a5_frames, ev)
except Exception as exc:
log.error("db_event_waveform_json: waveform decode failed: %s", exc, exc_info=True)
raise HTTPException(status_code=500, detail=f"Waveform decode failed: {exc}") from exc
# Carry over fields from the DB row when the A5 replay didn't fill them.
if ev.sample_rate is None and row.get("sample_rate"):
ev.sample_rate = row.get("sample_rate")
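# geo_range is not recovered from the A5 frames or the DB row on this
# path, so the Normal range is assumed when scaling geophone samples.
return event_hdf5.event_to_plot_json(
ev, serial=serial, geo_range="normal", event_id=event_id,
)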
# ── /db/events/{id}/sidecar — modern .sfm.json review/metadata accessors ──────
class SidecarPatchBody(BaseModel):
"""Body for PATCH /db/events/{id}/sidecar.
JSON-merge-patch semantics: only the keys you include get updated.
`review` is the editable block for monthly-summary workflows
(false_trigger flag, reviewer notes, etc.); `extensions` is the
forward-compat namespace for vendor / future fields.
"""
review: Optional[dict] = None
extensions: Optional[dict] = None
@app.get("/db/events/{event_id}/sidecar")
def db_event_sidecar(event_id: str) -> dict:
"""
Return the .sfm.json sidecar for a stored event. 404 if the event
is unknown or has no sidecar in the store (events ingested before
the sidecar feature landed will show this until backfilled).
"""
row = _get_db().get_event(event_id)
if row is None:
raise HTTPException(status_code=404, detail=f"Event {event_id} not found")
serial = row.get("serial")
filename = row.get("blastware_filename")
if not serial or not filename:
raise HTTPException(
status_code=404,
detail=f"Event {event_id} has no event file in the store",
)
sidecar = _get_store().load_sidecar(serial, filename)
if sidecar is None:
raise HTTPException(
status_code=404,
detail=(
f"No .sfm.json sidecar on disk for {filename}. "
"Run scripts/backfill_sidecars.py to generate one."
),
)
return sidecar
@app.patch("/db/events/{event_id}/sidecar")
def db_event_sidecar_patch(event_id: str, body: SidecarPatchBody) -> dict:
"""
JSON-merge-patch the sidecar's `review` and/or `extensions` blocks.
The sidecar JSON is the source of truth for review state. When
`review.false_trigger` is updated, the SQL `events.false_trigger`
column is kept in sync as a derived index for fast filtering.
Returns the new full sidecar. 404 if the event or sidecar is missing.
"""
row = _get_db().get_event(event_id)
if row is None:
raise HTTPException(status_code=404, detail=f"Event {event_id} not found")
serial = row.get("serial")
filename = row.get("blastware_filename")
if not serial or not filename:
raise HTTPException(
status_code=404,
detail=f"Event {event_id} has no event file in the store",
)
if not (body.review or body.extensions):
raise HTTPException(
status_code=400,
detail="PATCH body must include `review` and/or `extensions`",
)
new_sidecar = _get_store().patch_sidecar(
serial, filename,
review=body.review,
extensions=body.extensions,
)
if new_sidecar is None:
raise HTTPException(
status_code=404,
detail=f"No .sfm.json sidecar on disk for {filename}",
)
# Mirror false_trigger from review block into the SQL index column.
if body.review is not None:
_get_db().update_event_review(event_id, new_sidecar.get("review", {}))
return new_sidecar
# ── /db/import/blastware_file — ingest BW-only event files ────────────────────
@app.post("/db/import/blastware_file")
async def db_import_blastware_file(
files: list[UploadFile] = File(...),
serial: Optional[str] = Query(None, description="Optional serial-number hint (e.g. BE11529); falls back to the BW filename's encoded prefix when omitted"),
) -> dict:
"""
Multipart upload of one or more Blastware event file binaries
(typically produced by Blastware's own ACH). For each file:
1. Parse the bytes via WaveformStore.save_imported_bw — produces
a parsed Event + copies the file into the persistent store +
writes a .sfm.json sidecar with source.kind = "bw-import".
2. Upsert a row into `events` (dedup'd on serial+timestamp).
Response includes per-file outcomes so the caller can see which
landed cleanly and which failed (e.g. malformed file, unknown
serial, etc.).
"""
store = _get_store()
db = _get_db()
results: list[dict] = []
for upload in files:
try:
content = await upload.read()
except Exception as exc:
results.append({
"filename": upload.filename, "status": "error",
"detail": f"read failed: {exc}",
})
continue
try:
ev, rec = store.save_imported_bw(
content,
source_path=Path(upload.filename or "imported.bw"),
serial_hint=serial,
)
inserted, skipped = db.insert_events(
[ev],
serial=(serial or _serial_from_event(ev) or "UNKNOWN"),
waveform_records=(
{ev._waveform_key.hex(): rec} if ev._waveform_key else None
),
)
results.append({
"filename": upload.filename,
"status": "ok",
"stored_filename": rec["filename"],
"filesize": rec["filesize"],
"sha256": rec["sha256"],
"inserted": inserted,
"skipped": skipped,
})
except Exception as exc:
log.error("import failed for %s: %s", upload.filename, exc, exc_info=True)
results.append({
"filename": upload.filename, "status": "error",
"detail": str(exc),
})
return {"count": len(results), "results": results}
def _serial_from_event(ev) -> Optional[str]:
"""Fallback serial resolver — currently relies on the BW filename
decoder via WaveformStore.save_imported_bw, so this is just a
placeholder for future enhancement (e.g. inferring from project_info)."""
return None
@app.get("/db/units/{serial}/waveforms.zip")
def db_unit_waveforms_zip(
serial: str,
from_dt: Optional[str] = Query(None, description="ISO-8601 start datetime (inclusive)"),
to_dt: Optional[str] = Query(None, description="ISO-8601 end datetime (inclusive)"),
limit: int = Query(5000, description="Hard cap on events bundled (default 5000)"),
) -> StreamingResponse:
"""
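Return a ZIP of all stored event files for a serial in the optional date range.
The archive is assembled fully in memory (BytesIO) before the response is
sent; *limit* caps how many events are bundled.
Events without a stored event file are silently skipped.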
"""
import io
import zipfile
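# Reject malformed datetimes with a 422 instead of an unhandled 500.
try:
from_parsed = datetime.datetime.fromisoformat(from_dt) if from_dt else None
to_parsed = datetime.datetime.fromisoformat(to_dt) if to_dt else None
except ValueError as exc:
raise HTTPException(status_code=422, detail=f"Invalid ISO-8601 datetime: {exc}") from exc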
rows = _get_db().query_events(
serial=serial,
from_dt=from_parsed,
to_dt=to_parsed,
limit=limit,
offset=0,
)
store = _get_store()
buf = io.BytesIO()
written = 0
with zipfile.ZipFile(buf, "w", compression=zipfile.ZIP_DEFLATED) as zf:
for row in rows:
fn = row.get("blastware_filename")
if not fn:
continue
bw_path = store.open_blastware(serial, fn)
if bw_path is None:
continue
zf.write(bw_path, arcname=fn)
written += 1
if written == 0:
raise HTTPException(
status_code=404,
detail=f"No stored Blastware files found for serial {serial} in range",
)
buf.seek(0)
safe_serial = serial.replace("/", "_")
headers = {
"Content-Disposition": f'attachment; filename="{safe_serial}_waveforms.zip"',
"X-Waveform-Count": str(written),
}
return StreamingResponse(buf, media_type="application/zip", headers=headers)
@app.get("/db/monitor_log")
def db_monitor_log(
serial: Optional[str] = Query(None, description="Filter by unit serial"),
from_dt: Optional[str] = Query(None, description="ISO-8601 start datetime (inclusive)"),
to_dt: Optional[str] = Query(None, description="ISO-8601 end datetime (inclusive)"),
limit: int = Query(500, description="Max rows to return"),
offset: int = Query(0, description="Pagination offset"),
) -> dict:
"""
Query monitor log entries (continuous monitoring intervals) from the DB.
Returns entries newest-first.
"""
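# Reject malformed datetimes with a 422 instead of an unhandled 500.
try:
from_parsed = datetime.datetime.fromisoformat(from_dt) if from_dt else None
to_parsed = datetime.datetime.fromisoformat(to_dt) if to_dt else None
except ValueError as exc:
raise HTTPException(status_code=422, detail=f"Invalid ISO-8601 datetime: {exc}") from exc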
rows = _get_db().query_monitor_log(
serial=serial,
from_dt=from_parsed,
to_dt=to_parsed,
limit=limit,
offset=offset,
)
return {"count": len(rows), "entries": rows}
@app.get("/db/sessions")
def db_sessions(
serial: Optional[str] = Query(None, description="Filter by unit serial"),
limit: int = Query(50, description="Max rows to return"),
) -> dict:
"""
Query ACH call-home sessions from the DB, newest first.
"""
rows = _get_db().get_sessions(serial=serial, limit=limit)
return {"count": len(rows), "sessions": rows}
# ── Entry point ────────────────────────────────────────────────────────────────
if __name__ == "__main__":
import argparse
ap = argparse.ArgumentParser(description="SFM — Seismograph Field Module API server")
ap.add_argument("--host", default="0.0.0.0", help="Bind address (default: 0.0.0.0)")
ap.add_argument("--port", type=int, default=8200, help="Port (default: 8200)")
ap.add_argument("--reload", action="store_true", help="Enable auto-reload (dev mode)")
args = ap.parse_args()
log.info("Starting SFM server on %s:%d", args.host, args.port)
uvicorn.run(
"sfm.server:app",
host=args.host,
port=args.port,
reload=args.reload,
)