f83993ad1d
The series3-watcher v1.5.0 fix taught the WATCHER to look for BW
ACH's _ASCII.TXT report alongside each binary. But the SFM
SERVER's import endpoint only knew about the legacy <binary>.TXT
naming when building its TXT lookup table.
Effect: even though the watcher correctly shipped both files in
the multipart POST (and logged "+ <name>_ASCII.TXT attached"),
the server's reports dict was keyed on the wrong name, so
report_bytes resolved to None for every event. Without the
report, save_imported_bw fell back to broken-codec peak values
and no project info — exactly the same symptom as before the
watcher fix landed, just for a different reason.
Fix: when stripping the ".TXT" suffix, also recognise the
"_ASCII" trailer and reconstruct the binary's filename by
converting the last "_" back to ".". Register the report under
BOTH possible binary names so the subsequent lookup matches
whichever convention the operator's BW installation uses.
ACH convention (Blastware ACH):
binary T003L2G6.0E0H + report T003L2G6_0E0H_ASCII.TXT ✅
Manual export (operator clicks Save As Text in BW):
binary M529LK44.AB0 + report M529LK44.AB0.TXT ✅
Both for same event (e.g. ACH + operator manual save):
register under both names; binary lookup wins ✅
Smoke-tested against the four real fixture filenames in the
project archive. Full SFM suite still 62 pass.
For the user's situation: pull, restart, and the NEXT re-forward
pass (after deleting watcher state file again if needed) will
hit this code path, parse the report correctly, apply the
overlay onto the Event, and the upsert path will land
authoritative peak values + project info in the DB.
1867 lines
78 KiB
Python
1867 lines
78 KiB
Python
"""
|
||
sfm/server.py — Seismograph Field Module REST API
|
||
|
||
Wraps the minimateplus library in a small FastAPI service.
|
||
Terra-view proxies /api/sfm/* to this service (same pattern as SLMM at :8100).
|
||
|
||
Default port: 8200
|
||
|
||
Endpoints
|
||
---------
|
||
GET /health Service heartbeat — no device I/O
|
||
GET /device/info POLL + serial number + full config read
|
||
GET /device/events Download all stored events (headers + peak values)
|
||
POST /device/connect Explicit connect/identify (same as /device/info)
|
||
GET /device/event/{idx} Single event by index (header + waveform record)
|
||
|
||
Transport query params (supply one set):
|
||
Serial (direct RS-232 cable):
|
||
port — serial port name (e.g. COM5, /dev/ttyUSB0)
|
||
baud — baud rate (default 38400)
|
||
|
||
TCP (modem / ACH Auto Call Home):
|
||
host — IP address or hostname of the modem or ACH relay
|
||
tcp_port — TCP port number (default 12345, Blastware default)
|
||
|
||
Each call opens the connection, does its work, then closes it.
|
||
(Stateless / reconnect-per-call, matching Blastware's observed behaviour.)
|
||
|
||
Run with:
|
||
python -m uvicorn sfm.server:app --host 0.0.0.0 --port 8200 --reload
|
||
or:
|
||
python sfm/server.py
|
||
"""
|
||
|
||
from __future__ import annotations
|
||
|
||
import datetime
|
||
import logging
|
||
import sys
|
||
import tempfile
|
||
import threading
|
||
import time
|
||
from pathlib import Path
|
||
from typing import Optional
|
||
|
||
# FastAPI / Pydantic
|
||
try:
|
||
from fastapi import Body, FastAPI, File, HTTPException, Query, UploadFile
|
||
from fastapi.middleware.cors import CORSMiddleware
|
||
from fastapi.responses import FileResponse, JSONResponse, StreamingResponse
|
||
from pydantic import BaseModel
|
||
import uvicorn
|
||
except ImportError:
|
||
print(
|
||
"fastapi and uvicorn are required for the SFM server.\n"
|
||
"Install them with: pip install fastapi uvicorn",
|
||
file=sys.stderr,
|
||
)
|
||
sys.exit(1)
|
||
|
||
from minimateplus import MiniMateClient
|
||
from minimateplus.protocol import ProtocolError
|
||
from minimateplus.models import CallHomeConfig, ComplianceConfig, DeviceInfo, Event, PeakValues, ProjectInfo, Timestamp
|
||
from minimateplus.transport import TcpTransport, DEFAULT_TCP_PORT
|
||
from minimateplus.blastware_file import write_blastware_file, blastware_filename
|
||
from minimateplus.client import _decode_a5_metadata_into, _decode_a5_waveform
|
||
from sfm import event_hdf5
|
||
from sfm.cache import SFMCache, get_cache
|
||
from sfm.database import SeismoDb
|
||
from sfm.live_cache import LiveCache as _LiveCache
|
||
from sfm.waveform_store import WaveformStore
|
||
|
||
# Root logging configuration for the service process: INFO level,
# time-of-day timestamps, fixed-width level column.
logging.basicConfig(
    datefmt="%H:%M:%S",
    format="%(asctime)s %(levelname)-7s %(name)s %(message)s",
    level=logging.INFO,
)

# Module logger used by every endpoint and helper in this file.
log = logging.getLogger("sfm.server")
|
||
|
||
# ── FastAPI app ────────────────────────────────────────────────────────────────
|
||
|
||
# The ASGI application object; every route in this module hangs off it.
app = FastAPI(
    title="Seismograph Field Module (SFM)",
    description="\n".join(
        (
            "REST API for Instantel MiniMate Plus seismographs.",
            "Implements the minimateplus RS-232 protocol library.",
            "Proxied by terra-view at /api/sfm/*.",
        )
    ),
    version="0.1.0",
)

# CORS is wide open on purpose: the waveform viewer may be opened as a local
# file (file://), and requests also arrive from dev servers and the
# terra-view proxy.  Only GET and POST are allowed.
app.add_middleware(
    CORSMiddleware,
    allow_origins=["*"],
    allow_methods=["GET", "POST"],
    allow_headers=["*"],
)
|
||
|
||
|
||
# ── DB ────────────────────────────────────────────────────────────────────────
# One SeismoDb and one WaveformStore are shared by the whole process and are
# created lazily by _get_db() / _get_store().  The default database file is
# bridges/captures/seismo_relay.db relative to the repo root; per the original
# note here, a --db-path startup option can override it (that code is not in
# this section).

_DEFAULT_DB_PATH = Path(__file__).parent.parent.joinpath(
    "bridges", "captures", "seismo_relay.db"
)

# Lazily-populated singletons; None until first use.
_db: Optional[SeismoDb] = None
_store: Optional[WaveformStore] = None
|
||
|
||
|
||
def _get_db() -> SeismoDb:
    """Return the process-wide SeismoDb, opening it at the default path on first use."""
    global _db
    if _db is not None:
        return _db
    _db = SeismoDb(_DEFAULT_DB_PATH)
    return _db
|
||
|
||
|
||
def _get_store() -> WaveformStore:
    """
    Return the process-wide WaveformStore, creating it on first use.

    The store is rooted at ``<db_dir>/waveforms/``, i.e. next to the database
    file returned by ``_get_db()``.  According to the original note, this
    mirrors the layout used by bridges/ach_server.py so that ACH-ingested
    files and live SFM downloads end up in one canonical location.
    """
    global _store
    if _store is not None:
        return _store
    waveform_root = _get_db().db_path.parent / "waveforms"
    _store = WaveformStore(waveform_root)
    return _store
|
||
|
||
|
||
# ── Live device cache ─────────────────────────────────────────────────────────
|
||
# In-memory cache for live device data. Avoids re-dialing the device on every
|
||
# request when the data hasn't changed.
|
||
#
|
||
# Keyed by conn_key ("tcp:host:port" or "serial:port:baud").
|
||
# Does NOT persist across server restarts — this is purely an in-process cache
|
||
# to reduce TCP round-trips and cellular data usage.
|
||
#
|
||
# Invalidation rules:
|
||
# device_info — cached until POST /device/config marks it dirty
|
||
# events — cached by (conn_key, device_event_count); re-fetched when
|
||
# a quick count_events() probe shows new events on the device
|
||
# monitor_status — 30-second TTL (changes frequently during monitoring)
|
||
# waveforms — permanent (immutable once recorded; indexed by conn_key+idx)
|
||
#
|
||
# All endpoints accept ?force=true to bypass the cache and re-read from device.
|
||
|
||
# NOTE(review): in the part of the file visible here, only device_events()
# writes to this cache; the other endpoints read/write the SFMCache returned
# by get_cache().  Confirm whether both caches are still required.
_live_cache = _LiveCache()
|
||
|
||
|
||
# ── Serialisers ────────────────────────────────────────────────────────────────
|
||
# Plain dict helpers — avoids a Pydantic dependency in the library layer.
|
||
|
||
def _serialise_timestamp(ts: Optional[Timestamp]) -> Optional[dict]:
    """
    Convert a Timestamp into a plain JSON-ready dict.

    Returns None when *ts* is None.  Otherwise the dict carries the individual
    date/time fields, the ``clock_set`` flag, and ``display`` — the result of
    ``str(ts)``.
    """
    if ts is None:
        return None
    copied_fields = ("year", "month", "day", "hour", "minute", "second", "clock_set")
    payload = {name: getattr(ts, name) for name in copied_fields}
    payload["display"] = str(ts)
    return payload
|
||
|
||
|
||
def _serialise_peak_values(pv: Optional[PeakValues]) -> Optional[dict]:
    """
    Convert PeakValues into a plain dict, or return None when *pv* is None.

    The output keys rename the channel attributes (``tran`` -> ``tran_in_s``,
    ``micl`` -> ``micl_psi`` and so on); ``peak_vector_sum`` keeps its name.
    """
    if pv is None:
        return None
    # (output key, source attribute) pairs, in output order.
    key_to_attr = (
        ("tran_in_s", "tran"),
        ("vert_in_s", "vert"),
        ("long_in_s", "long"),
        ("micl_psi", "micl"),
        ("peak_vector_sum", "peak_vector_sum"),
    )
    return {key: getattr(pv, attr) for key, attr in key_to_attr}
|
||
|
||
|
||
def _serialise_project_info(pi: Optional[ProjectInfo]) -> Optional[dict]:
    """Convert ProjectInfo into a plain dict of its six text fields (None passes through)."""
    if pi is None:
        return None
    field_names = (
        "setup_name",
        "project",
        "client",
        "operator",
        "sensor_location",
        "notes",
    )
    return {name: getattr(pi, name) for name in field_names}
