"""
sfm/server.py — Seismograph Field Module REST API

Wraps the minimateplus library in a small FastAPI service.
Terra-view proxies /api/sfm/* to this service (same pattern as SLMM at :8100).

Default port: 8200

Endpoints
---------
  GET  /health                 Service heartbeat — no device I/O
  GET  /device/info            POLL + serial number + full config read
  GET  /device/events          Download all stored events (headers + peak values)
  POST /device/connect         Explicit connect/identify (same as /device/info)
  GET  /device/event/{index}   Single event by index (header + waveform record)

All device endpoints accept query params:
  port — serial port (e.g. COM5, /dev/ttyUSB0)
  baud — baud rate (default 38400)

Each call opens the serial port, does its work, then closes it.
(Stateless / reconnect-per-call, matching Blastware's observed behaviour.)

Run with:
    python -m uvicorn sfm.server:app --host 0.0.0.0 --port 8200 --reload
or:
    python sfm/server.py
"""

from __future__ import annotations

import logging
import sys
from contextlib import contextmanager
from typing import Iterator, Optional

# FastAPI / Pydantic
try:
    from fastapi import FastAPI, HTTPException, Query
    from fastapi.responses import JSONResponse
    import uvicorn
except ImportError:
    # Fail fast with an actionable message instead of a bare traceback.
    print(
        "fastapi and uvicorn are required for the SFM server.\n"
        "Install them with: pip install fastapi uvicorn",
        file=sys.stderr,
    )
    sys.exit(1)

from minimateplus import MiniMateClient
from minimateplus.protocol import ProtocolError
from minimateplus.models import DeviceInfo, Event, PeakValues, ProjectInfo, Timestamp

logging.basicConfig(
    level=logging.INFO,
    format="%(asctime)s %(levelname)-7s %(name)s %(message)s",
    datefmt="%H:%M:%S",
)
log = logging.getLogger("sfm.server")

# Single source of truth for the version reported by both the OpenAPI
# metadata and the /health endpoint, so the two cannot drift apart.
_SERVICE_VERSION = "0.1.0"

# ── FastAPI app ────────────────────────────────────────────────────────────────

app = FastAPI(
    title="Seismograph Field Module (SFM)",
    description=(
        "REST API for Instantel MiniMate Plus seismographs.\n"
        "Implements the minimateplus RS-232 protocol library.\n"
        "Proxied by terra-view at /api/sfm/*."
    ),
    version=_SERVICE_VERSION,
)


# ── Serialisers ────────────────────────────────────────────────────────────────
# Plain dict helpers — avoids a Pydantic dependency in the library layer.


def _serialise_timestamp(ts: Optional[Timestamp]) -> Optional[dict]:
    """Convert a Timestamp into a JSON-ready dict; pass ``None`` through.

    The ``display`` entry is whatever ``str(ts)`` produces.
    """
    if ts is None:
        return None
    return {
        "year": ts.year,
        "month": ts.month,
        "day": ts.day,
        "clock_set": ts.clock_set,
        "display": str(ts),
    }


def _serialise_peak_values(pv: Optional[PeakValues]) -> Optional[dict]:
    """Convert PeakValues into a JSON-ready dict; pass ``None`` through.

    The output keys carry unit suffixes (``_in_s``, ``_psi``); the values are
    copied from the model unchanged — no conversion happens here.
    """
    if pv is None:
        return None
    return {
        "tran_in_s": pv.tran,
        "vert_in_s": pv.vert,
        "long_in_s": pv.long,
        "micl_psi": pv.micl,
    }


def _serialise_project_info(pi: Optional[ProjectInfo]) -> Optional[dict]:
    """Convert ProjectInfo into a JSON-ready dict; pass ``None`` through."""
    if pi is None:
        return None
    return {
        "setup_name": pi.setup_name,
        "project": pi.project,
        "client": pi.client,
        "operator": pi.operator,
        "sensor_location": pi.sensor_location,
        "notes": pi.notes,
    }


def _serialise_device_info(info: DeviceInfo) -> dict:
    """Convert DeviceInfo (device identity fields) into a JSON-ready dict."""
    return {
        "serial": info.serial,
        "firmware_version": info.firmware_version,
        "firmware_minor": info.firmware_minor,
        "dsp_version": info.dsp_version,
        "manufacturer": info.manufacturer,
        "model": info.model,
    }


def _serialise_event(ev: Event) -> dict:
    """Convert an Event into a JSON-ready dict.

    Nested timestamp, peak values and project info go through their own
    serialisers, each of which maps a missing (``None``) value to ``None``.
    """
    return {
        "index": ev.index,
        "timestamp": _serialise_timestamp(ev.timestamp),
        "sample_rate": ev.sample_rate,
        "record_type": ev.record_type,
        "peak_values": _serialise_peak_values(ev.peak_values),
        "project_info": _serialise_project_info(ev.project_info),
    }


# ── Common dependency ──────────────────────────────────────────────────────────


def _get_port(port: Optional[str]) -> str:
    """Return *port* if it was supplied, otherwise reject the request.

    Raises:
        HTTPException: 422 when *port* is ``None`` or an empty string.
    """
    if not port:
        raise HTTPException(
            status_code=422,
            detail="Query parameter 'port' is required (e.g. ?port=COM5)",
        )
    return port


