diff --git a/CHANGELOG.md b/CHANGELOG.md index 315f3aa..defc04f 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -4,6 +4,41 @@ All notable changes to seismo-relay are documented here. --- +## v0.12.0 — 2026-04-13 + +### Added + +- **`sfm/server.py` — `_LiveCache`** — in-memory live device cache, eliminating + redundant TCP round-trips between requests. No extra dependencies (plain Python + dict + threading.Lock). Replaces the SQLAlchemy-based `sfm/cache.py` experiment + from the `feature/intelligent-caching` branch. + + Cache behaviour by endpoint: + + | Endpoint | Cache strategy | + |---|---| + | `GET /device/info` | Indefinite; invalidated by `POST /device/config` | + | `GET /device/events` | Count-probe fast path: quick `poll()+count_events()` (~2s); return cache if count matches; full download only when new events detected | + | `GET /device/monitor/status` | 30-second TTL; invalidated by monitor start/stop | + | `GET /device/event/{idx}/waveform` | Permanent per-index (waveforms are immutable) | + +- **`?force=true` param** on all four cached endpoints — bypasses cache and re-reads + from device. + +- **`POST /device/config` cache invalidation** — marks device info + events dirty so + the next read reflects the new compliance config. + +- **`POST /device/monitor/start` / `stop` cache invalidation** — evicts the monitor + status cache entry immediately so the next poll returns the updated state. + +### Removed + +- `sfm/cache.py` — SQLAlchemy-based cache from the experimental caching branch. + Its logic has been ported to the in-memory, dict-based `_LiveCache` class above. + `sqlalchemy` is no longer a dependency. + +--- + ## v0.11.0 — 2026-04-13 ### Added diff --git a/CLAUDE.md b/CLAUDE.md index f1a2836..710fa6e 100644 --- a/CLAUDE.md +++ b/CLAUDE.md @@ -2,7 +2,7 @@ Ground-up Python replacement for **Blastware**, Instantel's Windows-only software for managing MiniMate Plus seismographs. Connects over direct RS-232 or cellular modem -(Sierra Wireless RV50 / RV55). 
# ── Live device cache ─────────────────────────────────────────────────────────
# In-memory cache for live device data.  Avoids re-dialing the device on every
# request when the data hasn't changed.
#
# Keyed by conn_key ("tcp:host:port" or "serial:port:baud").
# Does NOT persist across server restarts — this is purely an in-process cache
# to reduce TCP round-trips and cellular data usage.
#
# Invalidation rules:
#   device_info    — cached until POST /device/config marks it dirty
#   events         — cached by (conn_key, device_event_count); re-fetched when
#                    a quick count_events() probe shows new events on the device
#   monitor_status — 30-second TTL (changes frequently during monitoring)
#   waveforms      — permanent (immutable once recorded; indexed by conn_key+idx)
#
# The cached GET endpoints accept ?force=true to bypass the cache and re-read
# from the device.

_MONITOR_STATUS_TTL = 30.0   # seconds