|
||
|
||
|
||
def _serialise_compliance_config(cc: Optional["ComplianceConfig"]) -> Optional[dict]:
    """
    Convert a ComplianceConfig into a plain dict, or return None for None.

    Field notes carried over from the original inline comments:
      recording_mode          0x00=Single Shot, 0x01=Continuous, 0x03=Histogram,
                              0x04=Histogram+Continuous
      histogram_interval_sec  seconds; None if not Histogram mode
      geo_adc_scale           hw scale factor (in/s)/V — informational only,
                              do not write
      geo_range               CONFIRMED 2026-04-20: 0x00=Normal 10in/s,
                              0x01=Sensitive 1.25in/s
    """
    if cc is None:
        return None
    # Output keys are identical to the attribute names; order is the output order.
    field_names = (
        "recording_mode",
        "sample_rate",
        "histogram_interval_sec",
        "record_time",
        "trigger_level_geo",
        "alarm_level_geo",
        "geo_adc_scale",
        "geo_range",
        "setup_name",
        "project",
        "client",
        "operator",
        "sensor_location",
        "notes",
    )
    return {name: getattr(cc, name) for name in field_names}
|
||
|
||
|
||
def _serialise_call_home_config(ch: Optional["CallHomeConfig"]) -> Optional[dict]:
    """Convert a CallHomeConfig into a plain dict keyed by attribute name (None passes through)."""
    if ch is None:
        return None
    field_names = (
        "auto_call_home_enabled",
        "dial_string",
        "after_event_recorded",
        "at_specified_times",
        "time1_enabled",
        "time1_hour",
        "time1_min",
        "time2_enabled",
        "time2_hour",
        "time2_min",
        "num_retries",
        "time_between_retries_sec",
        "wait_for_connection_sec",
        "warm_up_time_sec",
    )
    return {name: getattr(ch, name) for name in field_names}
|
||
|
||
|
||
def _serialise_device_info(info: DeviceInfo) -> dict:
    """
    Convert DeviceInfo into a plain dict for JSON responses and caching.

    ``info.event_count`` is emitted under the key ``event_count_sub08``.  The
    original comment calls the SUB 08 value unreliable (always 1).  Note that
    device_info() overwrites ``info.event_count`` with ``count_events()``
    before calling this, so in that path the key holds the counted value.
    """
    payload = {
        name: getattr(info, name)
        for name in (
            "serial",
            "firmware_version",
            "firmware_minor",
            "dsp_version",
            "manufacturer",
            "model",
        )
    }
    payload["event_count_sub08"] = info.event_count
    payload["compliance_config"] = _serialise_compliance_config(info.compliance_config)
    return payload
|
||
|
||
|
||
def _serialise_event(ev: Event, debug: bool = False) -> dict:
    """
    Convert an Event into a plain dict.

    With ``debug=True`` two extra keys are added from the event's optional
    ``_raw_record`` attribute: ``raw_record_hex`` (hex string, or None when
    the record is missing/empty) and ``raw_record_len`` (byte count, or 0).
    """
    payload: dict = dict(
        index=ev.index,
        timestamp=_serialise_timestamp(ev.timestamp),
        sample_rate=ev.sample_rate,
        record_type=ev.record_type,
        peak_values=_serialise_peak_values(ev.peak_values),
        project_info=_serialise_project_info(ev.project_info),
    )
    if not debug:
        return payload

    raw_record = getattr(ev, "_raw_record", None)
    if raw_record:
        payload["raw_record_hex"] = raw_record.hex()
        payload["raw_record_len"] = len(raw_record)
    else:
        payload["raw_record_hex"] = None
        payload["raw_record_len"] = 0
    return payload
|
||
|
||
|
||
# ── Transport factory ─────────────────────────────────────────────────────────
|
||
|
||
def _build_client(
    port: Optional[str],
    baud: int,
    host: Optional[str],
    tcp_port: int,
    timeout: float = 30.0,
) -> MiniMateClient:
    """
    Build a MiniMateClient for TCP or serial transport.

    *host* wins when both are given; otherwise *port* selects serial.
    Raises HTTPException(422) when neither is supplied.

    Endpoints that download a full 5A waveform pass a larger timeout
    (120.0): per the original note, a 70-second event at 1024 sps takes 2-3
    minutes over cellular and every individual recv must finish inside the
    timeout window.

    NOTE(review): *timeout* is only forwarded on the TCP path; the serial
    client is built with the library's own default.  Confirm this is intended.
    """
    if host:
        tcp_transport = TcpTransport(host, port=tcp_port)
        log.debug("TCP transport: %s:%d timeout=%.0fs", host, tcp_port, timeout)
        return MiniMateClient(transport=tcp_transport, timeout=timeout)

    if port:
        log.debug("Serial transport: %s baud=%d", port, baud)
        return MiniMateClient(port, baud)

    missing_transport = (
        "Specify either 'port' (serial, e.g. ?port=COM5) "
        "or 'host' (TCP, e.g. ?host=192.168.1.50&tcp_port=12345)"
    )
    raise HTTPException(status_code=422, detail=missing_transport)
|
||
|
||
|
||
def _is_tcp(host: Optional[str]) -> bool:
    """Return True when a non-empty *host* was supplied, i.e. the TCP transport will be used."""
    return True if host else False
|
||
|
||
|
||
def _run_with_retry(fn, *, is_tcp: bool):
    """
    Run ``fn()``; on ProtocolError retry exactly once, but only for TCP.

    Why (from the original rationale): a cold MiniMate Plus whose serial
    lines were just asserted by the modem or a local bridge needs 5-10
    seconds to boot before it answers POLL_PROBE, so the first request can
    time out.  One automatic retry is enough once the unit is up.

    Serial connections are never retried — a timeout there usually means a
    real problem (wrong port, wrong baud, cable unplugged).  Any failure of
    the second attempt propagates to the caller unchanged.
    """
    try:
        outcome = fn()
    except ProtocolError:
        if is_tcp:
            log.info("TCP poll timed out (unit may have been cold) — retrying once")
            return fn()
        raise
    return outcome
|
||
|
||
|
||
# ── Helpers ────────────────────────────────────────────────────────────────────
|
||
|
||
def _backfill_events(events: list, info: "DeviceInfo") -> None:
    """
    Fill gaps in *events* in place from the device's compliance config.

    For each event: ``sample_rate`` is set when it is None and the config has
    a truthy sample rate; ``project_info`` is created when missing, and its
    ``client``, ``operator``, ``sensor_location`` and ``notes`` fields are
    filled when they are None.  ``setup_name`` and ``project`` are not
    touched.  Nothing happens when the device has no compliance config.

    Shared by the full-download and partial (new-events-only) paths.
    """
    cc = info.compliance_config
    if not cc:
        return

    fallback_rate = cc.sample_rate
    backfilled_fields = ("client", "operator", "sensor_location", "notes")

    for ev in events:
        if fallback_rate and ev.sample_rate is None:
            ev.sample_rate = fallback_rate

        if ev.project_info is None:
            ev.project_info = ProjectInfo()
        project = ev.project_info
        for name in backfilled_fields:
            if getattr(project, name) is None:
                setattr(project, name, getattr(cc, name))
|
||
|
||
|
||
# ── Endpoints ──────────────────────────────────────────────────────────────────
|
||
|
||
@app.get("/health")
def health() -> dict:
    """Service heartbeat. No device I/O."""
    return dict(status="ok", service="sfm", version="0.1.0")
|
||
|
||
|
||
@app.get("/", response_class=FileResponse)
def webapp():
    """Serve the SFM web app (sfm_webapp.html next to this module)."""
    module_dir = Path(__file__).parent
    return str(module_dir / "sfm_webapp.html")
|
||
|
||
|
||
@app.get("/waveform", response_class=FileResponse)
def waveform_viewer():
    """Serve the standalone waveform viewer (waveform_viewer.html next to this module)."""
    module_dir = Path(__file__).parent
    return str(module_dir / "waveform_viewer.html")
|
||
|
||
|
||
@app.get("/device/info")
def device_info(
    port: Optional[str] = Query(None, description="Serial port (e.g. COM5, /dev/ttyUSB0)"),
    baud: int = Query(38400, description="Serial baud rate (default 38400)"),
    host: Optional[str] = Query(None, description="TCP host — modem IP or ACH relay (e.g. 203.0.113.5)"),
    tcp_port: int = Query(DEFAULT_TCP_PORT, description=f"TCP port (default {DEFAULT_TCP_PORT})"),
    force: bool = Query(False, description="Bypass cache and re-read from device"),
) -> dict:
    """
    Connect to the device, run the POLL startup handshake, and return its
    identity (serial number, firmware version, model) plus compliance config.

    Supply either *port* (serial) or *host* (TCP/modem).
    POST /device/connect delegates to this function.

    **Caching**: the serialised result is stored per connection after a
    successful read.  Unless *force=true*, a stored result is returned
    (marked with ``_cached: true``) without contacting the device.

    Errors: ProtocolError and OSError become HTTP 502; any other device
    failure becomes HTTP 500.
    """
    log.info("GET /device/info port=%s host=%s tcp_port=%d force=%s", port, host, tcp_port, force)

    cache = get_cache()
    conn_key = SFMCache.make_conn_key(host, tcp_port, port, baud)

    hit = None if force else cache.get_device_info(conn_key)
    if hit is not None:
        log.info("device info cache hit for %s", conn_key)
        hit["_cached"] = True
        return hit

    def _read_identity():
        with _build_client(port, baud, host, tcp_port) as client:
            identity = client.connect()
            # SUB 08 event_count is unreliable (always returns 1 regardless
            # of actual storage), so count via the 1E/1F chain instead.
            identity.event_count = client.count_events()
            return identity

    try:
        info = _run_with_retry(_read_identity, is_tcp=_is_tcp(host))
    except HTTPException:
        raise
    except ProtocolError as exc:
        raise HTTPException(status_code=502, detail=f"Protocol error: {exc}") from exc
    except OSError as exc:
        raise HTTPException(status_code=502, detail=f"Connection error: {exc}") from exc
    except Exception as exc:
        raise HTTPException(status_code=500, detail=f"Device error: {exc}") from exc

    serialised = _serialise_device_info(info)
    cache.set_device_info(conn_key, serialised)
    return serialised
|
||
|
||
|
||
@app.post("/device/connect")
def device_connect(
    port: Optional[str] = Query(None, description="Serial port (e.g. COM5)"),
    baud: int = Query(38400, description="Serial baud rate"),
    host: Optional[str] = Query(None, description="TCP host — modem IP or ACH relay"),
    tcp_port: int = Query(DEFAULT_TCP_PORT, description=f"TCP port (default {DEFAULT_TCP_PORT})"),
    force: bool = Query(True, description="Re-read from the device (default); pass false to allow a cached answer"),
) -> dict:
    """
    Connect to the device and return identity.  POST variant for terra-view
    compatibility with the SLMM proxy pattern.

    Delegates to device_info().  *force* is passed explicitly because
    device_info() is called here as a plain Python function: any argument
    left out would take the ``Query(...)`` default *object* rather than its
    value.  That object is truthy, so the previous code always bypassed the
    cache by accident and logged the object's repr.  The default of True
    keeps that always-dial behaviour, now on purpose; callers may send
    ``?force=false`` to accept a cached identity.
    """
    return device_info(port=port, baud=baud, host=host, tcp_port=tcp_port, force=force)