@contextmanager
def _device_errors(port_str: str) -> Iterator[None]:
    """Translate failures raised inside the ``with`` body into HTTP errors.

    Shared by every device endpoint so the status-code mapping lives in one
    place. Each failure is logged before it is translated; otherwise the only
    record of it would be the HTTP response body.

    Args:
        port_str: Serial port name, used only for the log messages.

    Raises:
        HTTPException: 502 for a ``ProtocolError``; 500 for any other
            exception.
    """
    try:
        yield
    except ProtocolError as exc:
        log.warning("Protocol error on port %s: %s", port_str, exc)
        raise HTTPException(status_code=502, detail=f"Protocol error: {exc}") from exc
    except Exception as exc:
        # Top-level request boundary: keep the traceback in the service log.
        log.exception("Device error on port %s", port_str)
        raise HTTPException(status_code=500, detail=f"Device error: {exc}") from exc


def _read_device_info(port_str: str, baud: int) -> dict:
    """Open the port, run ``client.connect()`` and serialise its result.

    Raises:
        HTTPException: 502 / 500 as mapped by ``_device_errors``.
    """
    with _device_errors(port_str), MiniMateClient(port_str, baud) as client:
        info = client.connect()
    return _serialise_device_info(info)


# ── Endpoints ──────────────────────────────────────────────────────────────────


@app.get("/health")
def health() -> dict:
    """Service heartbeat. No device I/O."""
    return {"status": "ok", "service": "sfm", "version": _SERVICE_VERSION}


@app.get("/device/info")
def device_info(
    port: Optional[str] = Query(None, description="Serial port (e.g. COM5)"),
    baud: int = Query(38400, description="Baud rate"),
) -> dict:
    """
    Connect to the device, perform the POLL startup handshake, and return
    identity information (serial number, firmware version, model).

    Equivalent to POST /device/connect — provided as GET for convenience.
    """
    port_str = _get_port(port)
    log.info("GET /device/info port=%s baud=%d", port_str, baud)
    return _read_device_info(port_str, baud)


@app.post("/device/connect")
def device_connect(
    port: Optional[str] = Query(None, description="Serial port (e.g. COM5)"),
    baud: int = Query(38400, description="Baud rate"),
) -> dict:
    """
    Connect to the device and return identity.

    POST variant for terra-view compatibility with the SLMM proxy pattern.
    """
    port_str = _get_port(port)
    # Log this route under its own name; delegating to device_info() would
    # record every POST as "GET /device/info".
    log.info("POST /device/connect port=%s baud=%d", port_str, baud)
    return _read_device_info(port_str, baud)


@app.get("/device/events")
def device_events(
    port: Optional[str] = Query(None, description="Serial port (e.g. COM5)"),
    baud: int = Query(38400, description="Baud rate"),
) -> dict:
    """
    Connect to the device, read the event index, and download all stored
    events (event headers + full waveform records with peak values).

    This does NOT download raw ADC waveform samples — those are large and
    fetched separately via GET /device/event/{idx}/waveform (future endpoint).
    """
    port_str = _get_port(port)
    log.info("GET /device/events port=%s baud=%d", port_str, baud)
    with _device_errors(port_str), MiniMateClient(port_str, baud) as client:
        info = client.connect()
        events = client.get_events()
    return {
        "device": _serialise_device_info(info),
        "event_count": len(events),
        "events": [_serialise_event(ev) for ev in events],
    }


@app.get("/device/event/{index}")
def device_event(
    index: int,
    port: Optional[str] = Query(None, description="Serial port (e.g. COM5)"),
    baud: int = Query(38400, description="Baud rate"),
) -> dict:
    """
    Download a single event by index (0-based).

    Performs: POLL startup → event index → event header → waveform record.
    """
    port_str = _get_port(port)
    log.info("GET /device/event/%d port=%s baud=%d", index, port_str, baud)
    # The full event list is fetched and then filtered by index.
    # NOTE(review): if the library exposes a single-event fetch, using it
    # here would avoid downloading every event — confirm against minimateplus.
    with _device_errors(port_str), MiniMateClient(port_str, baud) as client:
        client.connect()
        events = client.get_events()

    # First event whose index matches, or None when the device has no such event.
    match = next((ev for ev in events if ev.index == index), None)
    if match is None:
        raise HTTPException(
            status_code=404,
            detail=f"Event index {index} not found on device",
        )
    return _serialise_event(match)


# ── Entry point ────────────────────────────────────────────────────────────────

if __name__ == "__main__":
    import argparse

    ap = argparse.ArgumentParser(description="SFM — Seismograph Field Module API server")
    ap.add_argument("--host", default="0.0.0.0", help="Bind address (default: 0.0.0.0)")
    ap.add_argument("--port", type=int, default=8200, help="Port (default: 8200)")
    ap.add_argument("--reload", action="store_true", help="Enable auto-reload (dev mode)")
    args = ap.parse_args()

    log.info("Starting SFM server on %s:%d", args.host, args.port)
    # The app is passed as an import string rather than the object.
    uvicorn.run(
        "sfm.server:app",
        host=args.host,
        port=args.port,
        reload=args.reload,
    )