class _LiveCache:
    """
    Thread-safe in-memory cache for live SFM device data.
    One singleton per server process.

    Every accessor holds a single ``threading.Lock`` for the duration of its
    dict operations, so individual get/set/invalidate calls are atomic with
    respect to each other.

    Cached dicts and lists are stored and returned *by reference* (no copy is
    made), so callers must treat returned objects as read-only.

    NOTE(review): the waveform cache never evicts, so memory grows with every
    distinct (conn_key, index) that is fetched during the process lifetime.
    """

    def __init__(self) -> None:
        self._lock = threading.Lock()
        # conn_key → serialised device info dict
        self._device_info: dict[str, dict] = {}
        # conn_key → (device_event_count_when_cached, [event dicts])
        self._events: dict[str, tuple[int, list]] = {}
        # conn_key → (fetched_at, status_dict); fetched_at is a
        # time.monotonic() reading, NOT a unix timestamp
        self._monitor_status: dict[str, tuple[float, dict]] = {}
        # conn_key → bool (True = re-read device on next /device/info)
        self._config_dirty: dict[str, bool] = {}
        # (conn_key, event_index) → waveform dict (permanent)
        self._waveforms: dict[tuple[str, int], dict] = {}

    # ── Connection key ────────────────────────────────────────────────────────

    @staticmethod
    def make_conn_key(
        host: Optional[str],
        tcp_port: int,
        port: Optional[str],
        baud: int,
    ) -> str:
        """
        Build the cache key identifying one device connection.

        A truthy *host* selects the TCP form ``"tcp:<host>:<tcp_port>"``;
        otherwise the serial form ``"serial:<port>:<baud>"`` is returned.
        """
        if host:
            return f"tcp:{host}:{tcp_port}"
        return f"serial:{port}:{baud}"

    # ── Device info ───────────────────────────────────────────────────────────

    def get_device_info(self, conn_key: str) -> Optional[dict]:
        """
        Return the cached device info dict, or None on a cache miss.

        A connection whose config has been marked dirty always misses, even
        if an older info dict is still stored.
        """
        with self._lock:
            if self._config_dirty.get(conn_key):
                return None
            return self._device_info.get(conn_key)

    def set_device_info(self, conn_key: str, info: dict) -> None:
        """Store *info* for *conn_key* and clear its config-dirty flag."""
        with self._lock:
            self._device_info[conn_key] = info
            self._config_dirty[conn_key] = False

    # ── Events ────────────────────────────────────────────────────────────────

    def get_events(self, conn_key: str, device_count: int) -> Optional[list]:
        """
        Return cached events if the device's current event count matches what
        we had when we last fetched.  Returns None (cache miss) otherwise,
        and also while the connection's config is marked dirty.
        """
        with self._lock:
            if self._config_dirty.get(conn_key):
                return None
            entry = self._events.get(conn_key)
            if entry is None:
                return None
            cached_count, events = entry
            return events if cached_count == device_count else None

    def set_events(self, conn_key: str, device_count: int, events: list) -> None:
        """Store *events* together with the device event count they reflect."""
        with self._lock:
            self._events[conn_key] = (device_count, events)

    # ── Monitor status ────────────────────────────────────────────────────────

    def get_monitor_status(self, conn_key: str) -> Optional[dict]:
        """
        Return the cached monitor status if it is younger than
        ``_MONITOR_STATUS_TTL`` seconds, else None.

        Age is measured on the monotonic clock so that wall-clock adjustments
        (NTP steps, manual changes) can neither expire a fresh entry nor keep
        a stale one alive.  Expired entries are evicted on read.
        """
        with self._lock:
            entry = self._monitor_status.get(conn_key)
            if entry is None:
                return None
            fetched_at, status = entry
            if time.monotonic() - fetched_at > _MONITOR_STATUS_TTL:
                # Stale — drop it now rather than holding it until overwritten.
                del self._monitor_status[conn_key]
                return None
            return status

    def set_monitor_status(self, conn_key: str, status: dict) -> None:
        """Store *status*, timestamped with the current monotonic clock."""
        with self._lock:
            self._monitor_status[conn_key] = (time.monotonic(), status)

    def invalidate_monitor_status(self, conn_key: str) -> None:
        """Drop any cached monitor status for *conn_key* (no-op if absent)."""
        with self._lock:
            self._monitor_status.pop(conn_key, None)

    # ── Config dirty flag ─────────────────────────────────────────────────────

    def mark_config_dirty(self, conn_key: str) -> None:
        """
        Called after a successful POST /device/config write.
        Forces next /device/info and /device/events to re-read from the device.

        The flag stays set until the next set_device_info() for this key;
        the cached event list is discarded immediately.
        """
        with self._lock:
            self._config_dirty[conn_key] = True
            self._events.pop(conn_key, None)

    # ── Waveforms (permanent cache) ───────────────────────────────────────────

    def get_waveform(self, conn_key: str, index: int) -> Optional[dict]:
        """Return the cached waveform dict for (conn_key, index), or None."""
        with self._lock:
            return self._waveforms.get((conn_key, index))

    def set_waveform(self, conn_key: str, index: int, waveform: dict) -> None:
        """Store *waveform* for (conn_key, index); entries are never evicted."""
        with self._lock:
            self._waveforms[(conn_key, index)] = waveform