|
||
|
||
|
||
@app.get("/device/events")
def device_events(
    port: Optional[str] = Query(None, description="Serial port (e.g. COM5)"),
    baud: int = Query(38400, description="Serial baud rate"),
    host: Optional[str] = Query(None, description="TCP host — modem IP or ACH relay"),
    tcp_port: int = Query(DEFAULT_TCP_PORT, description=f"TCP port (default {DEFAULT_TCP_PORT})"),
    debug: bool = Query(False, description="Include raw record hex for field-layout inspection"),
    force: bool = Query(False, description="Bypass cache and re-download all events from device"),
) -> dict:
    """
    Return all events stored on the device (headers + peak values).

    Supply either *port* (serial) or *host* (TCP/modem).

    Raw ADC waveform samples are NOT included — fetch those separately via
    GET /device/event/{idx}/waveform.

    **Caching** (skipped when *force=true* or *debug=true*): if events are
    already cached for this connection, the device is first asked only for
    its event count.

    - Device unreachable: the cached events are served with ``_stale: true``.
    - Count equals the cache: cached events are returned, no download.
    - Count is higher: events are downloaded up to the newest index, only
      the ones beyond the cached count are stored, and the merged list is
      returned with ``_new_events``.
    - Otherwise (nothing cached, or the device reports fewer events): a full
      download is performed.

    Pass *debug=true* to include ``raw_record_hex`` in each event — useful
    for checking field offsets against the protocol reference.  Debug
    results are never written to the caches.
    """
    log.info("GET /device/events port=%s host=%s debug=%s force=%s", port, host, debug, force)

    cache = get_cache()
    conn_key = SFMCache.make_conn_key(host, tcp_port, port, baud)
    over_tcp = _is_tcp(host)

    # ── Smart cache path ─────────────────────────────────────────────────────
    # Debug output needs raw_record_hex, which is not stored in the cache, so
    # debug requests always go to the device.
    if not (force or debug):
        cached_events = cache.get_all_events(conn_key)
        cached_count = len(cached_events) if cached_events else 0

        if cached_count > 0:
            # Cheap probe: count events via the fast 1E/1F chain (~2s)
            # instead of a full event download (~10-30s).
            def _probe_count():
                with _build_client(port, baud, host, tcp_port) as client:
                    client.connect()
                    return client.count_events()

            try:
                device_count = _run_with_retry(_probe_count, is_tcp=over_tcp)
            except HTTPException:
                raise
            except Exception as exc:
                # Field units go offline regularly; stale data beats an error.
                log.warning("count_events failed (%s) — serving stale cache for %s", exc, conn_key)
                return {
                    "device": cache.get_device_info(conn_key) or {},
                    "event_count": cached_count,
                    "events": cached_events,
                    "_cached": True,
                    "_stale": True,
                }

            if device_count == cached_count:
                # Nothing new on the device.
                log.info(
                    "event cache hit for %s: %d events, device count matches",
                    conn_key, cached_count,
                )
                return {
                    "device": cache.get_device_info(conn_key) or {},
                    "event_count": cached_count,
                    "events": cached_events,
                    "_cached": True,
                }

            if device_count > cached_count:
                # The protocol has to walk from event 0 to reach later ones,
                # so everything up to the newest index is fetched; only
                # indices >= cached_count are stored as new.
                newest_index = device_count - 1
                log.info(
                    "new events on device %s: have %d, device has %d — fetching all up to %d",
                    conn_key, cached_count, device_count, newest_index,
                )

                def _fetch_through_newest():
                    with _build_client(port, baud, host, tcp_port) as client:
                        identity = client.connect()
                        fetched = client.get_events(stop_after_index=newest_index)
                        return identity, fetched

                try:
                    info, all_events = _run_with_retry(_fetch_through_newest, is_tcp=over_tcp)
                except HTTPException:
                    raise
                except ProtocolError as exc:
                    raise HTTPException(status_code=502, detail=f"Protocol error: {exc}") from exc
                except OSError as exc:
                    raise HTTPException(status_code=502, detail=f"Connection error: {exc}") from exc
                except Exception as exc:
                    raise HTTPException(status_code=500, detail=f"Device error: {exc}") from exc

                _backfill_events(all_events, info)
                fresh = [ev for ev in all_events if ev.index >= cached_count]
                cache.set_events(conn_key, [_serialise_event(ev) for ev in fresh])
                cache.set_device_info(conn_key, _serialise_device_info(info))

                merged = cache.get_all_events(conn_key)
                return {
                    "device": _serialise_device_info(info),
                    "event_count": len(merged),
                    "events": merged,
                    "_cached": True,
                    "_new_events": len(fresh),
                }

    # ── Full download path (first call, force=True, debug=True, or the
    #    device reported fewer events than the cache holds) ──────────────────
    def _download_everything():
        with _build_client(port, baud, host, tcp_port) as client:
            return client.connect(), client.get_events(debug=debug)

    try:
        info, events = _run_with_retry(_download_everything, is_tcp=over_tcp)
    except HTTPException:
        raise
    except ProtocolError as exc:
        raise HTTPException(status_code=502, detail=f"Protocol error: {exc}") from exc
    except OSError as exc:
        raise HTTPException(status_code=502, detail=f"Connection error: {exc}") from exc
    except Exception as exc:
        raise HTTPException(status_code=500, detail=f"Device error: {exc}") from exc

    _backfill_events(events, info)

    # Each cache receives its own freshly serialised objects, so the two
    # caches and the response never share mutable dicts beyond what the
    # original code shared.
    response_events = [_serialise_event(ev, debug=debug) for ev in events]
    response_info = _serialise_device_info(info)
    live_events = [_serialise_event(ev, debug=debug) for ev in events]

    if not debug:
        # Debug payloads carry raw hex blobs and must not pollute the caches.
        cache.set_events(conn_key, response_events)
        cache.set_device_info(conn_key, _serialise_device_info(info))
        _live_cache.set_device_info(conn_key, response_info)
        _live_cache.set_events(conn_key, len(events), live_events)

    return {
        "device": response_info,
        "event_count": len(events),
        "events": response_events,
    }
|
||
|
||
|
||
@app.get("/device/event/{index}")
def device_event(
    index: int,
    port: Optional[str] = Query(None, description="Serial port (e.g. COM5)"),
    baud: int = Query(38400, description="Serial baud rate"),
    host: Optional[str] = Query(None, description="TCP host — modem IP or ACH relay"),
    tcp_port: int = Query(DEFAULT_TCP_PORT, description=f"TCP port (default {DEFAULT_TCP_PORT})"),
    force: bool = Query(False, description="Bypass cache and re-download from device"),
) -> dict:
    """
    Download a single event by index (0-based).

    Supply either *port* (serial) or *host* (TCP/modem).
    Performs: POLL startup → event index → event header → waveform record.

    **Caching**: unless *force=true*, an event already in the cache (e.g.
    from GET /device/events) is returned with ``_cached: true`` and no
    device contact.  After a download, every event that was fetched on the
    way (indices 0..index) is written to the cache.

    Errors: 404 when the device has no event at *index*; ProtocolError and
    OSError become 502; any other device failure becomes 500.
    """
    log.info("GET /device/event/%d port=%s host=%s force=%s", index, port, host, force)

    cache = get_cache()
    conn_key = SFMCache.make_conn_key(host, tcp_port, port, baud)

    if not force:
        cached = cache.get_event(conn_key, index)
        if cached is not None:
            log.info("event cache hit for %s index %d", conn_key, index)
            cached["_cached"] = True
            return cached

    try:
        def _do():
            with _build_client(port, baud, host, tcp_port) as client:
                info = client.connect()
                events = client.get_events(stop_after_index=index)
                return info, events
        info, events = _run_with_retry(_do, is_tcp=_is_tcp(host))
    except HTTPException:
        raise
    except ProtocolError as exc:
        raise HTTPException(status_code=502, detail=f"Protocol error: {exc}") from exc
    except OSError as exc:
        raise HTTPException(status_code=502, detail=f"Connection error: {exc}") from exc
    except Exception as exc:
        raise HTTPException(status_code=500, detail=f"Device error: {exc}") from exc

    matching = [ev for ev in events if ev.index == index]
    if not matching:
        raise HTTPException(
            status_code=404,
            detail=f"Event index {index} not found on device",
        )

    # Backfill EVERY downloaded event, not just the requested one: all of
    # them are cached below, and cache entries written by device_events() are
    # always backfilled.  Previously indices 0..index-1 were cached without
    # sample_rate / project_info and later served that way from the cache.
    # The requested event is one of the objects in `events`, so it is covered.
    _backfill_events(events, info)
    result = _serialise_event(matching[0])

    # Store all downloaded events (we paid for them anyway — indices 0..index)
    all_serialised = [_serialise_event(ev) for ev in events]
    cache.set_events(conn_key, all_serialised)

    return result
