Files
seismo-relay/sfm/server.py
T
claude 0da88ec6aa fix: redefines rectime_seconds from strt[18] byte to new computed time.
The server now re-computes rectime_seconds using the actual sample rate from the compliance config (overriding the default 1024 in the client), so if the device runs at 2048 or 4096 sps it's still correct.

Viewer — The rectime display now shows Xs (stored) / Ys (cfg) so you can compare the STRT-derived duration against the compliance config's record_time setting side-by-side. I also clamped the y-axis to ±(0C peak × 1.4) so near-saturation decode artifacts don't squash the real blast signal into a flat line.
2026-04-14 14:19:17 -04:00

1109 lines
46 KiB
Python

"""
sfm/server.py — Seismograph Field Module REST API
Wraps the minimateplus library in a small FastAPI service.
Terra-view proxies /api/sfm/* to this service (same pattern as SLMM at :8100).
Default port: 8200
Endpoints
---------
GET /health Service heartbeat — no device I/O
GET /device/info POLL + serial number + full config read
GET /device/events Download all stored events (headers + peak values)
POST /device/connect Explicit connect/identify (same as /device/info)
GET /device/event/{idx} Single event by index (header + waveform record)
Transport query params (supply one set):
Serial (direct RS-232 cable):
port — serial port name (e.g. COM5, /dev/ttyUSB0)
baud — baud rate (default 38400)
TCP (modem / ACH Auto Call Home):
host — IP address or hostname of the modem or ACH relay
tcp_port — TCP port number (default 12345, Blastware default)
Each call opens the connection, does its work, then closes it.
(Stateless / reconnect-per-call, matching Blastware's observed behaviour.)
Run with:
python -m uvicorn sfm.server:app --host 0.0.0.0 --port 8200 --reload
or:
python sfm/server.py
"""
from __future__ import annotations
import datetime
import logging
import sys
import threading
import time
from pathlib import Path
from typing import Optional
# FastAPI / Pydantic
try:
from fastapi import Body, FastAPI, HTTPException, Query
from fastapi.middleware.cors import CORSMiddleware
from fastapi.responses import FileResponse, JSONResponse
from pydantic import BaseModel
import uvicorn
except ImportError:
print(
"fastapi and uvicorn are required for the SFM server.\n"
"Install them with: pip install fastapi uvicorn",
file=sys.stderr,
)
sys.exit(1)
from minimateplus import MiniMateClient
from minimateplus.protocol import ProtocolError
from minimateplus.models import ComplianceConfig, DeviceInfo, Event, PeakValues, ProjectInfo, Timestamp
from minimateplus.transport import TcpTransport, DEFAULT_TCP_PORT
from sfm.database import SeismoDb
logging.basicConfig(
level=logging.INFO,
format="%(asctime)s %(levelname)-7s %(name)s %(message)s",
datefmt="%H:%M:%S",
)
log = logging.getLogger("sfm.server")
# ── FastAPI app ────────────────────────────────────────────────────────────────
app = FastAPI(
title="Seismograph Field Module (SFM)",
description=(
"REST API for Instantel MiniMate Plus seismographs.\n"
"Implements the minimateplus RS-232 protocol library.\n"
"Proxied by terra-view at /api/sfm/*."
),
version="0.1.0",
)
# Allow requests from the waveform viewer opened as a local file (file://)
# and from any dev server or terra-view proxy.
app.add_middleware(
CORSMiddleware,
allow_origins=["*"],
allow_methods=["GET", "POST"],
allow_headers=["*"],
)
# ── DB ────────────────────────────────────────────────────────────────────────
# Shared SeismoDb instance. Path can be overridden by --db-path at startup,
# or defaults to bridges/captures/seismo_relay.db relative to the repo root.
_DEFAULT_DB_PATH = Path(__file__).parent.parent / "bridges" / "captures" / "seismo_relay.db"
_db: Optional[SeismoDb] = None
def _get_db() -> SeismoDb:
global _db
if _db is None:
_db = SeismoDb(_DEFAULT_DB_PATH)
return _db
# ── Live device cache ─────────────────────────────────────────────────────────
# In-memory cache for live device data. Avoids re-dialing the device on every
# request when the data hasn't changed.
#
# Keyed by conn_key ("tcp:host:port" or "serial:port:baud").
# Does NOT persist across server restarts — this is purely an in-process cache
# to reduce TCP round-trips and cellular data usage.
#
# Invalidation rules:
# device_info — cached until POST /device/config marks it dirty
# events — cached by (conn_key, device_event_count); re-fetched when
# a quick count_events() probe shows new events on the device
# monitor_status — 30-second TTL (changes frequently during monitoring)
# waveforms — permanent (immutable once recorded; indexed by conn_key+idx)
#
# All endpoints accept ?force=true to bypass the cache and re-read from device.
_MONITOR_STATUS_TTL = 30.0 # seconds
class _LiveCache:
"""
Thread-safe in-memory cache for live SFM device data.
One singleton per server process.
"""
def __init__(self) -> None:
self._lock = threading.Lock()
# conn_key → serialised device info dict
self._device_info: dict[str, dict] = {}
# conn_key → (device_event_count_when_cached, [event dicts])
self._events: dict[str, tuple[int, list]] = {}
# conn_key → (fetched_at_unix, status_dict)
self._monitor_status: dict[str, tuple[float, dict]] = {}
# conn_key → bool (True = re-read device on next /device/info)
self._config_dirty: dict[str, bool] = {}
# (conn_key, event_index) → waveform dict (permanent)
self._waveforms: dict[tuple, dict] = {}
# ── Connection key ────────────────────────────────────────────────────────
@staticmethod
def make_conn_key(
host: Optional[str],
tcp_port: int,
port: Optional[str],
baud: int,
) -> str:
if host:
return f"tcp:{host}:{tcp_port}"
return f"serial:{port}:{baud}"
# ── Device info ───────────────────────────────────────────────────────────
def get_device_info(self, conn_key: str) -> Optional[dict]:
with self._lock:
if self._config_dirty.get(conn_key):
return None
return self._device_info.get(conn_key)
def set_device_info(self, conn_key: str, info: dict) -> None:
with self._lock:
self._device_info[conn_key] = info
self._config_dirty[conn_key] = False
# ── Events ────────────────────────────────────────────────────────────────
def get_events(self, conn_key: str, device_count: int) -> Optional[list]:
"""
Return cached events if the device's current event count matches what
we had when we last fetched. Returns None (cache miss) otherwise.
"""
with self._lock:
if self._config_dirty.get(conn_key):
return None
entry = self._events.get(conn_key)
if entry is None:
return None
cached_count, events = entry
return events if cached_count == device_count else None
def set_events(self, conn_key: str, device_count: int, events: list) -> None:
with self._lock:
self._events[conn_key] = (device_count, events)
# ── Monitor status ────────────────────────────────────────────────────────
def get_monitor_status(self, conn_key: str) -> Optional[dict]:
with self._lock:
entry = self._monitor_status.get(conn_key)
if entry is None:
return None
fetched_at, status = entry
if time.time() - fetched_at > _MONITOR_STATUS_TTL:
return None
return status
def set_monitor_status(self, conn_key: str, status: dict) -> None:
with self._lock:
self._monitor_status[conn_key] = (time.time(), status)
def invalidate_monitor_status(self, conn_key: str) -> None:
with self._lock:
self._monitor_status.pop(conn_key, None)
# ── Config dirty flag ─────────────────────────────────────────────────────
def mark_config_dirty(self, conn_key: str) -> None:
"""
Called after a successful POST /device/config write.
Forces next /device/info and /device/events to re-read from the device.
"""
with self._lock:
self._config_dirty[conn_key] = True
self._events.pop(conn_key, None)
# ── Waveforms (permanent cache) ───────────────────────────────────────────
def get_waveform(self, conn_key: str, index: int) -> Optional[dict]:
with self._lock:
return self._waveforms.get((conn_key, index))
def set_waveform(self, conn_key: str, index: int, waveform: dict) -> None:
with self._lock:
self._waveforms[(conn_key, index)] = waveform
_live_cache = _LiveCache()
# ── Serialisers ────────────────────────────────────────────────────────────────
# Plain dict helpers — avoids a Pydantic dependency in the library layer.
def _serialise_timestamp(ts: Optional[Timestamp]) -> Optional[dict]:
if ts is None:
return None
return {
"year": ts.year,
"month": ts.month,
"day": ts.day,
"hour": ts.hour,
"minute": ts.minute,
"second": ts.second,
"clock_set": ts.clock_set,
"display": str(ts),
}
def _serialise_peak_values(pv: Optional[PeakValues]) -> Optional[dict]:
if pv is None:
return None
return {
"tran_in_s": pv.tran,
"vert_in_s": pv.vert,
"long_in_s": pv.long,
"micl_psi": pv.micl,
"peak_vector_sum": pv.peak_vector_sum,
}
def _serialise_project_info(pi: Optional[ProjectInfo]) -> Optional[dict]:
if pi is None:
return None
return {
"setup_name": pi.setup_name,
"project": pi.project,
"client": pi.client,
"operator": pi.operator,
"sensor_location": pi.sensor_location,
"notes": pi.notes,
}
def _serialise_compliance_config(cc: Optional["ComplianceConfig"]) -> Optional[dict]:
if cc is None:
return None
return {
"record_time": cc.record_time,
"sample_rate": cc.sample_rate,
"trigger_level_geo": cc.trigger_level_geo,
"alarm_level_geo": cc.alarm_level_geo,
"max_range_geo": cc.max_range_geo,
"setup_name": cc.setup_name,
"project": cc.project,
"client": cc.client,
"operator": cc.operator,
"sensor_location": cc.sensor_location,
"notes": cc.notes,
}
def _serialise_device_info(info: DeviceInfo) -> dict:
return {
"serial": info.serial,
"firmware_version": info.firmware_version,
"firmware_minor": info.firmware_minor,
"dsp_version": info.dsp_version,
"manufacturer": info.manufacturer,
"model": info.model,
"event_count_sub08": info.event_count, # unreliable — SUB 08 always returns 1
"compliance_config": _serialise_compliance_config(info.compliance_config),
}
def _serialise_event(ev: Event, debug: bool = False) -> dict:
d: dict = {
"index": ev.index,
"timestamp": _serialise_timestamp(ev.timestamp),
"sample_rate": ev.sample_rate,
"record_type": ev.record_type,
"peak_values": _serialise_peak_values(ev.peak_values),
"project_info": _serialise_project_info(ev.project_info),
}
if debug:
raw = getattr(ev, "_raw_record", None)
d["raw_record_hex"] = raw.hex() if raw else None
d["raw_record_len"] = len(raw) if raw else 0
return d
# ── Transport factory ─────────────────────────────────────────────────────────
def _build_client(
port: Optional[str],
baud: int,
host: Optional[str],
tcp_port: int,
timeout: float = 30.0,
) -> MiniMateClient:
"""
Return a MiniMateClient configured for either serial or TCP transport.
TCP takes priority if *host* is supplied; otherwise *port* (serial) is used.
Raises HTTPException(422) if neither is provided.
Use timeout=120.0 (or higher) for endpoints that perform a full 5A waveform
download — a 70-second event at 1024 sps takes 2-3 minutes to transfer over
cellular and each individual recv must complete within the timeout window.
"""
if host:
transport = TcpTransport(host, port=tcp_port)
log.debug("TCP transport: %s:%d timeout=%.0fs", host, tcp_port, timeout)
return MiniMateClient(transport=transport, timeout=timeout)
elif port:
log.debug("Serial transport: %s baud=%d", port, baud)
return MiniMateClient(port, baud)
else:
raise HTTPException(
status_code=422,
detail=(
"Specify either 'port' (serial, e.g. ?port=COM5) "
"or 'host' (TCP, e.g. ?host=192.168.1.50&tcp_port=12345)"
),
)
def _is_tcp(host: Optional[str]) -> bool:
return bool(host)
def _run_with_retry(fn, *, is_tcp: bool):
"""
Call fn() and, for TCP connections only, retry once on ProtocolError.
Rationale: when a MiniMate Plus is cold (just had its serial lines asserted
by the modem or a local bridge), it takes 5-10 seconds to boot before it
will respond to POLL_PROBE. The first request may time out during that boot
window; a single automatic retry is enough to recover once the unit is up.
Serial connections are NOT retried — a timeout there usually means a real
problem (wrong port, wrong baud, cable unplugged).
"""
try:
return fn()
except ProtocolError as exc:
if not is_tcp:
raise
log.info("TCP poll timed out (unit may have been cold) — retrying once")
return fn() # let any second failure propagate normally
# ── Endpoints ──────────────────────────────────────────────────────────────────
@app.get("/health")
def health() -> dict:
"""Service heartbeat. No device I/O."""
return {"status": "ok", "service": "sfm", "version": "0.1.0"}
@app.get("/", response_class=FileResponse)
def webapp():
"""Serve the SFM web app."""
return str(Path(__file__).parent / "sfm_webapp.html")
@app.get("/waveform", response_class=FileResponse)
def waveform_viewer():
"""Serve the standalone waveform viewer."""
return str(Path(__file__).parent / "waveform_viewer.html")
@app.get("/device/info")
def device_info(
port: Optional[str] = Query(None, description="Serial port (e.g. COM5, /dev/ttyUSB0)"),
baud: int = Query(38400, description="Serial baud rate (default 38400)"),
host: Optional[str] = Query(None, description="TCP host — modem IP or ACH relay (e.g. 203.0.113.5)"),
tcp_port: int = Query(DEFAULT_TCP_PORT, description=f"TCP port (default {DEFAULT_TCP_PORT})"),
force: bool = Query(False, description="Bypass cache and re-read from device"),
) -> dict:
"""
Connect to the device, perform the POLL startup handshake, and return
identity information (serial number, firmware version, model).
Supply either *port* (serial) or *host* (TCP/modem).
Equivalent to POST /device/connect — provided as GET for convenience.
Response is cached until a POST /device/config write invalidates it.
Pass ?force=true to bypass the cache.
"""
conn_key = _live_cache.make_conn_key(host, tcp_port, port, baud)
log.info("GET /device/info port=%s host=%s tcp_port=%d force=%s", port, host, tcp_port, force)
if not force:
cached = _live_cache.get_device_info(conn_key)
if cached is not None:
log.debug("device_info cache hit for %s", conn_key)
return cached
try:
def _do():
with _build_client(port, baud, host, tcp_port) as client:
info = client.connect()
# SUB 08 event_count is unreliable (always returns 1 regardless of
# actual storage). Count via 1E/1F chain instead.
info.event_count = client.count_events()
return info
info = _run_with_retry(_do, is_tcp=_is_tcp(host))
except HTTPException:
raise
except ProtocolError as exc:
raise HTTPException(status_code=502, detail=f"Protocol error: {exc}") from exc
except OSError as exc:
raise HTTPException(status_code=502, detail=f"Connection error: {exc}") from exc
except Exception as exc:
raise HTTPException(status_code=500, detail=f"Device error: {exc}") from exc
result = _serialise_device_info(info)
_live_cache.set_device_info(conn_key, result)
return result
@app.post("/device/connect")
def device_connect(
port: Optional[str] = Query(None, description="Serial port (e.g. COM5)"),
baud: int = Query(38400, description="Serial baud rate"),
host: Optional[str] = Query(None, description="TCP host — modem IP or ACH relay"),
tcp_port: int = Query(DEFAULT_TCP_PORT, description=f"TCP port (default {DEFAULT_TCP_PORT})"),
) -> dict:
"""
Connect to the device and return identity. POST variant for terra-view
compatibility with the SLMM proxy pattern.
"""
return device_info(port=port, baud=baud, host=host, tcp_port=tcp_port)
@app.get("/device/events")
def device_events(
port: Optional[str] = Query(None, description="Serial port (e.g. COM5)"),
baud: int = Query(38400, description="Serial baud rate"),
host: Optional[str] = Query(None, description="TCP host — modem IP or ACH relay"),
tcp_port: int = Query(DEFAULT_TCP_PORT, description=f"TCP port (default {DEFAULT_TCP_PORT})"),
debug: bool = Query(False, description="Include raw record hex for field-layout inspection"),
force: bool = Query(False, description="Bypass cache and re-download from device"),
) -> dict:
"""
Connect to the device, read the event index, and download all stored
events (event headers + full waveform records with peak values).
Supply either *port* (serial) or *host* (TCP/modem).
**Caching:** a quick count_events() probe (~2s) is performed first. If the
device's event count matches the cached count, the cached response is returned
immediately without a full download. Pass ?force=true to skip this and always
re-download.
Pass debug=true to include raw_record_hex in each event — useful for
verifying field offsets against the protocol reference.
This does NOT download raw ADC waveform samples — those are large and
fetched separately via GET /device/event/{idx}/waveform.
"""
conn_key = _live_cache.make_conn_key(host, tcp_port, port, baud)
log.info("GET /device/events port=%s host=%s debug=%s force=%s", port, host, debug, force)
# ── Cache fast path ───────────────────────────────────────────────────────
# Do a quick poll + count_events() probe (~2s over cellular) to check if the
# device has any new events. If the count matches the cache, return early.
if not force and not debug:
try:
def _count():
with _build_client(port, baud, host, tcp_port) as client:
try:
client.poll()
except Exception:
client.poll()
return client.count_events()
device_count = _run_with_retry(_count, is_tcp=_is_tcp(host))
cached_events = _live_cache.get_events(conn_key, device_count)
if cached_events is not None:
log.info(" events cache hit (%d events, count=%d)", len(cached_events), device_count)
# Also serve cached device info if available
cached_info = _live_cache.get_device_info(conn_key)
return {
"device": cached_info or {},
"event_count": len(cached_events),
"events": cached_events,
"cached": True,
}
except Exception as exc:
log.warning(" count probe failed (%s) — falling through to full download", exc)
# ── Full download ─────────────────────────────────────────────────────────
try:
def _do():
with _build_client(port, baud, host, tcp_port) as client:
return client.connect(), client.get_events(debug=debug)
info, events = _run_with_retry(_do, is_tcp=_is_tcp(host))
except HTTPException:
raise
except ProtocolError as exc:
raise HTTPException(status_code=502, detail=f"Protocol error: {exc}") from exc
except OSError as exc:
raise HTTPException(status_code=502, detail=f"Connection error: {exc}") from exc
except Exception as exc:
raise HTTPException(status_code=500, detail=f"Device error: {exc}") from exc
# Fill sample_rate from compliance config where the event record doesn't supply it.
if info.compliance_config and info.compliance_config.sample_rate:
for ev in events:
if ev.sample_rate is None:
ev.sample_rate = info.compliance_config.sample_rate
# Backfill event.project_info fields that the 210-byte waveform record doesn't carry.
if info.compliance_config:
cc = info.compliance_config
for ev in events:
if ev.project_info is None:
ev.project_info = ProjectInfo()
pi = ev.project_info
if pi.client is None: pi.client = cc.client
if pi.operator is None: pi.operator = cc.operator
if pi.sensor_location is None: pi.sensor_location = cc.sensor_location
if pi.notes is None: pi.notes = cc.notes
serialised_info = _serialise_device_info(info)
serialised_events = [_serialise_event(ev, debug=debug) for ev in events]
# Update cache (skip if debug=True — raw hex blobs shouldn't pollute the cache)
if not debug:
_live_cache.set_device_info(conn_key, serialised_info)
_live_cache.set_events(conn_key, len(events), serialised_events)
return {
"device": serialised_info,
"event_count": len(events),
"events": serialised_events,
"cached": False,
}
@app.get("/device/event/{index}")
def device_event(
index: int,
port: Optional[str] = Query(None, description="Serial port (e.g. COM5)"),
baud: int = Query(38400, description="Serial baud rate"),
host: Optional[str] = Query(None, description="TCP host — modem IP or ACH relay"),
tcp_port: int = Query(DEFAULT_TCP_PORT, description=f"TCP port (default {DEFAULT_TCP_PORT})"),
) -> dict:
"""
Download a single event by index (0-based).
Supply either *port* (serial) or *host* (TCP/modem).
Performs: POLL startup → event index → event header → waveform record.
"""
log.info("GET /device/event/%d port=%s host=%s", index, port, host)
try:
def _do():
with _build_client(port, baud, host, tcp_port) as client:
client.connect()
return client.get_events(stop_after_index=index)
events = _run_with_retry(_do, is_tcp=_is_tcp(host))
except HTTPException:
raise
except ProtocolError as exc:
raise HTTPException(status_code=502, detail=f"Protocol error: {exc}") from exc
except OSError as exc:
raise HTTPException(status_code=502, detail=f"Connection error: {exc}") from exc
except Exception as exc:
raise HTTPException(status_code=500, detail=f"Device error: {exc}") from exc
matching = [ev for ev in events if ev.index == index]
if not matching:
raise HTTPException(
status_code=404,
detail=f"Event index {index} not found on device",
)
return _serialise_event(matching[0])
@app.get("/device/event/{index}/waveform")
def device_event_waveform(
index: int,
port: Optional[str] = Query(None, description="Serial port (e.g. COM5)"),
baud: int = Query(38400, description="Serial baud rate"),
host: Optional[str] = Query(None, description="TCP host — modem IP or ACH relay"),
tcp_port: int = Query(DEFAULT_TCP_PORT, description=f"TCP port (default {DEFAULT_TCP_PORT})"),
force: bool = Query(False, description="Bypass cache and re-download waveform"),
) -> dict:
"""
Download the full raw ADC waveform for a single event (0-based index).
Supply either *port* (serial) or *host* (TCP/modem).
Performs: POLL startup → get_events() (to locate the 4-byte waveform key) →
download_waveform() (full SUB 5A stream, stop_after_metadata=False).
Response includes:
- **total_samples**: expected sample-sets from the STRT record
- **pretrig_samples**: pre-trigger sample count
- **rectime_seconds**: record duration
- **samples_decoded**: actual sample-sets decoded (may be less than total_samples
if the device is not storing all frames yet, or the capture was partial)
- **sample_rate**: samples per second (from compliance config)
- **channels**: dict of channel name → list of signed int16 ADC counts
(keys: "Tran", "Vert", "Long", "Mic")
Waveforms are immutable once recorded and are cached permanently per
(connection, event index). Pass ?force=true to re-download.
"""
conn_key = _live_cache.make_conn_key(host, tcp_port, port, baud)
log.info("GET /device/event/%d/waveform port=%s host=%s force=%s", index, port, host, force)
if not force:
cached_waveform = _live_cache.get_waveform(conn_key, index)
if cached_waveform is not None:
log.debug("waveform cache hit: %s event %d", conn_key, index)
return cached_waveform
try:
def _do():
with _build_client(port, baud, host, tcp_port, timeout=120.0) as client:
info = client.connect()
# stop_after_index avoids downloading events beyond the one requested.
events = client.get_events(full_waveform=True, stop_after_index=index)
matching = [ev for ev in events if ev.index == index]
return matching[0] if matching else None, info
ev, info = _run_with_retry(_do, is_tcp=_is_tcp(host))
except HTTPException:
raise
except ProtocolError as exc:
raise HTTPException(status_code=502, detail=f"Protocol error: {exc}") from exc
except OSError as exc:
raise HTTPException(status_code=502, detail=f"Connection error: {exc}") from exc
except Exception as exc:
raise HTTPException(status_code=500, detail=f"Device error: {exc}") from exc
if ev is None:
raise HTTPException(
status_code=404,
detail=f"Event index {index} not found on device",
)
raw = getattr(ev, "raw_samples", None) or {}
samples_decoded = len(raw.get("Tran", []))
# Resolve sample_rate from compliance config if not on the event itself
sample_rate = ev.sample_rate
if sample_rate is None and info.compliance_config:
sample_rate = info.compliance_config.sample_rate
# Recompute rectime_seconds using the actual sample rate now that we have it.
# _decode_a5_waveform used 1024 sps as default; override if device says otherwise.
# strt[18] is a record-mode byte (0x46 / 0x0E), NOT rectime in seconds.
rectime_seconds = ev.rectime_seconds
if (ev.total_samples is not None and ev.pretrig_samples is not None
and sample_rate and sample_rate > 0):
post_trig = max(0, ev.total_samples - ev.pretrig_samples)
rectime_seconds = round(post_trig / sample_rate, 2)
result = {
"index": ev.index,
"record_type": ev.record_type,
"timestamp": _serialise_timestamp(ev.timestamp),
"total_samples": ev.total_samples,
"pretrig_samples": ev.pretrig_samples,
"rectime_seconds": rectime_seconds,
"samples_decoded": samples_decoded,
"sample_rate": sample_rate,
"peak_values": _serialise_peak_values(ev.peak_values),
"channels": raw,
}
_live_cache.set_waveform(conn_key, index, result)
return result
# ── Write endpoints ───────────────────────────────────────────────────────────
class DeviceConfigBody(BaseModel):
"""
Request body for POST /device/config.
All fields are optional — only supplied (non-null) fields are written to
the device. All other config bytes are round-tripped verbatim.
Recording parameters
--------------------
sample_rate : Samples per second. Valid values: 1024, 2048, 4096.
record_time : Record duration in seconds (e.g. 1.0, 2.0, 3.0).
Trigger / alarm thresholds (geo channels, in/s)
------------------------------------------------
trigger_level_geo : Trigger threshold in in/s (e.g. 0.5).
alarm_level_geo : Alarm threshold in in/s (e.g. 1.0).
max_range_geo : Full-scale calibration constant (e.g. 6.206).
Rarely changed — only set if you know what you're doing.
Project / operator strings (max 41 ASCII characters each)
----------------------------
project : Project description.
client_name : Client / company name.
operator : Operator / technician name.
seis_loc : Sensor location description.
notes : Extended notes.
"""
# Recording parameters
sample_rate: Optional[int] = None
record_time: Optional[float] = None
# Threshold parameters
trigger_level_geo: Optional[float] = None
alarm_level_geo: Optional[float] = None
max_range_geo: Optional[float] = None
# Project / operator strings
project: Optional[str] = None
client_name: Optional[str] = None
operator: Optional[str] = None
seis_loc: Optional[str] = None
notes: Optional[str] = None
@app.post("/device/config")
def device_config(
body: DeviceConfigBody,
port: Optional[str] = Query(None, description="Serial port (e.g. COM5)"),
baud: int = Query(38400, description="Serial baud rate"),
host: Optional[str] = Query(None, description="TCP host — modem IP or ACH relay"),
tcp_port: int = Query(DEFAULT_TCP_PORT, description=f"TCP port (default {DEFAULT_TCP_PORT})"),
) -> dict:
"""
Read the current device config, apply any supplied changes to the compliance
block, and write the full config back.
Only non-null fields in the JSON body are modified. All other config bytes
are round-tripped verbatim from the device.
Supply either *port* (serial) or *host* (TCP/modem).
Example body (all fields optional — include only what you want to change):
{
"sample_rate": 1024,
"record_time": 3.0,
"trigger_level_geo": 0.5,
"alarm_level_geo": 1.0,
"project": "Bridge Inspection 2026",
"client_name": "City of Portland",
"operator": "Brian Harrison",
"seis_loc": "South Abutment",
"notes": "Pre-blast baseline"
}
Returns:
{"status": "ok", "updated_fields": {...}} on success.
Raises:
502 on protocol errors (timeout, bad ack, etc.).
422 if neither port nor host is provided.
"""
changed = body.model_dump(exclude_none=True)
conn_key = _live_cache.make_conn_key(host, tcp_port, port, baud)
log.info("POST /device/config port=%s host=%s fields=%s", port, host, list(changed.keys()))
try:
def _do():
with _build_client(port, baud, host, tcp_port) as client:
client.connect()
client.apply_config(
sample_rate=body.sample_rate,
record_time=body.record_time,
trigger_level_geo=body.trigger_level_geo,
alarm_level_geo=body.alarm_level_geo,
max_range_geo=body.max_range_geo,
project=body.project,
client_name=body.client_name,
operator=body.operator,
seis_loc=body.seis_loc,
notes=body.notes,
)
_run_with_retry(_do, is_tcp=_is_tcp(host))
except HTTPException:
raise
except ProtocolError as exc:
raise HTTPException(status_code=502, detail=f"Protocol error: {exc}") from exc
except OSError as exc:
raise HTTPException(status_code=502, detail=f"Connection error: {exc}") from exc
except ValueError as exc:
raise HTTPException(status_code=422, detail=str(exc)) from exc
except Exception as exc:
raise HTTPException(status_code=500, detail=f"Device error: {exc}") from exc
# Config was written — invalidate cached device info and events so the next
# /device/info or /device/events call re-reads fresh data from the device.
_live_cache.mark_config_dirty(conn_key)
return {
"status": "ok",
"updated_fields": changed,
}
# Keep the old endpoint alive under its old URL for anything already calling it
@app.post("/device/config/project")
def device_config_project(
body: DeviceConfigBody,
port: Optional[str] = Query(None, description="Serial port (e.g. COM5)"),
baud: int = Query(38400, description="Serial baud rate"),
host: Optional[str] = Query(None, description="TCP host — modem IP or ACH relay"),
tcp_port: int = Query(DEFAULT_TCP_PORT, description=f"TCP port (default {DEFAULT_TCP_PORT})"),
) -> dict:
"""Deprecated alias for POST /device/config — use that instead."""
return device_config(body=body, port=port, baud=baud, host=host, tcp_port=tcp_port)
# ── Monitoring endpoints ───────────────────────────────────────────────────────
@app.get("/device/monitor/status")
def device_monitor_status(
port: Optional[str] = Query(None, description="Serial port (e.g. COM5)"),
baud: int = Query(38400, description="Serial baud rate"),
host: Optional[str] = Query(None, description="TCP host — modem IP or ACH relay"),
tcp_port: int = Query(DEFAULT_TCP_PORT, description=f"TCP port (default {DEFAULT_TCP_PORT})"),
force: bool = Query(False, description="Bypass cache and re-read from device"),
) -> dict:
"""
Read monitoring status from the device.
Uses poll() (POLL handshake only — no config/compliance reads) so the
request completes in ~2 seconds instead of ~15. The full connect() was
causing false "idle" readings because the compliance+event sequence was
interacting with the device state before the 0x1C read.
Returns is_monitoring bool, battery voltage, and memory usage (total + free
bytes). Battery and memory are only present when the unit is idle.
**Caching:** response is cached for 30 seconds. Pass ?force=true to bypass.
"""
conn_key = _live_cache.make_conn_key(host, tcp_port, port, baud)
if not force:
cached = _live_cache.get_monitor_status(conn_key)
if cached is not None:
log.debug("monitor_status cache hit for %s", conn_key)
return cached
with _build_client(port=port, baud=baud, host=host, tcp_port=tcp_port) as client:
try:
client.poll()
except Exception as exc:
log.warning("monitor status poll retry: %s", exc)
client.poll()
status = client.get_monitor_status()
result: dict = {"is_monitoring": status.is_monitoring}
if status.battery_v is not None:
result["battery_v"] = round(status.battery_v, 2)
if status.memory_total is not None:
result["memory_total_bytes"] = status.memory_total
result["memory_total_kb"] = round(status.memory_total / 1024, 1)
if status.memory_free is not None:
result["memory_free_bytes"] = status.memory_free
result["memory_free_kb"] = round(status.memory_free / 1024, 1)
_live_cache.set_monitor_status(conn_key, result)
return result
@app.post("/device/monitor/start")
def device_monitor_start(
port: Optional[str] = Query(None, description="Serial port (e.g. COM5)"),
baud: int = Query(38400, description="Serial baud rate"),
host: Optional[str] = Query(None, description="TCP host — modem IP or ACH relay"),
tcp_port: int = Query(DEFAULT_TCP_PORT, description=f"TCP port (default {DEFAULT_TCP_PORT})"),
) -> dict:
"""
Command the device to start monitoring (recording triggered events).
Sends SUB 0x96 and waits for ack SUB 0x69.
"""
conn_key = _live_cache.make_conn_key(host, tcp_port, port, baud)
with _build_client(port=port, baud=baud, host=host, tcp_port=tcp_port) as client:
try:
client.poll()
except Exception as exc:
log.warning("start monitoring poll retry: %s", exc)
client.poll()
client.start_monitoring()
_live_cache.invalidate_monitor_status(conn_key)
return {"status": "started"}
@app.post("/device/monitor/stop")
def device_monitor_stop(
port: Optional[str] = Query(None, description="Serial port (e.g. COM5)"),
baud: int = Query(38400, description="Serial baud rate"),
host: Optional[str] = Query(None, description="TCP host — modem IP or ACH relay"),
tcp_port: int = Query(DEFAULT_TCP_PORT, description=f"TCP port (default {DEFAULT_TCP_PORT})"),
) -> dict:
"""
Command the device to stop monitoring.
Sends SUB 0x97 and waits for ack SUB 0x68.
"""
conn_key = _live_cache.make_conn_key(host, tcp_port, port, baud)
with _build_client(port=port, baud=baud, host=host, tcp_port=tcp_port) as client:
try:
client.poll()
except Exception as exc:
log.warning("stop monitoring poll retry: %s", exc)
client.poll()
client.stop_monitoring()
_live_cache.invalidate_monitor_status(conn_key)
return {"status": "stopped"}
# ── DB read endpoints ─────────────────────────────────────────────────────────
#
# These endpoints expose the seismo-relay SQLite DB written by ach_server.py.
# All queries are read-only. Terra-view calls these to build project event
# views, unit history panels, and (eventually) vibration summary reports.
@app.get("/db/units")
def db_units() -> list[dict]:
"""
Return one row per known serial with summary stats:
last_seen, total_events, total_monitor_entries, total_sessions.
"""
return _get_db().query_units()
@app.get("/db/events")
def db_events(
serial: Optional[str] = Query(None, description="Filter by unit serial (e.g. BE11529)"),
from_dt: Optional[str] = Query(None, description="ISO-8601 start datetime (inclusive)"),
to_dt: Optional[str] = Query(None, description="ISO-8601 end datetime (inclusive)"),
false_trigger: Optional[bool] = Query(None, description="Filter by false_trigger flag"),
limit: int = Query(500, description="Max rows to return (default 500)"),
offset: int = Query(0, description="Pagination offset"),
) -> dict:
"""
Query triggered events from the DB.
Returns events newest-first. All filter params are optional.
Example:
GET /db/events?serial=BE11529&from_dt=2026-04-01&limit=100
"""
from_parsed = datetime.datetime.fromisoformat(from_dt) if from_dt else None
to_parsed = datetime.datetime.fromisoformat(to_dt) if to_dt else None
rows = _get_db().query_events(
serial=serial,
from_dt=from_parsed,
to_dt=to_parsed,
false_trigger=false_trigger,
limit=limit,
offset=offset,
)
return {"count": len(rows), "events": rows}
@app.patch("/db/events/{event_id}/false_trigger")
def db_set_false_trigger(
event_id: str,
value: bool = Query(..., description="True to flag as false trigger, False to clear"),
) -> dict:
"""
Set or clear the false_trigger flag on a single event.
Used by the terra-view event review UI.
Returns 404 if the event_id is not found.
"""
found = _get_db().set_false_trigger(event_id, value)
if not found:
raise HTTPException(status_code=404, detail=f"Event {event_id} not found")
return {"status": "ok", "event_id": event_id, "false_trigger": value}
@app.get("/db/events/{event_id}/waveform")
def db_event_waveform(event_id: str) -> dict:
"""
Return the stored waveform blob for a DB event.
The response shape is identical to GET /device/event/{index}/waveform so the
waveform viewer can consume either source without modification:
- total_samples, pretrig_samples, rectime_seconds, samples_decoded
- sample_rate
- peak_values (tran, vert, long, micl_psi, peak_vector_sum)
- channels ({"Tran": [...], "Vert": [...], "Long": [...], "Mic": [...]})
Returns 404 if the event doesn't exist, 422 if the event exists but has no
stored waveform (downloaded before waveform storage was implemented).
"""
import json as _json
db = _get_db()
found, blob_str = db.get_event_waveform(event_id)
if not found:
raise HTTPException(status_code=404, detail=f"Event {event_id} not found")
if blob_str is None:
raise HTTPException(
status_code=422,
detail=(
f"Event {event_id} has no stored waveform. "
"Waveform storage requires ACH server v0.11+. "
"Re-download the event from the device to backfill."
),
)
try:
return _json.loads(blob_str)
except Exception as exc:
raise HTTPException(status_code=500, detail=f"Waveform blob corrupt: {exc}") from exc
@app.get("/db/monitor_log")
def db_monitor_log(
serial: Optional[str] = Query(None, description="Filter by unit serial"),
from_dt: Optional[str] = Query(None, description="ISO-8601 start datetime (inclusive)"),
to_dt: Optional[str] = Query(None, description="ISO-8601 end datetime (inclusive)"),
limit: int = Query(500, description="Max rows to return"),
offset: int = Query(0, description="Pagination offset"),
) -> dict:
"""
Query monitor log entries (continuous monitoring intervals) from the DB.
Returns entries newest-first.
"""
from_parsed = datetime.datetime.fromisoformat(from_dt) if from_dt else None
to_parsed = datetime.datetime.fromisoformat(to_dt) if to_dt else None
rows = _get_db().query_monitor_log(
serial=serial,
from_dt=from_parsed,
to_dt=to_parsed,
limit=limit,
offset=offset,
)
return {"count": len(rows), "entries": rows}
@app.get("/db/sessions")
def db_sessions(
serial: Optional[str] = Query(None, description="Filter by unit serial"),
limit: int = Query(50, description="Max rows to return"),
) -> dict:
"""
Query ACH call-home sessions from the DB, newest first.
"""
rows = _get_db().get_sessions(serial=serial, limit=limit)
return {"count": len(rows), "sessions": rows}
# ── Entry point ────────────────────────────────────────────────────────────────
if __name__ == "__main__":
import argparse
ap = argparse.ArgumentParser(description="SFM — Seismograph Field Module API server")
ap.add_argument("--host", default="0.0.0.0", help="Bind address (default: 0.0.0.0)")
ap.add_argument("--port", type=int, default=8200, help="Port (default: 8200)")
ap.add_argument("--reload", action="store_true", help="Enable auto-reload (dev mode)")
args = ap.parse_args()
log.info("Starting SFM server on %s:%d", args.host, args.port)
uvicorn.run(
"sfm.server:app",
host=args.host,
port=args.port,
reload=args.reload,
)