_live_cache = _LiveCache()


# ── Serialisers ────────────────────────────────────────────────────────────────
# Plain dict helpers — avoids a Pydantic dependency in the library layer.
@@ -272,10 +404,11 @@ def webapp(): @app.get("/device/info") def device_info( - port: Optional[str] = Query(None, description="Serial port (e.g. COM5, /dev/ttyUSB0)"), - baud: int = Query(38400, description="Serial baud rate (default 38400)"), - host: Optional[str] = Query(None, description="TCP host — modem IP or ACH relay (e.g. 203.0.113.5)"), + port: Optional[str] = Query(None, description="Serial port (e.g. COM5, /dev/ttyUSB0)"), + baud: int = Query(38400, description="Serial baud rate (default 38400)"), + host: Optional[str] = Query(None, description="TCP host — modem IP or ACH relay (e.g. 203.0.113.5)"), tcp_port: int = Query(DEFAULT_TCP_PORT, description=f"TCP port (default {DEFAULT_TCP_PORT})"), + force: bool = Query(False, description="Bypass cache and re-read from device"), ) -> dict: """ Connect to the device, perform the POLL startup handshake, and return @@ -283,8 +416,18 @@ def device_info( Supply either *port* (serial) or *host* (TCP/modem). Equivalent to POST /device/connect — provided as GET for convenience. + + Response is cached until a POST /device/config write invalidates it. + Pass ?force=true to bypass the cache. 
""" - log.info("GET /device/info port=%s host=%s tcp_port=%d", port, host, tcp_port) + conn_key = _live_cache.make_conn_key(host, tcp_port, port, baud) + log.info("GET /device/info port=%s host=%s tcp_port=%d force=%s", port, host, tcp_port, force) + + if not force: + cached = _live_cache.get_device_info(conn_key) + if cached is not None: + log.debug("device_info cache hit for %s", conn_key) + return cached try: def _do(): @@ -304,7 +447,9 @@ def device_info( except Exception as exc: raise HTTPException(status_code=500, detail=f"Device error: {exc}") from exc - return _serialise_device_info(info) + result = _serialise_device_info(info) + _live_cache.set_device_info(conn_key, result) + return result @app.post("/device/connect") @@ -323,11 +468,12 @@ def device_connect( @app.get("/device/events") def device_events( - port: Optional[str] = Query(None, description="Serial port (e.g. COM5)"), - baud: int = Query(38400, description="Serial baud rate"), - host: Optional[str] = Query(None, description="TCP host — modem IP or ACH relay"), + port: Optional[str] = Query(None, description="Serial port (e.g. COM5)"), + baud: int = Query(38400, description="Serial baud rate"), + host: Optional[str] = Query(None, description="TCP host — modem IP or ACH relay"), tcp_port: int = Query(DEFAULT_TCP_PORT, description=f"TCP port (default {DEFAULT_TCP_PORT})"), - debug: bool = Query(False, description="Include raw record hex for field-layout inspection"), + debug: bool = Query(False, description="Include raw record hex for field-layout inspection"), + force: bool = Query(False, description="Bypass cache and re-download from device"), ) -> dict: """ Connect to the device, read the event index, and download all stored @@ -335,14 +481,48 @@ def device_events( Supply either *port* (serial) or *host* (TCP/modem). + **Caching:** a quick count_events() probe (~2s) is performed first. 
If the + device's event count matches the cached count, the cached response is returned + immediately without a full download. Pass ?force=true to skip this and always + re-download. + Pass debug=true to include raw_record_hex in each event — useful for verifying field offsets against the protocol reference. This does NOT download raw ADC waveform samples — those are large and - fetched separately via GET /device/event/{idx}/waveform (future endpoint). + fetched separately via GET /device/event/{idx}/waveform. """ - log.info("GET /device/events port=%s host=%s debug=%s", port, host, debug) + conn_key = _live_cache.make_conn_key(host, tcp_port, port, baud) + log.info("GET /device/events port=%s host=%s debug=%s force=%s", port, host, debug, force) + # ── Cache fast path ─────────────────────────────────────────────────────── + # Do a quick poll + count_events() probe (~2s over cellular) to check if the + # device has any new events. If the count matches the cache, return early. + if not force and not debug: + try: + def _count(): + with _build_client(port, baud, host, tcp_port) as client: + try: + client.poll() + except Exception: + client.poll() + return client.count_events() + device_count = _run_with_retry(_count, is_tcp=_is_tcp(host)) + cached_events = _live_cache.get_events(conn_key, device_count) + if cached_events is not None: + log.info(" events cache hit (%d events, count=%d)", len(cached_events), device_count) + # Also serve cached device info if available + cached_info = _live_cache.get_device_info(conn_key) + return { + "device": cached_info or {}, + "event_count": len(cached_events), + "events": cached_events, + "cached": True, + } + except Exception as exc: + log.warning(" count probe failed (%s) — falling through to full download", exc) + + # ── Full download ───────────────────────────────────────────────────────── try: def _do(): with _build_client(port, baud, host, tcp_port) as client: @@ -358,15 +538,12 @@ def device_events( raise 
HTTPException(status_code=500, detail=f"Device error: {exc}") from exc # Fill sample_rate from compliance config where the event record doesn't supply it. - # sample_rate is a device-level setting, not stored per-event in the waveform record. if info.compliance_config and info.compliance_config.sample_rate: for ev in events: if ev.sample_rate is None: ev.sample_rate = info.compliance_config.sample_rate # Backfill event.project_info fields that the 210-byte waveform record doesn't carry. - # The waveform record only stores "Project:" — client/operator/sensor_location/notes - # live in the SUB 1A compliance config, not in the per-event record. if info.compliance_config: cc = info.compliance_config for ev in events: @@ -378,10 +555,19 @@ def device_events( if pi.sensor_location is None: pi.sensor_location = cc.sensor_location if pi.notes is None: pi.notes = cc.notes + serialised_info = _serialise_device_info(info) + serialised_events = [_serialise_event(ev, debug=debug) for ev in events] + + # Update cache (skip if debug=True — raw hex blobs shouldn't pollute the cache) + if not debug: + _live_cache.set_device_info(conn_key, serialised_info) + _live_cache.set_events(conn_key, len(events), serialised_events) + return { - "device": _serialise_device_info(info), + "device": serialised_info, "event_count": len(events), - "events": [_serialise_event(ev, debug=debug) for ev in events], + "events": serialised_events, + "cached": False, } @@ -429,10 +615,11 @@ def device_event( @app.get("/device/event/{index}/waveform") def device_event_waveform( index: int, - port: Optional[str] = Query(None, description="Serial port (e.g. COM5)"), - baud: int = Query(38400, description="Serial baud rate"), - host: Optional[str] = Query(None, description="TCP host — modem IP or ACH relay"), + port: Optional[str] = Query(None, description="Serial port (e.g. 
COM5)"), + baud: int = Query(38400, description="Serial baud rate"), + host: Optional[str] = Query(None, description="TCP host — modem IP or ACH relay"), tcp_port: int = Query(DEFAULT_TCP_PORT, description=f"TCP port (default {DEFAULT_TCP_PORT})"), + force: bool = Query(False, description="Bypass cache and re-download waveform"), ) -> dict: """ Download the full raw ADC waveform for a single event (0-based index). @@ -451,8 +638,18 @@ def device_event_waveform( - **sample_rate**: samples per second (from compliance config) - **channels**: dict of channel name → list of signed int16 ADC counts (keys: "Tran", "Vert", "Long", "Mic") + + Waveforms are immutable once recorded and are cached permanently per + (connection, event index). Pass ?force=true to re-download. """ - log.info("GET /device/event/%d/waveform port=%s host=%s", index, port, host) + conn_key = _live_cache.make_conn_key(host, tcp_port, port, baud) + log.info("GET /device/event/%d/waveform port=%s host=%s force=%s", index, port, host, force) + + if not force: + cached_waveform = _live_cache.get_waveform(conn_key, index) + if cached_waveform is not None: + log.debug("waveform cache hit: %s event %d", conn_key, index) + return cached_waveform try: def _do(): @@ -486,7 +683,7 @@ def device_event_waveform( if sample_rate is None and info.compliance_config: sample_rate = info.compliance_config.sample_rate - return { + result = { "index": ev.index, "record_type": ev.record_type, "timestamp": _serialise_timestamp(ev.timestamp), @@ -498,6 +695,8 @@ def device_event_waveform( "peak_values": _serialise_peak_values(ev.peak_values), "channels": raw, } + _live_cache.set_waveform(conn_key, index, result) + return result # ── Write endpoints ─────────────────────────────────────────────────────────── @@ -582,6 +781,7 @@ def device_config( 422 if neither port nor host is provided. 
""" changed = body.model_dump(exclude_none=True) + conn_key = _live_cache.make_conn_key(host, tcp_port, port, baud) log.info("POST /device/config port=%s host=%s fields=%s", port, host, list(changed.keys())) try: @@ -612,6 +812,10 @@ def device_config( except Exception as exc: raise HTTPException(status_code=500, detail=f"Device error: {exc}") from exc + # Config was written — invalidate cached device info and events so the next + # /device/info or /device/events call re-reads fresh data from the device. + _live_cache.mark_config_dirty(conn_key) + return { "status": "ok", "updated_fields": changed, @@ -639,6 +843,7 @@ def device_monitor_status( baud: int = Query(38400, description="Serial baud rate"), host: Optional[str] = Query(None, description="TCP host — modem IP or ACH relay"), tcp_port: int = Query(DEFAULT_TCP_PORT, description=f"TCP port (default {DEFAULT_TCP_PORT})"), + force: bool = Query(False, description="Bypass cache and re-read from device"), ) -> dict: """ Read monitoring status from the device. @@ -650,7 +855,17 @@ def device_monitor_status( Returns is_monitoring bool, battery voltage, and memory usage (total + free bytes). Battery and memory are only present when the unit is idle. + + **Caching:** response is cached for 30 seconds. Pass ?force=true to bypass. """ + conn_key = _live_cache.make_conn_key(host, tcp_port, port, baud) + + if not force: + cached = _live_cache.get_monitor_status(conn_key) + if cached is not None: + log.debug("monitor_status cache hit for %s", conn_key) + return cached + with _build_client(port=port, baud=baud, host=host, tcp_port=tcp_port) as client: try: client.poll() @@ -668,6 +883,8 @@ def device_monitor_status( if status.memory_free is not None: result["memory_free_bytes"] = status.memory_free result["memory_free_kb"] = round(status.memory_free / 1024, 1) + + _live_cache.set_monitor_status(conn_key, result) return result @@ -683,6 +900,7 @@ def device_monitor_start( Sends SUB 0x96 and waits for ack SUB 0x69. 
""" + conn_key = _live_cache.make_conn_key(host, tcp_port, port, baud) with _build_client(port=port, baud=baud, host=host, tcp_port=tcp_port) as client: try: client.poll() @@ -690,6 +908,7 @@ def device_monitor_start( log.warning("start monitoring poll retry: %s", exc) client.poll() client.start_monitoring() + _live_cache.invalidate_monitor_status(conn_key) return {"status": "started"} @@ -705,6 +924,7 @@ def device_monitor_stop( Sends SUB 0x97 and waits for ack SUB 0x68. """ + conn_key = _live_cache.make_conn_key(host, tcp_port, port, baud) with _build_client(port=port, baud=baud, host=host, tcp_port=tcp_port) as client: try: client.poll() @@ -712,6 +932,7 @@ def device_monitor_stop( log.warning("stop monitoring poll retry: %s", exc) client.poll() client.stop_monitoring() + _live_cache.invalidate_monitor_status(conn_key) return {"status": "stopped"}