|
||
|
||
|
||
@app.get("/device/event/{index}/waveform")
def device_event_waveform(
    index: int,
    port: Optional[str] = Query(None, description="Serial port (e.g. COM5)"),
    baud: int = Query(38400, description="Serial baud rate"),
    host: Optional[str] = Query(None, description="TCP host — modem IP or ACH relay"),
    tcp_port: int = Query(DEFAULT_TCP_PORT, description=f"TCP port (default {DEFAULT_TCP_PORT})"),
    force: bool = Query(False, description="Bypass cache and re-download from device"),
) -> dict:
    """
    Download the full waveform for a single event (0-based index).

    Supply either *port* (serial) or *host* (TCP/modem).

    Performs: POLL startup → get_events(full_waveform=True,
    stop_after_index=index) with a 120-second client timeout, then converts
    the event with ``event_hdf5.event_to_plot_json``.  The response is
    whatever that function returns; the inline comment describes it as
    "plot.v1" JSON with samples in physical units, an explicit time axis and
    peak markers.

    Before conversion, values from the device's compliance config are
    applied: ``sample_rate`` when the event has none, ``rectime_seconds``
    whenever the config has a record time, and ``total_samples`` when it is
    missing or far from ``sample_rate * rectime_seconds``.

    **Caching**: the result is cached per (connection, index).  Unless
    *force=true*, a cached result is returned with ``_cached: true`` and no
    device contact.

    Errors: 404 when the device has no event at *index*; ProtocolError and
    OSError become 502; any other device failure becomes 500.
    """
    log.info("GET /device/event/%d/waveform port=%s host=%s force=%s", index, port, host, force)

    cache = get_cache()
    conn_key = SFMCache.make_conn_key(host, tcp_port, port, baud)

    hit = None if force else cache.get_waveform(conn_key, index)
    if hit is not None:
        log.info("waveform cache hit for %s event %d", conn_key, index)
        hit["_cached"] = True
        return hit

    def _download():
        with _build_client(port, baud, host, tcp_port, timeout=120.0) as client:
            identity = client.connect()
            # stop_after_index avoids downloading events beyond the one requested.
            fetched = client.get_events(full_waveform=True, stop_after_index=index)
            wanted = [candidate for candidate in fetched if candidate.index == index]
            return wanted[0] if wanted else None, identity

    try:
        ev, info = _run_with_retry(_download, is_tcp=_is_tcp(host))
    except HTTPException:
        raise
    except ProtocolError as exc:
        raise HTTPException(status_code=502, detail=f"Protocol error: {exc}") from exc
    except OSError as exc:
        raise HTTPException(status_code=502, detail=f"Connection error: {exc}") from exc
    except Exception as exc:
        raise HTTPException(status_code=500, detail=f"Device error: {exc}") from exc

    if ev is None:
        raise HTTPException(
            status_code=404,
            detail=f"Event index {index} not found on device",
        )

    # The compliance config holds the user-set, authoritative values; the
    # STRT-derived guesses from `_decode_a5_waveform` can be off (e.g.
    # rectime used to read the 0x46 record-type marker = 70s).
    cc = info.compliance_config
    if cc:
        if ev.sample_rate is None and cc.sample_rate:
            ev.sample_rate = cc.sample_rate
        if cc.record_time:
            ev.rectime_seconds = cc.record_time
        # NOTE(review): the nesting of this check under `if cc:` was
        # reconstructed from a source listing without indentation — confirm.
        if ev.sample_rate and ev.rectime_seconds:
            expected = int(round(ev.sample_rate * ev.rectime_seconds))
            reported = ev.total_samples
            if reported is None or reported > expected * 2 or reported < expected // 4:
                ev.total_samples = expected
    geo_range = getattr(cc, "geo_range", None) if cc else None

    # Clients consume the converted JSON directly, with no ADC scaling of
    # their own.
    serial = getattr(info, "serial", None) or ""
    result = event_hdf5.event_to_plot_json(
        ev,
        serial=serial,
        geo_range=geo_range or "normal",
        index=index,
    )
    cache.set_waveform(conn_key, index, result)
    return result
