Files
seismo-relay/sfm/server.py
T
claude 03d224ccc3 v0.11.0 — SQLite persistence layer (SeismoDb)
sfm/database.py (new)
- SeismoDb class: three tables keyed by unit serial number
  - ach_sessions: one row per ACH call-home
  - events: one row per triggered event, deduped by (serial, waveform_key)
  - monitor_log: one row per monitoring interval, deduped by (serial, waveform_key)
- WAL mode, per-request connections, silent dedup via UNIQUE constraint
- Query helpers: query_events(), query_monitor_log(), get_sessions(), query_units()
- false_trigger flag on events for future review UI / report filtering

bridges/ach_server.py
- Import SeismoDb; create shared instance at startup pointed at
  bridges/captures/seismo_relay.db
- After each call-home: insert_events() + insert_monitor_log() + insert_ach_session()
- DB failures logged as warnings, never abort the session

sfm/server.py
- Import SeismoDb; lazy singleton via _get_db()
- New DB read endpoints: GET /db/units, /db/events, /db/monitor_log, /db/sessions
- PATCH /db/events/{id}/false_trigger for manual review flagging

CLAUDE.md / CHANGELOG.md
- Document DB schema, SFM DB endpoints, architecture decision (unit-keyed only)
- Version bump to v0.11.0

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
2026-04-13 00:45:38 -04:00

838 lines
33 KiB
Python