|
||
|
||
|
||
@app.get("/device/event/{index}/blastware_file")
def device_event_blastware_file(
    index: int,
    port: Optional[str] = Query(None, description="Serial port (e.g. COM5)"),
    baud: int = Query(38400, description="Serial baud rate"),
    host: Optional[str] = Query(None, description="TCP host — modem IP or ACH relay"),
    tcp_port: int = Query(DEFAULT_TCP_PORT, description=f"TCP port (default {DEFAULT_TCP_PORT})"),
    force: bool = Query(False, description="Bypass any cached/dedup'd state and re-download from device"),
) -> FileResponse:
    """
    Download the waveform for a single event (0-based index) and return it
    as a Blastware-compatible binary file with a correct Blastware filename.

    Supply either *port* (serial) or *host* (TCP/modem).

    The file is written to the OS temp directory and streamed back as a binary
    download. Blastware can open it directly — filename encodes serial + timestamp.

    Filename format: <prefix><serial3><stem><AB>0<W|H>
      - prefix letter = chr(ord('B') + floor(serial_numeric / 1000))
      - stem + AB = second-resolution timestamp since 1985-01-01 local
      - W / H = Full Waveform / Full Histogram (defaults to W for
        triggered events; histogram requires recording_mode
        to be populated from compliance config)

    Performs: POLL startup → get_events(full_waveform=True,
    stop_after_index=index) → write_blastware_file() → FileResponse +
    persistent store + DB upsert.

    The persistent-store / DB step is best-effort: it is skipped when the
    serial is unknown or the event has no waveform key, and any exception it
    raises is logged as a warning while the temp file is still returned.

    Raises:
        HTTPException 404: no event with the requested index was returned.
        HTTPException 502: ProtocolError or OSError during the download, or
            the event carries no A5 frames.
        HTTPException 500: any other exception during the download.
    """
    log.info(
        "GET /device/event/%d/blastware_file port=%s host=%s force=%s",
        index, port, host, force,
    )
    # `force` always re-downloads from the device. This endpoint already
    # never short-circuits via cache, so `force` is reserved for parity with
    # the other live endpoints.

    try:
        def _do():
            # One complete device session: connect, pull events up to and
            # including `index`, and hand back the matching event plus the
            # device info returned by connect().
            with _build_client(port, baud, host, tcp_port, timeout=120.0) as client:
                info = client.connect()
                # full_waveform=True pulls the complete 5A stream so the
                # client populates STRT-derived fields (total_samples,
                # pretrig_samples, rectime_seconds) AND raw_samples on the
                # Event. Required for the .h5 + .sfm.json sidecar to be
                # filled in correctly — without it, those land as nulls.
                events = client.get_events(
                    full_waveform=True,
                    stop_after_index=index,
                )
                matching = [ev for ev in events if ev.index == index]
                return matching[0] if matching else None, info
        ev, info = _run_with_retry(_do, is_tcp=_is_tcp(host))
    except HTTPException:
        # Already an HTTP-shaped error — pass through untouched.
        raise
    except ProtocolError as exc:
        log.error("blastware_file: protocol error: %s", exc, exc_info=True)
        raise HTTPException(status_code=502, detail=f"Protocol error: {exc}") from exc
    except OSError as exc:
        log.error("blastware_file: connection error: %s", exc, exc_info=True)
        raise HTTPException(status_code=502, detail=f"Connection error: {exc}") from exc
    except Exception as exc:
        log.error("blastware_file: unexpected error: %s", exc, exc_info=True)
        raise HTTPException(status_code=500, detail=f"Device error: {exc}") from exc

    if ev is None:
        raise HTTPException(
            status_code=404,
            detail=f"Event index {index} not found on device",
        )

    # The raw A5 frames are what write_blastware_file() serialises; without
    # them there is nothing to write.
    a5_frames = getattr(ev, "_a5_frames", None)
    if not a5_frames:
        raise HTTPException(
            status_code=502,
            detail=f"No waveform data received for event index {index} — 5A download failed",
        )

    # Determine serial number from device info
    serial = getattr(info, "serial", None) or "UNKNOWN"

    # Build filename using the same algorithm Blastware uses
    filename = blastware_filename(ev, serial)

    # Write to OS temp dir (cross-platform: /tmp on Linux/macOS,
    # %TEMP% on Windows) so FastAPI can stream it back via FileResponse.
    out_path = Path(tempfile.gettempdir()) / filename
    # Delete any stale file at this path before writing. On Windows we have
    # observed the new (smaller) file getting trailing zero-bytes from the
    # previous (larger) file when filesystem semantics around open(...,"wb")
    # don't truncate cleanly (e.g. through a synced folder). Explicit unlink
    # eliminates that ambiguity.
    try:
        out_path.unlink()
    except FileNotFoundError:
        pass
    write_blastware_file(ev, a5_frames, out_path)
    log.info(
        "blastware_file: wrote %s (%d A5 frames, serial=%s)",
        out_path, len(a5_frames), serial,
    )

    # Promote to canonical persistent store + DB row so this event is
    # queryable via /db/events afterwards (matches the ACH ingestion path).
    # Skipped when the serial could not be read or the event has no
    # waveform key (the key is used below to index the stored record).
    if serial != "UNKNOWN" and ev._waveform_key is not None:
        try:
            cc = info.compliance_config
            # Backfill authoritative compliance-config values onto the
            # Event before persisting. These supersede whatever
            # _decode_a5_waveform read from the STRT bytes (some of which
            # have ambiguous semantics — e.g. STRT[20] is rectime but
            # STRT[8:10] / STRT[16:18] are device-specific scratch fields
            # that aren't reliable sample/pretrig counts).
            if cc:
                if ev.sample_rate is None and cc.sample_rate:
                    ev.sample_rate = cc.sample_rate
                if cc.record_time:
                    # record_time from compliance is authoritative — the
                    # user-set value the device followed when recording.
                    ev.rectime_seconds = cc.record_time
                    # Derive total_samples from sample_rate × rectime when
                    # we can; the STRT-derived value can land at a buffer-
                    # offset rather than a sample count.
                    if ev.sample_rate and ev.rectime_seconds:
                        derived = int(round(ev.sample_rate * ev.rectime_seconds))
                        # Only override when the decoded count is missing
                        # or wildly off (more than 2x, or under a quarter
                        # of, the derived value).
                        if (ev.total_samples is None
                                or ev.total_samples > derived * 2
                                or ev.total_samples < derived // 4):
                            ev.total_samples = derived
            geo_range = getattr(cc, "geo_range", None) if cc else None
            rec = _get_store().save(
                ev, serial=serial, a5_frames=a5_frames,
                geo_range=geo_range if geo_range is not None else "normal",
            )
            _get_db().insert_events(
                [ev],
                serial=serial,
                waveform_records={ev._waveform_key.hex(): rec},
            )
            log.info(
                "blastware_file: persisted to store (%s, %d bytes)",
                rec["filename"], rec["filesize"],
            )
        except Exception as exc:
            # Deliberately best-effort: a store/DB failure must not stop
            # the caller from receiving the file that was just written.
            log.warning(
                "blastware_file: persistent store save failed: %s "
                "— temp file still served",
                exc,
            )

    return FileResponse(
        path=str(out_path),
        filename=filename,
        media_type="application/octet-stream",
    )
|
||
|
||
|
||
# ── Write endpoints ───────────────────────────────────────────────────────────
|
||
|
||
class DeviceConfigBody(BaseModel):
    """
    Request body for POST /device/config.

    Every field is optional and defaults to ``None``. Only supplied
    (non-null) fields are written to the device; all other config bytes
    are round-tripped verbatim.

    Recording parameters
    --------------------
    recording_mode         : Recording mode enum. Values: 0=Single Shot,
                             1=Continuous, 3=Histogram, 4=Histogram+Continuous.
    sample_rate            : Samples per second. Valid values: 1024, 2048, 4096.
    record_time            : Record duration in seconds (e.g. 1.0, 2.0, 3.0).
    histogram_interval_sec : Histogram interval in seconds: 2, 5, 15, 60,
                             300, 900 (mode-gated).

    Trigger / alarm thresholds and range (geo channels)
    ---------------------------------------------------
    trigger_level_geo : Trigger threshold in in/s (e.g. 0.5).
    alarm_level_geo   : Alarm threshold in in/s (e.g. 1.0).
    geo_range         : Geophone range/sensitivity.
                        0=Normal 10.000 in/s, 1=Sensitive 1.250 in/s.

    Project / operator strings (max 41 ASCII characters each)
    ---------------------------------------------------------
    project     : Project description.
    client_name : Client / company name.
    operator    : Operator / technician name.
    seis_loc    : Sensor location description.
    notes       : Extended notes.
    """

    # --- Recording parameters ---
    recording_mode: Optional[int] = None
    sample_rate: Optional[int] = None
    record_time: Optional[float] = None
    histogram_interval_sec: Optional[int] = None  # seconds: 2, 5, 15, 60, 300, 900 (mode-gated)

    # --- Threshold parameters / geo range ---
    trigger_level_geo: Optional[float] = None
    alarm_level_geo: Optional[float] = None
    geo_range: Optional[int] = None  # 0=Normal 10.000 in/s, 1=Sensitive 1.250 in/s

    # --- Project / operator strings ---
    project: Optional[str] = None
    client_name: Optional[str] = None
    operator: Optional[str] = None
    seis_loc: Optional[str] = None
    notes: Optional[str] = None
|
||
|
||
|
||
@app.post("/device/config")
def device_config(
    body: DeviceConfigBody,
    port: Optional[str] = Query(None, description="Serial port (e.g. COM5)"),
    baud: int = Query(38400, description="Serial baud rate"),
    host: Optional[str] = Query(None, description="TCP host — modem IP or ACH relay"),
    tcp_port: int = Query(DEFAULT_TCP_PORT, description=f"TCP port (default {DEFAULT_TCP_PORT})"),
) -> dict:
    """
    Read the current device config, apply any supplied changes to the compliance
    block, and write the full config back.

    Only non-null fields in the JSON body are modified. All other config bytes
    are round-tripped verbatim from the device.

    Supply either *port* (serial) or *host* (TCP/modem).

    Example body (all fields optional — include only what you want to change):
        {
            "recording_mode": 1,
            "sample_rate": 1024,
            "record_time": 3.0,
            "trigger_level_geo": 0.5,
            "alarm_level_geo": 1.0,
            "project": "Bridge Inspection 2026",
            "client_name": "City of Portland",
            "operator": "Brian Harrison",
            "seis_loc": "South Abutment",
            "notes": "Pre-blast baseline"
        }

    Returns:
        {"status": "ok", "updated_fields": {...}} on success, where
        ``updated_fields`` echoes the non-null fields of the request body.

    Raises:
        502 on protocol errors (timeout, bad ack, etc.) and connection errors.
        422 if neither port nor host is provided, or when the device client
            rejects a value with ValueError.
        500 on any other exception raised while talking to the device.
    """
    changed = body.model_dump(exclude_none=True)
    log.info("POST /device/config port=%s host=%s fields=%s", port, host, list(changed.keys()))

    try:
        def _do():
            # One complete session per attempt: connect, then let the client
            # patch and write back the config. None-valued arguments are
            # passed through as-is.
            with _build_client(port, baud, host, tcp_port) as client:
                client.connect()
                client.apply_config(
                    recording_mode=body.recording_mode,
                    sample_rate=body.sample_rate,
                    record_time=body.record_time,
                    histogram_interval_sec=body.histogram_interval_sec,
                    trigger_level_geo=body.trigger_level_geo,
                    alarm_level_geo=body.alarm_level_geo,
                    geo_range=body.geo_range,
                    project=body.project,
                    client_name=body.client_name,
                    operator=body.operator,
                    seis_loc=body.seis_loc,
                    notes=body.notes,
                )
        _run_with_retry(_do, is_tcp=_is_tcp(host))
    except HTTPException:
        raise
    except ProtocolError as exc:
        raise HTTPException(status_code=502, detail=f"Protocol error: {exc}") from exc
    except OSError as exc:
        raise HTTPException(status_code=502, detail=f"Connection error: {exc}") from exc
    except ValueError as exc:
        raise HTTPException(status_code=422, detail=str(exc)) from exc
    except Exception as exc:
        raise HTTPException(status_code=500, detail=f"Device error: {exc}") from exc

    # Config was written to the device — the cached compliance config is now stale.
    # The connection key is computed once, here, where it is actually used.
    conn_key = SFMCache.make_conn_key(host, tcp_port, port, baud)
    get_cache().mark_config_dirty(conn_key)

    return {
        "status": "ok",
        "updated_fields": changed,
    }
|
||
|
||
|
||
# Keep the old endpoint alive under its old URL for anything already calling it
|
||
@app.post("/device/config/project")
def device_config_project(
    body: DeviceConfigBody,
    port: Optional[str] = Query(None, description="Serial port (e.g. COM5)"),
    baud: int = Query(38400, description="Serial baud rate"),
    host: Optional[str] = Query(None, description="TCP host — modem IP or ACH relay"),
    tcp_port: int = Query(DEFAULT_TCP_PORT, description=f"TCP port (default {DEFAULT_TCP_PORT})"),
) -> dict:
    """Deprecated alias for POST /device/config — use that instead."""
    # Pure delegation: every argument is handed to device_config unchanged.
    forwarded = {
        "body": body,
        "port": port,
        "baud": baud,
        "host": host,
        "tcp_port": tcp_port,
    }
    return device_config(**forwarded)
|
||
|
||
|
||
# ── Monitoring endpoints ───────────────────────────────────────────────────────
|
||
|
||
@app.get("/device/monitor/status")
def device_monitor_status(
    port: Optional[str] = Query(None, description="Serial port (e.g. COM5)"),
    baud: int = Query(38400, description="Serial baud rate"),
    host: Optional[str] = Query(None, description="TCP host — modem IP or ACH relay"),
    tcp_port: int = Query(DEFAULT_TCP_PORT, description=f"TCP port (default {DEFAULT_TCP_PORT})"),
    force: bool = Query(False, description="Bypass cache and re-read from device"),
) -> dict:
    """
    Read monitoring status from the device.

    Uses poll() (POLL handshake only — no config/compliance reads) so the
    request completes in ~2 seconds instead of ~15. The full connect() was
    causing false "idle" readings because the compliance+event sequence was
    interacting with the device state before the 0x1C read.

    Returns is_monitoring bool, battery voltage, and memory usage (total + free
    bytes). Battery and memory are only present when the unit is idle.
    A response served from the cache additionally carries ``"_cached": true``.

    **Caching**: status is cached for 30 seconds to reduce cellular polling overhead.
    Pass *force=true* to bypass the cache for an immediate fresh read.

    Raises:
        502 on protocol or connection errors while talking to the device.
        500 on any other exception raised by the device client.
    """
    cache = get_cache()
    conn_key = SFMCache.make_conn_key(host, tcp_port, port, baud)

    if not force:
        cached = cache.get_monitor_status(conn_key)
        if cached is not None:
            log.debug("monitor status cache hit for %s", conn_key)
            cached["_cached"] = True
            return cached

    # Translate device failures into HTTP errors the same way the other live
    # endpoints do, instead of letting them escape as an opaque 500.
    try:
        with _build_client(port=port, baud=baud, host=host, tcp_port=tcp_port) as client:
            try:
                client.poll()
            except Exception as exc:
                # One retry on the same connection; a second failure propagates.
                log.warning("monitor status poll retry: %s", exc)
                client.poll()
            status = client.get_monitor_status()
    except HTTPException:
        raise
    except ProtocolError as exc:
        log.error("monitor status: protocol error: %s", exc, exc_info=True)
        raise HTTPException(status_code=502, detail=f"Protocol error: {exc}") from exc
    except OSError as exc:
        log.error("monitor status: connection error: %s", exc, exc_info=True)
        raise HTTPException(status_code=502, detail=f"Connection error: {exc}") from exc
    except Exception as exc:
        log.error("monitor status: unexpected error: %s", exc, exc_info=True)
        raise HTTPException(status_code=500, detail=f"Device error: {exc}") from exc

    # Optional readings are included only when the device reported them.
    result: dict = {"is_monitoring": status.is_monitoring}
    if status.battery_v is not None:
        result["battery_v"] = round(status.battery_v, 2)
    if status.memory_total is not None:
        result["memory_total_bytes"] = status.memory_total
        result["memory_total_kb"] = round(status.memory_total / 1024, 1)
    if status.memory_free is not None:
        result["memory_free_bytes"] = status.memory_free
        result["memory_free_kb"] = round(status.memory_free / 1024, 1)

    cache.set_monitor_status(conn_key, result)
    return result
|
||
|
||
|
||
@app.post("/device/monitor/start")
def device_monitor_start(
    port: Optional[str] = Query(None, description="Serial port (e.g. COM5)"),
    baud: int = Query(38400, description="Serial baud rate"),
    host: Optional[str] = Query(None, description="TCP host — modem IP or ACH relay"),
    tcp_port: int = Query(DEFAULT_TCP_PORT, description=f"TCP port (default {DEFAULT_TCP_PORT})"),
) -> dict:
    """
    Command the device to start monitoring (recording triggered events).

    Sends SUB 0x96 and waits for ack SUB 0x69.

    On success the cached monitor status for this connection is invalidated
    and ``{"status": "started"}`` is returned.

    Raises:
        502 on protocol or connection errors while talking to the device.
        500 on any other exception raised by the device client.
    """
    # Translate device failures into HTTP errors the same way the other write
    # endpoints do, instead of letting them escape as an opaque 500.
    try:
        with _build_client(port=port, baud=baud, host=host, tcp_port=tcp_port) as client:
            try:
                client.poll()
            except Exception as exc:
                # One retry on the same connection; a second failure propagates.
                log.warning("start monitoring poll retry: %s", exc)
                client.poll()
            client.start_monitoring()
    except HTTPException:
        raise
    except ProtocolError as exc:
        log.error("start monitoring: protocol error: %s", exc, exc_info=True)
        raise HTTPException(status_code=502, detail=f"Protocol error: {exc}") from exc
    except OSError as exc:
        log.error("start monitoring: connection error: %s", exc, exc_info=True)
        raise HTTPException(status_code=502, detail=f"Connection error: {exc}") from exc
    except Exception as exc:
        log.error("start monitoring: unexpected error: %s", exc, exc_info=True)
        raise HTTPException(status_code=500, detail=f"Device error: {exc}") from exc

    # The device state just changed, so any cached status is stale.
    conn_key = SFMCache.make_conn_key(host, tcp_port, port, baud)
    get_cache().invalidate_monitor_status(conn_key)
    return {"status": "started"}
|
||
|
||
|
||
@app.post("/device/monitor/stop")
def device_monitor_stop(
    port: Optional[str] = Query(None, description="Serial port (e.g. COM5)"),
    baud: int = Query(38400, description="Serial baud rate"),
    host: Optional[str] = Query(None, description="TCP host — modem IP or ACH relay"),
    tcp_port: int = Query(DEFAULT_TCP_PORT, description=f"TCP port (default {DEFAULT_TCP_PORT})"),
) -> dict:
    """
    Command the device to stop monitoring.

    Sends SUB 0x97 and waits for ack SUB 0x68.

    On success the cached monitor status for this connection is invalidated
    and ``{"status": "stopped"}`` is returned.

    Raises:
        502 on protocol or connection errors while talking to the device.
        500 on any other exception raised by the device client.
    """
    # Translate device failures into HTTP errors the same way the other write
    # endpoints do, instead of letting them escape as an opaque 500.
    try:
        with _build_client(port=port, baud=baud, host=host, tcp_port=tcp_port) as client:
            try:
                client.poll()
            except Exception as exc:
                # One retry on the same connection; a second failure propagates.
                log.warning("stop monitoring poll retry: %s", exc)
                client.poll()
            client.stop_monitoring()
    except HTTPException:
        raise
    except ProtocolError as exc:
        log.error("stop monitoring: protocol error: %s", exc, exc_info=True)
        raise HTTPException(status_code=502, detail=f"Protocol error: {exc}") from exc
    except OSError as exc:
        log.error("stop monitoring: connection error: %s", exc, exc_info=True)
        raise HTTPException(status_code=502, detail=f"Connection error: {exc}") from exc
    except Exception as exc:
        log.error("stop monitoring: unexpected error: %s", exc, exc_info=True)
        raise HTTPException(status_code=500, detail=f"Device error: {exc}") from exc

    # The device state just changed, so any cached status is stale.
    conn_key = SFMCache.make_conn_key(host, tcp_port, port, baud)
    get_cache().invalidate_monitor_status(conn_key)
    return {"status": "stopped"}
|
||
|
||
|
||
# ── Call home config endpoints ───────────────────────────────────────────────
|
||
|
||
|
||
@app.get("/device/call_home")
def device_call_home_get(
    port: Optional[str] = Query(None, description="Serial port (e.g. COM5)"),
    baud: int = Query(38400, description="Serial baud rate"),
    host: Optional[str] = Query(None, description="TCP host — modem IP or ACH relay"),
    tcp_port: int = Query(DEFAULT_TCP_PORT, description=f"TCP port (default {DEFAULT_TCP_PORT})"),
) -> dict:
    """
    Fetch the Auto Call Home (ACH) settings stored on the device.

    Issues the two-step SUB 0x2C read and returns the decoded result.
    Field layout confirmed from the 4-20-26 call home settings captures
    (BE11529).

    Response shape:
        {
            "auto_call_home_enabled": true/false,
            "dial_string": "RADIO RING",
            "after_event_recorded": true/false,
            "at_specified_times": true/false,
            "time1_enabled": true/false, "time1_hour": 19, "time1_min": 55,
            "time2_enabled": false, "time2_hour": 0, "time2_min": 0,
            "num_retries": 3,
            "time_between_retries_sec": 15,
            "wait_for_connection_sec": 60,
            "warm_up_time_sec": 60
        }

    An empty dict is returned when the serialiser yields a falsy value.

    Raises:
        502 on protocol or connection errors; 500 on any other device error.
    """
    def _read_config():
        # POLL handshake first, then the call-home read, in one session.
        with _build_client(port, baud, host, tcp_port) as client:
            client.poll()
            return client.get_call_home_config()

    try:
        config = _run_with_retry(_read_config, is_tcp=_is_tcp(host))
    except HTTPException:
        raise
    except ProtocolError as exc:
        raise HTTPException(status_code=502, detail=f"Protocol error: {exc}") from exc
    except OSError as exc:
        raise HTTPException(status_code=502, detail=f"Connection error: {exc}") from exc
    except Exception as exc:
        raise HTTPException(status_code=500, detail=f"Device error: {exc}") from exc

    serialised = _serialise_call_home_config(config)
    return serialised or {}
|
||
|
||
|
||
class CallHomeConfigBody(BaseModel):
    """
    JSON payload accepted by POST /device/call_home.

    Every field is optional and defaults to ``None``; only supplied
    (non-null) fields are modified. All other call home config bytes are
    round-tripped verbatim from the device.

    Writable fields (confirmed from the 4-20-26 captures)
    -----------------------------------------------------
    auto_call_home_enabled : bool — master enable for auto call home
    after_event_recorded   : bool — call home after each triggered event
    at_specified_times     : bool — enable time-based scheduled calls
    time1_enabled          : bool — enable time slot 1
    time1_hour             : int  — hour for slot 1 (0-23; avoid 3 — DLE escape limitation)
    time1_min              : int  — minute for slot 1 (0-59; avoid 3)
    time2_enabled          : bool — enable time slot 2
    time2_hour             : int  — hour for slot 2 (0-23; avoid 3)
    time2_min              : int  — minute for slot 2 (0-59; avoid 3)

    Read-only fields (not writable via this endpoint)
    -------------------------------------------------
    dial_string, num_retries, time_between_retries_sec,
    wait_for_connection_sec, warm_up_time_sec
    """

    # --- Master / trigger switches ---
    auto_call_home_enabled: Optional[bool] = None
    after_event_recorded: Optional[bool] = None
    at_specified_times: Optional[bool] = None

    # --- Scheduled slot 1 ---
    time1_enabled: Optional[bool] = None
    time1_hour: Optional[int] = None
    time1_min: Optional[int] = None

    # --- Scheduled slot 2 ---
    time2_enabled: Optional[bool] = None
    time2_hour: Optional[int] = None
    time2_min: Optional[int] = None
|
||
|
||
|
||
@app.post("/device/call_home")
def device_call_home_set(
    body: CallHomeConfigBody,
    port: Optional[str] = Query(None, description="Serial port (e.g. COM5)"),
    baud: int = Query(38400, description="Serial baud rate"),
    host: Optional[str] = Query(None, description="TCP host — modem IP or ACH relay"),
    tcp_port: int = Query(DEFAULT_TCP_PORT, description=f"TCP port (default {DEFAULT_TCP_PORT})"),
) -> dict:
    """
    Update the device's call home configuration in place.

    The current config is read, the supplied (non-null) fields are applied,
    and the result is written back. All other bytes round-trip verbatim.

    Write sequence (confirmed from 4-20-26 call home settings captures):
        SUB 0x2C (read 2-step) → 125-byte raw payload
        patch fields
        SUB 0x7E (write 127-byte payload) → ack 0x81
        SUB 0x7F (confirm) → ack 0x80

    Example body:
        { "auto_call_home_enabled": true, "after_event_recorded": true,
          "time1_enabled": true, "time1_hour": 20, "time1_min": 0 }

    Returns:
        {"status": "ok", "updated_fields": {...}} where ``updated_fields``
        echoes the non-null fields of the request body.

    Raises:
        502 on protocol or connection errors, 422 when the device client
        raises ValueError, 500 on any other device error.
    """
    requested = body.model_dump(exclude_none=True)
    log.info("POST /device/call_home port=%s host=%s fields=%s", port, host, list(requested.keys()))

    def _write_config():
        # POLL handshake first, then hand every field (None included) to the
        # client, which performs the read-patch-write.
        with _build_client(port, baud, host, tcp_port) as client:
            client.poll()
            client.set_call_home_config(
                auto_call_home_enabled=body.auto_call_home_enabled,
                after_event_recorded=body.after_event_recorded,
                at_specified_times=body.at_specified_times,
                time1_enabled=body.time1_enabled,
                time1_hour=body.time1_hour,
                time1_min=body.time1_min,
                time2_enabled=body.time2_enabled,
                time2_hour=body.time2_hour,
                time2_min=body.time2_min,
            )

    try:
        _run_with_retry(_write_config, is_tcp=_is_tcp(host))
    except HTTPException:
        raise
    except ProtocolError as exc:
        raise HTTPException(status_code=502, detail=f"Protocol error: {exc}") from exc
    except OSError as exc:
        raise HTTPException(status_code=502, detail=f"Connection error: {exc}") from exc
    except ValueError as exc:
        raise HTTPException(status_code=422, detail=str(exc)) from exc
    except Exception as exc:
        raise HTTPException(status_code=500, detail=f"Device error: {exc}") from exc

    return {"status": "ok", "updated_fields": requested}
|
||
|
||
|
||
# ── Cache management endpoints ────────────────────────────────────────────────
|
||
|
||
@app.get("/cache/stats")
def cache_stats() -> dict:
    """
    Report row counts for all cache tables.

    Handy when debugging, or when checking that caching behaves as expected.
    """
    cache = get_cache()
    return cache.stats()
|
||
|
||
|
||
@app.delete("/cache/device")
def cache_clear_device(
    port: Optional[str] = Query(None, description="Serial port (e.g. COM5)"),
    baud: int = Query(38400, description="Serial baud rate"),
    host: Optional[str] = Query(None, description="TCP host — modem IP or ACH relay"),
    tcp_port: int = Query(DEFAULT_TCP_PORT, description=f"TCP port (default {DEFAULT_TCP_PORT})"),
) -> dict:
    """
    Drop every cached entry for one device, identified by its connection address.

    Clears: device info, all event headers, all waveforms, monitor status.
    The next request to any endpoint for this device will re-fetch from the device.

    Supply either *port* (serial) or *host* (TCP/modem) to identify the device.

    Returns:
        {"status": "cleared", "conn_key": <key>, "deleted": <counts>} where
        ``deleted`` is whatever the cache's ``clear_device`` reports.
    """
    key = SFMCache.make_conn_key(host, tcp_port, port, baud)
    deleted = get_cache().clear_device(key)
    return {"status": "cleared", "conn_key": key, "deleted": deleted}
|
||
|
||
|
||
# ── DB read endpoints ─────────────────────────────────────────────────────────
|
||
#
|
||
# These endpoints expose the seismo-relay SQLite DB written by ach_server.py.
|
||
# All queries are read-only. Terra-view calls these to build project event
|
||
# views, unit history panels, and (eventually) vibration summary reports.
|
||
|
||
|
||
@app.get("/db/units")
def db_units() -> list[dict]:
    """
    List every known serial, one row each, with summary stats:
    last_seen, total_events, total_monitor_entries, total_sessions.
    """
    db = _get_db()
    return db.query_units()
|
||
|
||
|
||
@app.get("/db/events")
def db_events(
    serial: Optional[str] = Query(None, description="Filter by unit serial (e.g. BE11529)"),
    from_dt: Optional[str] = Query(None, description="ISO-8601 start datetime (inclusive)"),
    to_dt: Optional[str] = Query(None, description="ISO-8601 end datetime (inclusive)"),
    false_trigger: Optional[bool] = Query(None, description="Filter by false_trigger flag"),
    limit: int = Query(500, description="Max rows to return (default 500)"),
    offset: int = Query(0, description="Pagination offset"),
) -> dict:
    """
    Query triggered events from the DB.

    Returns events newest-first. All filter params are optional.

    Example:
        GET /db/events?serial=BE11529&from_dt=2026-04-01&limit=100

    Returns:
        {"count": <rows in this page>, "events": [...]}

    Raises:
        422 when *from_dt* or *to_dt* is not a string that
        ``datetime.datetime.fromisoformat`` can parse.
    """
    # Parse both bounds up front so a malformed value is reported as a client
    # error naming the offending parameter, not as an unhandled ValueError.
    bounds: dict = {}
    for name, raw in (("from_dt", from_dt), ("to_dt", to_dt)):
        if not raw:
            bounds[name] = None
            continue
        try:
            bounds[name] = datetime.datetime.fromisoformat(raw)
        except ValueError as exc:
            raise HTTPException(
                status_code=422,
                detail=f"Invalid ISO-8601 datetime for {name}: {raw!r}",
            ) from exc

    rows = _get_db().query_events(
        serial=serial,
        from_dt=bounds["from_dt"],
        to_dt=bounds["to_dt"],
        false_trigger=false_trigger,
        limit=limit,
        offset=offset,
    )
    return {"count": len(rows), "events": rows}
|
||
|
||
|
||
@app.patch("/db/events/{event_id}/false_trigger")
def db_set_false_trigger(
    event_id: str,
    value: bool = Query(..., description="True to flag as false trigger, False to clear"),
) -> dict:
    """
    Flag or unflag a single event as a false trigger.

    Used by the terra-view event review UI.

    Returns:
        {"status": "ok", "event_id": ..., "false_trigger": ...} on success.

    Raises:
        404 if the event_id is not found.
    """
    updated = _get_db().set_false_trigger(event_id, value)
    if updated:
        return {"status": "ok", "event_id": event_id, "false_trigger": value}
    raise HTTPException(status_code=404, detail=f"Event {event_id} not found")
|
||
|
||
|
||
# ── /db/events/{id} — waveform file accessors ─────────────────────────────────
|
||
#
|
||
# These endpoints serve files from the persistent WaveformStore, so a Blastware
|
||
# file or its decoded JSON for a previously-ingested ACH event can be fetched
|
||
# without re-dialing the device.
|
||
|
||
@app.get("/db/events/{event_id}/blastware_file")
def db_event_blastware_file(event_id: str) -> FileResponse:
    """
    Serve the Blastware-format event file of a previously-ingested event.

    Filename extension is per-event (timestamp-encoded `AB0T` for ACH
    downloads, 3-char `AB0` for direct downloads).

    Raises:
        404 if the event is unknown or its DB row lacks a serial or
            Blastware filename (events ingested before the store was wired
            will show this — re-download via the live endpoint to populate).
        410 if the row names a file the store cannot open on disk.
    """
    record = _get_db().get_event(event_id)
    if record is None:
        raise HTTPException(status_code=404, detail=f"Event {event_id} not found")

    serial = record.get("serial")
    filename = record.get("blastware_filename")
    if not (serial and filename):
        raise HTTPException(
            status_code=404,
            detail=(
                f"Event {event_id} has no Blastware file in the store. "
                "Re-download via the live endpoint to populate."
            ),
        )

    stored_path = _get_store().open_blastware(serial, filename)
    if stored_path is None:
        # The DB knows about the file but the store could not resolve it.
        raise HTTPException(
            status_code=410,
            detail=f"Stored file missing on disk: {filename}",
        )

    return FileResponse(
        path=str(stored_path),
        filename=filename,
        media_type="application/octet-stream",
    )
|
||
|
||
|
||
@app.get("/db/events/{event_id}/waveform.json")
def db_event_waveform_json(event_id: str) -> dict:
    """
    Serve the plot-ready JSON (`sfm.plot.v1`) for a stored event.

    Sources are tried cheapest first:
      1. `<filename>.h5` — served through `plot_json_from_hdf5`. Samples
         are already in physical units; no decode work needed. If reading
         the file raises, a warning is logged and step 2 is attempted.
      2. `<filename>.a5.pkl` — the A5 decoders are replayed to rebuild an
         Event, which is serialised via `event_to_plot_json`.
      3. Neither present — 404, the event has no waveform data on disk.

    The shape is identical regardless of source, so clients (the SFM
    webapp, Terra-View, etc.) consume the same `sfm.plot.v1` payload.

    Raises:
        404 if the event is unknown, its row lacks a serial or filename,
            or no waveform data exists on disk.
        500 if the A5 waveform decode fails.
    """
    record = _get_db().get_event(event_id)
    if record is None:
        raise HTTPException(status_code=404, detail=f"Event {event_id} not found")

    serial = record.get("serial")
    filename = record.get("blastware_filename")
    if not (serial and filename):
        raise HTTPException(
            status_code=404,
            detail=f"Event {event_id} has no event file in the store",
        )
    store = _get_store()

    # Source 1: HDF5 (canonical clean format).
    h5_path = store.hdf5_path_for(serial, filename)
    if h5_path.exists():
        try:
            return event_hdf5.plot_json_from_hdf5(h5_path, event_id=event_id)
        except Exception as exc:
            log.warning("HDF5 read failed (%s); falling back to A5 path", exc)

    # Source 2: A5 pickle replay.
    frames = store.load_a5(serial, filename)
    if not frames:
        raise HTTPException(
            status_code=404,
            detail=(
                f"Event {event_id} has no waveform data on disk "
                "(no .h5 and no .a5.pkl). Run the backfill script or "
                "re-download via the live endpoint to populate."
            ),
        )

    replayed = Event(index=-1)
    # A metadata decode failure is tolerated; only a waveform decode failure
    # aborts the request.
    try:
        _decode_a5_metadata_into(frames, replayed)
    except Exception as exc:
        log.warning("db_event_waveform_json: metadata decode failed: %s", exc)
    try:
        _decode_a5_waveform(frames, replayed)
    except Exception as exc:
        log.error("db_event_waveform_json: waveform decode failed: %s", exc, exc_info=True)
        raise HTTPException(status_code=500, detail=f"Waveform decode failed: {exc}") from exc

    # Carry over fields from the DB row when the A5 replay didn't fill them.
    stored_rate = record.get("sample_rate")
    if replayed.sample_rate is None and stored_rate:
        replayed.sample_rate = stored_rate

    # NOTE(review): geo_range is fixed to "normal" on this path — confirm
    # whether events recorded in the sensitive range need a different value.
    return event_hdf5.event_to_plot_json(
        replayed, serial=serial, geo_range="normal", event_id=event_id,
    )
|
||
|
||
|
||
# ── /db/events/{id}/sidecar — modern .sfm.json review/metadata accessors ──────
|
||
|
||
|
||
class SidecarPatchBody(BaseModel):
    """Request body for PATCH /db/events/{id}/sidecar.

    Follows JSON-merge-patch semantics: only the keys you include get
    updated. Both blocks are optional and default to ``None``.
    """

    # Editable block for monthly-summary workflows (false_trigger flag,
    # reviewer notes, etc.).
    review: Optional[dict] = None
    # Forward-compat namespace for vendor / future fields.
    extensions: Optional[dict] = None
|
||
|
||
|
||
@app.get("/db/events/{event_id}/sidecar")
def db_event_sidecar(event_id: str) -> dict:
    """
    Serve the .sfm.json sidecar of a stored event.

    Raises:
        404 if the event is unknown, its row lacks a serial or filename, or
            the store has no sidecar for it (events ingested before the
            sidecar feature landed will show this until backfilled).
    """
    record = _get_db().get_event(event_id)
    if record is None:
        raise HTTPException(status_code=404, detail=f"Event {event_id} not found")

    serial = record.get("serial")
    filename = record.get("blastware_filename")
    if not (serial and filename):
        raise HTTPException(
            status_code=404,
            detail=f"Event {event_id} has no event file in the store",
        )

    sidecar = _get_store().load_sidecar(serial, filename)
    if sidecar is not None:
        return sidecar
    raise HTTPException(
        status_code=404,
        detail=(
            f"No .sfm.json sidecar on disk for {filename}. "
            "Run scripts/backfill_sidecars.py to generate one."
        ),
    )
|
||
|
||
|
||
@app.patch("/db/events/{event_id}/sidecar")
def db_event_sidecar_patch(event_id: str, body: SidecarPatchBody) -> dict:
    """
    Apply a JSON-merge-patch to the sidecar's `review` and/or `extensions` blocks.

    The sidecar JSON is the source of truth for review state. When
    `review.false_trigger` is updated, the SQL `events.false_trigger`
    column is kept in sync as a derived index for fast filtering.

    Returns the new full sidecar.

    Raises:
        404 if the event is unknown, its row lacks a serial or filename, or
            no sidecar exists on disk.
        400 if the body carries neither a non-empty `review` nor a non-empty
            `extensions` block.
    """
    record = _get_db().get_event(event_id)
    if record is None:
        raise HTTPException(status_code=404, detail=f"Event {event_id} not found")

    serial = record.get("serial")
    filename = record.get("blastware_filename")
    if not (serial and filename):
        raise HTTPException(
            status_code=404,
            detail=f"Event {event_id} has no event file in the store",
        )

    # An empty dict counts as "not supplied" here, same as None.
    if not body.review and not body.extensions:
        raise HTTPException(
            status_code=400,
            detail="PATCH body must include `review` and/or `extensions`",
        )

    patched = _get_store().patch_sidecar(
        serial, filename,
        review=body.review,
        extensions=body.extensions,
    )
    if patched is None:
        raise HTTPException(
            status_code=404,
            detail=f"No .sfm.json sidecar on disk for {filename}",
        )

    # Mirror false_trigger from review block into the SQL index column.
    if body.review is not None:
        _get_db().update_event_review(event_id, patched.get("review", {}))

    return patched
|
||
|
||
|
||
# ── /db/import/blastware_file — ingest BW-only event files ────────────────────
|
||
|
||
|
||
def _bw_report_binary_keys(report_name: str) -> set[str]:
    """Return the lower-cased binary filenames a BW ASCII report may pair with.

    `report_name` must end in ".txt" (any case). Blastware writes ASCII
    reports under two conventions:

      1. ACH convention (Blastware's official Auto Call Home):
           binary: M529LK44.AB0
           report: M529LK44_AB0_ASCII.TXT
         (replaces the "." with "_", appends "_ASCII.TXT")

      2. Manual-export convention (operator clicks Save As Text in
         BW's UI):
           binary: M529LK44.AB0
           report: M529LK44.AB0.TXT
         (literal binary filename + ".TXT" suffix)

    The manual-export candidate is always included; the ACH candidate is
    added when the name carries the "_ASCII" trailer and still contains
    an underscore to turn back into the extension dot.
    """
    stripped = report_name[:-4]  # remove ".TXT"
    keys = {stripped.lower()}
    if stripped.lower().endswith("_ascii"):
        inner = stripped[:-6]  # remove "_ASCII"
        # The LAST "_" stands in for the "." of the binary's extension.
        stem, sep, ext = inner.rpartition("_")
        if sep:
            keys.add(f"{stem}.{ext}".lower())
    return keys


@app.post("/db/import/blastware_file")
async def db_import_blastware_file(
    files: list[UploadFile] = File(...),
    serial: Optional[str] = Query(None, description="Optional serial-number hint (e.g. BE11529); falls back to the BW filename's encoded prefix when omitted"),
) -> dict:
    """
    Multipart upload of one or more Blastware event file binaries
    (typically produced by Blastware's own ACH).  For each file:

      1. Parse the bytes via WaveformStore.save_imported_bw — produces
         a parsed Event + copies the file into the persistent store +
         writes a .sfm.json sidecar with source.kind = "bw-import".
      2. Upsert a row into `events` (dedup'd on serial+timestamp).

    **Paired BW ASCII reports.**  Any upload whose name ends in ``.txt``
    (case-insensitive) is treated as a per-event report rather than a
    binary, and is handed to ``save_imported_bw`` together with the
    binary it pairs with.  Two naming conventions are recognised:

      * ACH:            ``M529LK44.AB0`` + ``M529LK44_AB0_ASCII.TXT``
      * Manual export:  ``M529LK44.AB0`` + ``M529LK44.AB0.TXT``

    Pairing is case-insensitive.  When several reports resolve to the
    same binary, the one that appears last in the upload is used.  The
    daemon-forwarded ACH workflow should always send both files
    together so the SFM database has the rich report metadata.

    Returns ``{"count": N, "results": [...]}`` with one entry per
    outcome:

      * ``status: "ok"``      — binary stored; includes
        ``report_attached`` plus the insert/skip counts.
      * ``status: "error"``   — the upload could not be read or the
        import raised; ``detail`` carries the message.
      * ``status: "warning"`` — a report was uploaded but none of its
        candidate binary names appears in this request.  Emitted once
        per report, under the report's lower-cased filename.
    """
    store = _get_store()
    db = _get_db()
    results: list[dict] = []

    # Read every upload up front (UploadFile.read() is one-shot under
    # FastAPI's spooled-tempfile backing) and split into binaries vs
    # paired ASCII reports.
    binaries: list[tuple[str, bytes]] = []
    # Report bytes keyed by every lower-cased binary name they may pair with.
    reports: dict[str, bytes] = {}
    # Lower-cased report filename -> its candidate keys.  Kept separately
    # from `reports` so the unmatched check below can reason per upload
    # instead of per key (one ACH report contributes two keys).
    report_candidates: dict[str, set[str]] = {}
    for upload in files:
        name = upload.filename or ""
        try:
            content = await upload.read()
        except Exception as exc:
            results.append({
                "filename": name or "<unnamed>", "status": "error",
                "detail": f"read failed: {exc}",
            })
            continue

        if name.lower().endswith(".txt"):
            keys = _bw_report_binary_keys(name)
            for key in keys:
                reports[key] = content
            report_candidates[name.lower()] = keys
        else:
            binaries.append((name, content))

    for filename, content in binaries:
        report_bytes = reports.get(filename.lower())
        try:
            ev, rec = store.save_imported_bw(
                content,
                source_path=Path(filename or "imported.bw"),
                serial_hint=serial,
                bw_report_text=report_bytes,
            )
            # WaveformStore decoded the serial from the BW filename
            # (e.g. T104… → BE18104) and surfaces it on `rec`.  Use that
            # rather than the placeholder `_serial_from_event(ev)` stub,
            # which always returned None and was silently bucketing every
            # forwarded event into serial="UNKNOWN" in the DB.
            resolved_serial = (
                serial
                or rec.get("serial")
                or _serial_from_event(ev)
                or "UNKNOWN"
            )
            waveform_key = ev._waveform_key
            inserted, skipped = db.insert_events(
                [ev],
                serial=resolved_serial,
                waveform_records=(
                    {waveform_key.hex(): rec} if waveform_key else None
                ),
            )
            results.append({
                "filename": filename,
                "status": "ok",
                "stored_filename": rec["filename"],
                "filesize": rec["filesize"],
                "sha256": rec["sha256"],
                "serial": resolved_serial,
                "report_attached": report_bytes is not None,
                "inserted": inserted,
                "skipped": skipped,
            })
        except Exception as exc:
            # Per-file boundary: one bad file must not abort the batch.
            log.error("import failed for %s: %s", filename, exc, exc_info=True)
            results.append({
                "filename": filename, "status": "error",
                "detail": str(exc),
            })

    # Surface unmatched .txt uploads so the daemon can detect mis-pairings.
    # A report counts as matched when ANY of its candidate names is a
    # binary in this request; comparing raw keys instead would flag the
    # unused alternate key of every correctly paired ACH report.
    binary_names = {fn.lower() for fn, _ in binaries}
    for report_name, keys in report_candidates.items():
        if keys.isdisjoint(binary_names):
            results.append({
                "filename": report_name,
                "status": "warning",
                "detail": "BW ASCII report supplied but no matching binary in this upload",
            })

    return {"count": len(results), "results": results}
|
||
|
||
|
||
def _serial_from_event(ev) -> Optional[str]:
|
||
"""Fallback serial resolver — currently relies on the BW filename
|
||
decoder via WaveformStore.save_imported_bw, so this is just a
|
||
placeholder for future enhancement (e.g. inferring from project_info)."""
|
||
return None
|
||
|
||
|
||
@app.get("/db/units/{serial}/waveforms.zip")
def db_unit_waveforms_zip(
    serial: str,
    from_dt: Optional[str] = Query(None, description="ISO-8601 start datetime (inclusive)"),
    to_dt: Optional[str] = Query(None, description="ISO-8601 end datetime (inclusive)"),
    limit: int = Query(5000, description="Hard cap on events bundled (default 5000)"),
) -> StreamingResponse:
    """
    Return a ZIP of all event files for a serial in the optional date range.

    Events without a stored event file are silently skipped.  The
    number of files bundled is reported in the ``X-Waveform-Count``
    response header.

    Raises:
        HTTPException 400: `from_dt` or `to_dt` is not a valid ISO-8601
            datetime.
        HTTPException 404: no stored event file was found for the
            serial in the requested range.
    """
    import io
    import zipfile

    # Reject malformed datetimes as a client error; left uncaught, the
    # ValueError from fromisoformat() would surface as a 500.
    try:
        from_parsed = datetime.datetime.fromisoformat(from_dt) if from_dt else None
        to_parsed = datetime.datetime.fromisoformat(to_dt) if to_dt else None
    except ValueError as exc:
        raise HTTPException(
            status_code=400,
            detail=f"Invalid ISO-8601 datetime: {exc}",
        ) from exc

    rows = _get_db().query_events(
        serial=serial,
        from_dt=from_parsed,
        to_dt=to_parsed,
        limit=limit,
        offset=0,
    )
    store = _get_store()

    # NOTE: the whole archive is assembled in this in-memory buffer
    # before the response starts, so `limit` also bounds memory use.
    buf = io.BytesIO()
    written = 0
    with zipfile.ZipFile(buf, "w", compression=zipfile.ZIP_DEFLATED) as zf:
        for row in rows:
            fn = row.get("blastware_filename")
            if not fn:
                continue
            bw_path = store.open_blastware(serial, fn)
            if bw_path is None:
                continue
            zf.write(bw_path, arcname=fn)
            written += 1

    if written == 0:
        raise HTTPException(
            status_code=404,
            detail=f"No stored Blastware files found for serial {serial} in range",
        )

    buf.seek(0)  # rewind so the response body starts at the first byte
    safe_serial = serial.replace("/", "_")
    headers = {
        "Content-Disposition": f'attachment; filename="{safe_serial}_waveforms.zip"',
        "X-Waveform-Count": str(written),
    }
    return StreamingResponse(buf, media_type="application/zip", headers=headers)
|
||
|
||
|
||
@app.get("/db/monitor_log")
def db_monitor_log(
    serial: Optional[str] = Query(None, description="Filter by unit serial"),
    from_dt: Optional[str] = Query(None, description="ISO-8601 start datetime (inclusive)"),
    to_dt: Optional[str] = Query(None, description="ISO-8601 end datetime (inclusive)"),
    limit: int = Query(500, description="Max rows to return"),
    offset: int = Query(0, description="Pagination offset"),
) -> dict:
    """
    Query monitor log entries (continuous monitoring intervals) from the DB.

    Returns ``{"count": N, "entries": [...]}`` where ``count`` is the
    number of rows in this page.  Entries are returned newest-first.

    Raises:
        HTTPException 400: `from_dt` or `to_dt` is not a valid ISO-8601
            datetime.
    """
    # Reject malformed datetimes as a client error; left uncaught, the
    # ValueError from fromisoformat() would surface as a 500.
    try:
        from_parsed = datetime.datetime.fromisoformat(from_dt) if from_dt else None
        to_parsed = datetime.datetime.fromisoformat(to_dt) if to_dt else None
    except ValueError as exc:
        raise HTTPException(
            status_code=400,
            detail=f"Invalid ISO-8601 datetime: {exc}",
        ) from exc

    rows = _get_db().query_monitor_log(
        serial=serial,
        from_dt=from_parsed,
        to_dt=to_parsed,
        limit=limit,
        offset=offset,
    )
    return {"count": len(rows), "entries": rows}
|
||
|
||
|
||
@app.get("/db/sessions")
def db_sessions(
    serial: Optional[str] = Query(None, description="Filter by unit serial"),
    limit: int = Query(50, description="Max rows to return"),
) -> dict:
    """
    List ACH call-home sessions recorded in the DB, newest first.

    The response carries the rows under ``sessions`` together with a
    ``count`` of how many rows were returned.
    """
    session_rows = _get_db().get_sessions(serial=serial, limit=limit)
    return dict(count=len(session_rows), sessions=session_rows)
|
||
|
||
|
||
# ── Entry point ────────────────────────────────────────────────────────────────
|
||
|
||
if __name__ == "__main__":
    import argparse

    # Command-line flags, declared as (flag, add_argument keyword options).
    cli_flags = (
        ("--host", {"default": "0.0.0.0", "help": "Bind address (default: 0.0.0.0)"}),
        ("--port", {"type": int, "default": 8200, "help": "Port (default: 8200)"}),
        ("--reload", {"action": "store_true", "help": "Enable auto-reload (dev mode)"}),
    )

    parser = argparse.ArgumentParser(
        description="SFM — Seismograph Field Module API server",
    )
    for flag, options in cli_flags:
        parser.add_argument(flag, **options)
    cli = parser.parse_args()

    log.info("Starting SFM server on %s:%d", cli.host, cli.port)
    # The app is passed as an import string rather than as an object.
    uvicorn.run("sfm.server:app", host=cli.host, port=cli.port, reload=cli.reload)
|