"""
sfm/server.py — Seismograph Field Module REST API
Wraps the minimateplus library in a small FastAPI service.
Terra-view proxies /api/sfm/* to this service (same pattern as SLMM at :8100).
Default port: 8200
Endpoints
---------
GET /health Service heartbeat — no device I/O
GET /device/info POLL + serial number + full config read
GET /device/events Download all stored events (headers + peak values)
POST /device/connect Explicit connect/identify (same as /device/info)
GET /device/event/{idx} Single event by index (header + waveform record)
Transport query params (supply one set):
Serial (direct RS-232 cable):
port — serial port name (e.g. COM5, /dev/ttyUSB0)
baud — baud rate (default 38400)
TCP (modem / ACH Auto Call Home):
host — IP address or hostname of the modem or ACH relay
tcp_port — TCP port number (default 12345, Blastware default)
Each call opens the connection, does its work, then closes it.
(Stateless / reconnect-per-call, matching Blastware's observed behaviour.)
Run with:
python -m uvicorn sfm.server:app --host 0.0.0.0 --port 8200 --reload
or:
python sfm/server.py
"""
from __future__ import annotations
import datetime
import logging
import sys
from pathlib import Path
from typing import Optional
# FastAPI / Pydantic
try:
from fastapi import Body, FastAPI, HTTPException, Query
from fastapi.middleware.cors import CORSMiddleware
from fastapi.responses import FileResponse, JSONResponse
from pydantic import BaseModel
import uvicorn
except ImportError:
print(
"fastapi and uvicorn are required for the SFM server.\n"
"Install them with: pip install fastapi uvicorn",
file=sys.stderr,
)
sys.exit(1)
from minimateplus import MiniMateClient
from minimateplus.protocol import ProtocolError
from minimateplus.models import ComplianceConfig, DeviceInfo, Event, PeakValues, ProjectInfo, Timestamp
from minimateplus.transport import TcpTransport, DEFAULT_TCP_PORT
from sfm.database import SeismoDb
logging.basicConfig(
level=logging.INFO,
format="%(asctime)s %(levelname)-7s %(name)s %(message)s",
datefmt="%H:%M:%S",
)
log = logging.getLogger("sfm.server")
# ── FastAPI app ────────────────────────────────────────────────────────────────
app = FastAPI(
title="Seismograph Field Module (SFM)",
description=(
"REST API for Instantel MiniMate Plus seismographs.\n"
"Implements the minimateplus RS-232 protocol library.\n"
"Proxied by terra-view at /api/sfm/*."
),
version="0.1.0",
)
# Allow requests from the waveform viewer opened as a local file (file://)
# and from any dev server or terra-view proxy.
app.add_middleware(
CORSMiddleware,
allow_origins=["*"],
allow_methods=["GET", "POST"],
allow_headers=["*"],
)
# ── DB ────────────────────────────────────────────────────────────────────────
# Shared SeismoDb instance. Path can be overridden by --db-path at startup,
# or defaults to bridges/captures/seismo_relay.db relative to the repo root.
_DEFAULT_DB_PATH = Path(__file__).parent.parent / "bridges" / "captures" / "seismo_relay.db"
_db: Optional[SeismoDb] = None
def _get_db() -> SeismoDb:
global _db
if _db is None:
_db = SeismoDb(_DEFAULT_DB_PATH)
return _db
# ── Serialisers ────────────────────────────────────────────────────────────────
# Plain dict helpers — avoids a Pydantic dependency in the library layer.
def _serialise_timestamp(ts: Optional[Timestamp]) -> Optional[dict]:
if ts is None:
return None
return {
"year": ts.year,
"month": ts.month,
"day": ts.day,
"hour": ts.hour,
"minute": ts.minute,
"second": ts.second,
"clock_set": ts.clock_set,
"display": str(ts),
}
def _serialise_peak_values(pv: Optional[PeakValues]) -> Optional[dict]:
if pv is None:
return None
return {
"tran_in_s": pv.tran,
"vert_in_s": pv.vert,
"long_in_s": pv.long,
"micl_psi": pv.micl,
"peak_vector_sum": pv.peak_vector_sum,
}
def _serialise_project_info(pi: Optional[ProjectInfo]) -> Optional[dict]:
if pi is None:
return None
return {
"setup_name": pi.setup_name,
"project": pi.project,
"client": pi.client,
"operator": pi.operator,
"sensor_location": pi.sensor_location,
"notes": pi.notes,
}
def _serialise_compliance_config(cc: Optional["ComplianceConfig"]) -> Optional[dict]:
if cc is None:
return None
return {
"record_time": cc.record_time,
"sample_rate": cc.sample_rate,
"trigger_level_geo": cc.trigger_level_geo,
"alarm_level_geo": cc.alarm_level_geo,
"max_range_geo": cc.max_range_geo,
"setup_name": cc.setup_name,
"project": cc.project,
"client": cc.client,
"operator": cc.operator,
"sensor_location": cc.sensor_location,
"notes": cc.notes,
}
def _serialise_device_info(info: DeviceInfo) -> dict:
return {
"serial": info.serial,
"firmware_version": info.firmware_version,
"firmware_minor": info.firmware_minor,
"dsp_version": info.dsp_version,
"manufacturer": info.manufacturer,
"model": info.model,
"event_count_sub08": info.event_count, # unreliable — SUB 08 always returns 1
"compliance_config": _serialise_compliance_config(info.compliance_config),
}
def _serialise_event(ev: Event, debug: bool = False) -> dict:
d: dict = {
"index": ev.index,
"timestamp": _serialise_timestamp(ev.timestamp),
"sample_rate": ev.sample_rate,
"record_type": ev.record_type,
"peak_values": _serialise_peak_values(ev.peak_values),
"project_info": _serialise_project_info(ev.project_info),
}
if debug:
raw = getattr(ev, "_raw_record", None)
d["raw_record_hex"] = raw.hex() if raw else None
d["raw_record_len"] = len(raw) if raw else 0
return d
# ── Transport factory ─────────────────────────────────────────────────────────
def _build_client(
port: Optional[str],
baud: int,
host: Optional[str],
tcp_port: int,
timeout: float = 30.0,
) -> MiniMateClient:
"""
Return a MiniMateClient configured for either serial or TCP transport.
TCP takes priority if *host* is supplied; otherwise *port* (serial) is used.
Raises HTTPException(422) if neither is provided.
Use timeout=120.0 (or higher) for endpoints that perform a full 5A waveform
download — a 70-second event at 1024 sps takes 2-3 minutes to transfer over
cellular and each individual recv must complete within the timeout window.
"""
if host:
transport = TcpTransport(host, port=tcp_port)
log.debug("TCP transport: %s:%d timeout=%.0fs", host, tcp_port, timeout)
return MiniMateClient(transport=transport, timeout=timeout)
elif port:
log.debug("Serial transport: %s baud=%d", port, baud)
return MiniMateClient(port, baud)
else:
raise HTTPException(
status_code=422,
detail=(
"Specify either 'port' (serial, e.g. ?port=COM5) "
"or 'host' (TCP, e.g. ?host=192.168.1.50&tcp_port=12345)"
),
)
def _is_tcp(host: Optional[str]) -> bool:
return bool(host)
def _run_with_retry(fn, *, is_tcp: bool):
"""
Call fn() and, for TCP connections only, retry once on ProtocolError.
Rationale: when a MiniMate Plus is cold (just had its serial lines asserted
by the modem or a local bridge), it takes 5-10 seconds to boot before it
will respond to POLL_PROBE. The first request may time out during that boot
window; a single automatic retry is enough to recover once the unit is up.
Serial connections are NOT retried — a timeout there usually means a real
problem (wrong port, wrong baud, cable unplugged).
"""
try:
return fn()
except ProtocolError as exc:
if not is_tcp:
raise
log.info("TCP poll timed out (unit may have been cold) — retrying once")
return fn() # let any second failure propagate normally
# ── Endpoints ──────────────────────────────────────────────────────────────────
@app.get("/health")
def health() -> dict:
"""Service heartbeat. No device I/O."""
return {"status": "ok", "service": "sfm", "version": "0.1.0"}
@app.get("/", response_class=FileResponse)
def webapp():
"""Serve the SFM web app."""
return str(Path(__file__).parent / "sfm_webapp.html")
@app.get("/device/info")
def device_info(
port: Optional[str] = Query(None, description="Serial port (e.g. COM5, /dev/ttyUSB0)"),
baud: int = Query(38400, description="Serial baud rate (default 38400)"),
host: Optional[str] = Query(None, description="TCP host — modem IP or ACH relay (e.g. 203.0.113.5)"),
tcp_port: int = Query(DEFAULT_TCP_PORT, description=f"TCP port (default {DEFAULT_TCP_PORT})"),
) -> dict:
"""
Connect to the device, perform the POLL startup handshake, and return
identity information (serial number, firmware version, model).
Supply either *port* (serial) or *host* (TCP/modem).
Equivalent to POST /device/connect — provided as GET for convenience.
"""
log.info("GET /device/info port=%s host=%s tcp_port=%d", port, host, tcp_port)
try:
def _do():
with _build_client(port, baud, host, tcp_port) as client:
info = client.connect()
# SUB 08 event_count is unreliable (always returns 1 regardless of
# actual storage). Count via 1E/1F chain instead.
info.event_count = client.count_events()
return info
info = _run_with_retry(_do, is_tcp=_is_tcp(host))
except HTTPException:
raise
except ProtocolError as exc:
raise HTTPException(status_code=502, detail=f"Protocol error: {exc}") from exc
except OSError as exc:
raise HTTPException(status_code=502, detail=f"Connection error: {exc}") from exc
except Exception as exc:
raise HTTPException(status_code=500, detail=f"Device error: {exc}") from exc
return _serialise_device_info(info)
@app.post("/device/connect")
def device_connect(
port: Optional[str] = Query(None, description="Serial port (e.g. COM5)"),
baud: int = Query(38400, description="Serial baud rate"),
host: Optional[str] = Query(None, description="TCP host — modem IP or ACH relay"),
tcp_port: int = Query(DEFAULT_TCP_PORT, description=f"TCP port (default {DEFAULT_TCP_PORT})"),
) -> dict:
"""
Connect to the device and return identity. POST variant for terra-view
compatibility with the SLMM proxy pattern.
"""
return device_info(port=port, baud=baud, host=host, tcp_port=tcp_port)
@app.get("/device/events")
def device_events(
port: Optional[str] = Query(None, description="Serial port (e.g. COM5)"),
baud: int = Query(38400, description="Serial baud rate"),
host: Optional[str] = Query(None, description="TCP host — modem IP or ACH relay"),
tcp_port: int = Query(DEFAULT_TCP_PORT, description=f"TCP port (default {DEFAULT_TCP_PORT})"),
debug: bool = Query(False, description="Include raw record hex for field-layout inspection"),
) -> dict:
"""
Connect to the device, read the event index, and download all stored
events (event headers + full waveform records with peak values).
Supply either *port* (serial) or *host* (TCP/modem).
Pass debug=true to include raw_record_hex in each event — useful for
verifying field offsets against the protocol reference.
This does NOT download raw ADC waveform samples — those are large and
fetched separately via GET /device/event/{idx}/waveform (future endpoint).
"""
log.info("GET /device/events port=%s host=%s debug=%s", port, host, debug)
try:
def _do():
with _build_client(port, baud, host, tcp_port) as client:
return client.connect(), client.get_events(debug=debug)
info, events = _run_with_retry(_do, is_tcp=_is_tcp(host))
except HTTPException:
raise
except ProtocolError as exc:
raise HTTPException(status_code=502, detail=f"Protocol error: {exc}") from exc
except OSError as exc:
raise HTTPException(status_code=502, detail=f"Connection error: {exc}") from exc
except Exception as exc:
raise HTTPException(status_code=500, detail=f"Device error: {exc}") from exc
# Fill sample_rate from compliance config where the event record doesn't supply it.
# sample_rate is a device-level setting, not stored per-event in the waveform record.
if info.compliance_config and info.compliance_config.sample_rate:
for ev in events:
if ev.sample_rate is None:
ev.sample_rate = info.compliance_config.sample_rate
# Backfill event.project_info fields that the 210-byte waveform record doesn't carry.
# The waveform record only stores "Project:" — client/operator/sensor_location/notes
# live in the SUB 1A compliance config, not in the per-event record.
if info.compliance_config:
cc = info.compliance_config
for ev in events:
if ev.project_info is None:
ev.project_info = ProjectInfo()
pi = ev.project_info
if pi.client is None: pi.client = cc.client
if pi.operator is None: pi.operator = cc.operator
if pi.sensor_location is None: pi.sensor_location = cc.sensor_location
if pi.notes is None: pi.notes = cc.notes
return {
"device": _serialise_device_info(info),
"event_count": len(events),
"events": [_serialise_event(ev, debug=debug) for ev in events],
}
@app.get("/device/event/{index}")
def device_event(
index: int,
port: Optional[str] = Query(None, description="Serial port (e.g. COM5)"),
baud: int = Query(38400, description="Serial baud rate"),
host: Optional[str] = Query(None, description="TCP host — modem IP or ACH relay"),
tcp_port: int = Query(DEFAULT_TCP_PORT, description=f"TCP port (default {DEFAULT_TCP_PORT})"),
) -> dict:
"""
Download a single event by index (0-based).
Supply either *port* (serial) or *host* (TCP/modem).
Performs: POLL startup → event index → event header → waveform record.
"""
log.info("GET /device/event/%d port=%s host=%s", index, port, host)
try:
def _do():
with _build_client(port, baud, host, tcp_port) as client:
client.connect()
return client.get_events(stop_after_index=index)
events = _run_with_retry(_do, is_tcp=_is_tcp(host))
except HTTPException:
raise
except ProtocolError as exc:
raise HTTPException(status_code=502, detail=f"Protocol error: {exc}") from exc
except OSError as exc:
raise HTTPException(status_code=502, detail=f"Connection error: {exc}") from exc
except Exception as exc:
raise HTTPException(status_code=500, detail=f"Device error: {exc}") from exc
matching = [ev for ev in events if ev.index == index]
if not matching:
raise HTTPException(
status_code=404,
detail=f"Event index {index} not found on device",
)
return _serialise_event(matching[0])
@app.get("/device/event/{index}/waveform")
def device_event_waveform(
index: int,
port: Optional[str] = Query(None, description="Serial port (e.g. COM5)"),
baud: int = Query(38400, description="Serial baud rate"),
host: Optional[str] = Query(None, description="TCP host — modem IP or ACH relay"),
tcp_port: int = Query(DEFAULT_TCP_PORT, description=f"TCP port (default {DEFAULT_TCP_PORT})"),
) -> dict:
"""
Download the full raw ADC waveform for a single event (0-based index).
Supply either *port* (serial) or *host* (TCP/modem).
Performs: POLL startup → get_events() (to locate the 4-byte waveform key) →
download_waveform() (full SUB 5A stream, stop_after_metadata=False).
Response includes:
- **total_samples**: expected sample-sets from the STRT record
- **pretrig_samples**: pre-trigger sample count
- **rectime_seconds**: record duration
- **samples_decoded**: actual sample-sets decoded (may be less than total_samples
if the device is not storing all frames yet, or the capture was partial)
- **sample_rate**: samples per second (from compliance config)
- **channels**: dict of channel name → list of signed int16 ADC counts
(keys: "Tran", "Vert", "Long", "Mic")
"""
log.info("GET /device/event/%d/waveform port=%s host=%s", index, port, host)
try:
def _do():
with _build_client(port, baud, host, tcp_port, timeout=120.0) as client:
info = client.connect()
# stop_after_index avoids downloading events beyond the one requested.
events = client.get_events(full_waveform=True, stop_after_index=index)
matching = [ev for ev in events if ev.index == index]
return matching[0] if matching else None, info
ev, info = _run_with_retry(_do, is_tcp=_is_tcp(host))
except HTTPException:
raise
except ProtocolError as exc:
raise HTTPException(status_code=502, detail=f"Protocol error: {exc}") from exc
except OSError as exc:
raise HTTPException(status_code=502, detail=f"Connection error: {exc}") from exc
except Exception as exc:
raise HTTPException(status_code=500, detail=f"Device error: {exc}") from exc
if ev is None:
raise HTTPException(
status_code=404,
detail=f"Event index {index} not found on device",
)
raw = getattr(ev, "raw_samples", None) or {}
samples_decoded = len(raw.get("Tran", []))
# Resolve sample_rate from compliance config if not on the event itself
sample_rate = ev.sample_rate
if sample_rate is None and info.compliance_config:
sample_rate = info.compliance_config.sample_rate
return {
"index": ev.index,
"record_type": ev.record_type,
"timestamp": _serialise_timestamp(ev.timestamp),
"total_samples": ev.total_samples,
"pretrig_samples": ev.pretrig_samples,
"rectime_seconds": ev.rectime_seconds,
"samples_decoded": samples_decoded,
"sample_rate": sample_rate,
"peak_values": _serialise_peak_values(ev.peak_values),
"channels": raw,
}
# ── Write endpoints ───────────────────────────────────────────────────────────
class DeviceConfigBody(BaseModel):
"""
Request body for POST /device/config.
All fields are optional — only supplied (non-null) fields are written to
the device. All other config bytes are round-tripped verbatim.
Recording parameters
--------------------
sample_rate : Samples per second. Valid values: 1024, 2048, 4096.
record_time : Record duration in seconds (e.g. 1.0, 2.0, 3.0).
Trigger / alarm thresholds (geo channels, in/s)
------------------------------------------------
trigger_level_geo : Trigger threshold in in/s (e.g. 0.5).
alarm_level_geo : Alarm threshold in in/s (e.g. 1.0).
max_range_geo : Full-scale calibration constant (e.g. 6.206).
Rarely changed — only set if you know what you're doing.
Project / operator strings (max 41 ASCII characters each)
----------------------------
project : Project description.
client_name : Client / company name.
operator : Operator / technician name.
seis_loc : Sensor location description.
notes : Extended notes.
"""
# Recording parameters
sample_rate: Optional[int] = None
record_time: Optional[float] = None
# Threshold parameters
trigger_level_geo: Optional[float] = None
alarm_level_geo: Optional[float] = None
max_range_geo: Optional[float] = None
# Project / operator strings
project: Optional[str] = None
client_name: Optional[str] = None
operator: Optional[str] = None
seis_loc: Optional[str] = None
notes: Optional[str] = None
@app.post("/device/config")
def device_config(
body: DeviceConfigBody,
port: Optional[str] = Query(None, description="Serial port (e.g. COM5)"),
baud: int = Query(38400, description="Serial baud rate"),
host: Optional[str] = Query(None, description="TCP host — modem IP or ACH relay"),
tcp_port: int = Query(DEFAULT_TCP_PORT, description=f"TCP port (default {DEFAULT_TCP_PORT})"),
) -> dict:
"""
Read the current device config, apply any supplied changes to the compliance
block, and write the full config back.
Only non-null fields in the JSON body are modified. All other config bytes
are round-tripped verbatim from the device.
Supply either *port* (serial) or *host* (TCP/modem).
Example body (all fields optional — include only what you want to change):
{
"sample_rate": 1024,
"record_time": 3.0,
"trigger_level_geo": 0.5,
"alarm_level_geo": 1.0,
"project": "Bridge Inspection 2026",
"client_name": "City of Portland",
"operator": "Brian Harrison",
"seis_loc": "South Abutment",
"notes": "Pre-blast baseline"
}
Returns:
{"status": "ok", "updated_fields": {...}} on success.
Raises:
502 on protocol errors (timeout, bad ack, etc.).
422 if neither port nor host is provided.
"""
changed = body.model_dump(exclude_none=True)
log.info("POST /device/config port=%s host=%s fields=%s", port, host, list(changed.keys()))
try:
def _do():
with _build_client(port, baud, host, tcp_port) as client:
client.connect()
client.apply_config(
sample_rate=body.sample_rate,
record_time=body.record_time,
trigger_level_geo=body.trigger_level_geo,
alarm_level_geo=body.alarm_level_geo,
max_range_geo=body.max_range_geo,
project=body.project,
client_name=body.client_name,
operator=body.operator,
seis_loc=body.seis_loc,
notes=body.notes,
)
_run_with_retry(_do, is_tcp=_is_tcp(host))
except HTTPException:
raise
except ProtocolError as exc:
raise HTTPException(status_code=502, detail=f"Protocol error: {exc}") from exc
except OSError as exc:
raise HTTPException(status_code=502, detail=f"Connection error: {exc}") from exc
except ValueError as exc:
raise HTTPException(status_code=422, detail=str(exc)) from exc
except Exception as exc:
raise HTTPException(status_code=500, detail=f"Device error: {exc}") from exc
return {
"status": "ok",
"updated_fields": changed,
}
# Keep the old endpoint alive under its old URL for anything already calling it
@app.post("/device/config/project")
def device_config_project(
body: DeviceConfigBody,
port: Optional[str] = Query(None, description="Serial port (e.g. COM5)"),
baud: int = Query(38400, description="Serial baud rate"),
host: Optional[str] = Query(None, description="TCP host — modem IP or ACH relay"),
tcp_port: int = Query(DEFAULT_TCP_PORT, description=f"TCP port (default {DEFAULT_TCP_PORT})"),
) -> dict:
"""Deprecated alias for POST /device/config — use that instead."""
return device_config(body=body, port=port, baud=baud, host=host, tcp_port=tcp_port)
# ── Monitoring endpoints ───────────────────────────────────────────────────────
@app.get("/device/monitor/status")
def device_monitor_status(
port: Optional[str] = Query(None, description="Serial port (e.g. COM5)"),
baud: int = Query(38400, description="Serial baud rate"),
host: Optional[str] = Query(None, description="TCP host — modem IP or ACH relay"),
tcp_port: int = Query(DEFAULT_TCP_PORT, description=f"TCP port (default {DEFAULT_TCP_PORT})"),
) -> dict:
"""
Read monitoring status from the device.
Uses poll() (POLL handshake only — no config/compliance reads) so the
request completes in ~2 seconds instead of ~15. The full connect() was
causing false "idle" readings because the compliance+event sequence was
interacting with the device state before the 0x1C read.
Returns is_monitoring bool, battery voltage, and memory usage (total + free
bytes). Battery and memory are only present when the unit is idle.
"""
with _build_client(port=port, baud=baud, host=host, tcp_port=tcp_port) as client:
try:
client.poll()
except Exception as exc:
log.warning("monitor status poll retry: %s", exc)
client.poll()
status = client.get_monitor_status()
result: dict = {"is_monitoring": status.is_monitoring}
if status.battery_v is not None:
result["battery_v"] = round(status.battery_v, 2)
if status.memory_total is not None:
result["memory_total_bytes"] = status.memory_total
result["memory_total_kb"] = round(status.memory_total / 1024, 1)
if status.memory_free is not None:
result["memory_free_bytes"] = status.memory_free
result["memory_free_kb"] = round(status.memory_free / 1024, 1)
return result
@app.post("/device/monitor/start")
def device_monitor_start(
port: Optional[str] = Query(None, description="Serial port (e.g. COM5)"),
baud: int = Query(38400, description="Serial baud rate"),
host: Optional[str] = Query(None, description="TCP host — modem IP or ACH relay"),
tcp_port: int = Query(DEFAULT_TCP_PORT, description=f"TCP port (default {DEFAULT_TCP_PORT})"),
) -> dict:
"""
Command the device to start monitoring (recording triggered events).
Sends SUB 0x96 and waits for ack SUB 0x69.
"""
with _build_client(port=port, baud=baud, host=host, tcp_port=tcp_port) as client:
try:
client.poll()
except Exception as exc:
log.warning("start monitoring poll retry: %s", exc)
client.poll()
client.start_monitoring()
return {"status": "started"}
@app.post("/device/monitor/stop")
def device_monitor_stop(
port: Optional[str] = Query(None, description="Serial port (e.g. COM5)"),
baud: int = Query(38400, description="Serial baud rate"),
host: Optional[str] = Query(None, description="TCP host — modem IP or ACH relay"),
tcp_port: int = Query(DEFAULT_TCP_PORT, description=f"TCP port (default {DEFAULT_TCP_PORT})"),
) -> dict:
"""
Command the device to stop monitoring.
Sends SUB 0x97 and waits for ack SUB 0x68.
"""
with _build_client(port=port, baud=baud, host=host, tcp_port=tcp_port) as client:
try:
client.poll()
except Exception as exc:
log.warning("stop monitoring poll retry: %s", exc)
client.poll()
client.stop_monitoring()
return {"status": "stopped"}
# ── DB read endpoints ─────────────────────────────────────────────────────────
#
# These endpoints expose the seismo-relay SQLite DB written by ach_server.py.
# All queries are read-only. Terra-view calls these to build project event
# views, unit history panels, and (eventually) vibration summary reports.
@app.get("/db/units")
def db_units() -> list[dict]:
"""
Return one row per known serial with summary stats:
last_seen, total_events, total_monitor_entries, total_sessions.
"""
return _get_db().query_units()
@app.get("/db/events")
def db_events(
serial: Optional[str] = Query(None, description="Filter by unit serial (e.g. BE11529)"),
from_dt: Optional[str] = Query(None, description="ISO-8601 start datetime (inclusive)"),
to_dt: Optional[str] = Query(None, description="ISO-8601 end datetime (inclusive)"),
false_trigger: Optional[bool] = Query(None, description="Filter by false_trigger flag"),
limit: int = Query(500, description="Max rows to return (default 500)"),
offset: int = Query(0, description="Pagination offset"),
) -> dict:
"""
Query triggered events from the DB.
Returns events newest-first. All filter params are optional.
Example:
GET /db/events?serial=BE11529&from_dt=2026-04-01&limit=100
"""
from_parsed = datetime.datetime.fromisoformat(from_dt) if from_dt else None
to_parsed = datetime.datetime.fromisoformat(to_dt) if to_dt else None
rows = _get_db().query_events(
serial=serial,
from_dt=from_parsed,
to_dt=to_parsed,
false_trigger=false_trigger,
limit=limit,
offset=offset,
)
return {"count": len(rows), "events": rows}
@app.patch("/db/events/{event_id}/false_trigger")
def db_set_false_trigger(
event_id: str,
value: bool = Query(..., description="True to flag as false trigger, False to clear"),
) -> dict:
"""
Set or clear the false_trigger flag on a single event.
Used by the terra-view event review UI.
Returns 404 if the event_id is not found.
"""
found = _get_db().set_false_trigger(event_id, value)
if not found:
raise HTTPException(status_code=404, detail=f"Event {event_id} not found")
return {"status": "ok", "event_id": event_id, "false_trigger": value}
@app.get("/db/monitor_log")
def db_monitor_log(
serial: Optional[str] = Query(None, description="Filter by unit serial"),
from_dt: Optional[str] = Query(None, description="ISO-8601 start datetime (inclusive)"),
to_dt: Optional[str] = Query(None, description="ISO-8601 end datetime (inclusive)"),
limit: int = Query(500, description="Max rows to return"),
offset: int = Query(0, description="Pagination offset"),
) -> dict:
"""
Query monitor log entries (continuous monitoring intervals) from the DB.
Returns entries newest-first.
"""
from_parsed = datetime.datetime.fromisoformat(from_dt) if from_dt else None
to_parsed = datetime.datetime.fromisoformat(to_dt) if to_dt else None
rows = _get_db().query_monitor_log(
serial=serial,
from_dt=from_parsed,
to_dt=to_parsed,
limit=limit,
offset=offset,
)
return {"count": len(rows), "entries": rows}
@app.get("/db/sessions")
def db_sessions(
serial: Optional[str] = Query(None, description="Filter by unit serial"),
limit: int = Query(50, description="Max rows to return"),
) -> dict:
"""
Query ACH call-home sessions from the DB, newest first.
"""
rows = _get_db().get_sessions(serial=serial, limit=limit)
return {"count": len(rows), "sessions": rows}
# ── Entry point ────────────────────────────────────────────────────────────────
if __name__ == "__main__":
import argparse
ap = argparse.ArgumentParser(description="SFM — Seismograph Field Module API server")
ap.add_argument("--host", default="0.0.0.0", help="Bind address (default: 0.0.0.0)")
ap.add_argument("--port", type=int, default=8200, help="Port (default: 8200)")
ap.add_argument("--reload", action="store_true", help="Enable auto-reload (dev mode)")
args = ap.parse_args()
log.info("Starting SFM server on %s:%d", args.host, args.port)
uvicorn.run(
"sfm.server:app",
host=args.host,
port=args.port,
reload=args.reload,
)