feat: v0.12.0 — live device cache (_LiveCache) in sfm/server.py

Ports the intelligent-caching branch concept to a plain Python in-memory
implementation — no SQLAlchemy, no extra DB table, no new dependencies.

_LiveCache (threading.Lock + dicts) caches:
  - device info: indefinite, invalidated by POST /device/config
  - events: keyed by (conn_key, device_event_count); count-probe fast path
    (~2s poll+count_events) avoids full downloads when nothing is new
  - monitor status: 30-second TTL, invalidated by monitor start/stop
  - waveforms: permanent per (conn_key, event_index)

All four cached endpoints accept ?force=true to bypass the cache.
Removes sfm/cache.py (SQLAlchemy experiment, now superseded).

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
This commit is contained in:
2026-04-13 15:57:02 -04:00
parent 03d224ccc3
commit 48d7e94c02
3 changed files with 278 additions and 22 deletions
+35
View File
@@ -4,6 +4,41 @@ All notable changes to seismo-relay are documented here.
--- ---
## v0.12.0 — 2026-04-13
### Added
- **`sfm/server.py`: `_LiveCache`** — in-memory live device cache, eliminating
redundant TCP round-trips between requests. No extra dependencies (plain Python
dict + threading.Lock). Replaces the SQLAlchemy-based `sfm/cache.py` experiment
from the `feature/intelligent-caching` branch.
Cache behaviour by endpoint:
| Endpoint | Cache strategy |
|---|---|
| `GET /device/info` | Indefinite; invalidated by `POST /device/config` |
| `GET /device/events` | Count-probe fast path: quick `poll()+count_events()` (~2s); return cache if count matches; full download only when new events detected |
| `GET /device/monitor/status` | 30-second TTL; invalidated by monitor start/stop |
| `GET /device/event/{idx}/waveform` | Permanent per-index (waveforms are immutable) |
- **`?force=true` param** on all four cached endpoints — bypasses cache and re-reads
from device.
- **`POST /device/config` cache invalidation** — marks device info + events dirty so
the next read reflects the new compliance config.
- **`POST /device/monitor/start` / `stop` cache invalidation** — evicts the monitor
status cache entry immediately so the next poll returns the updated state.
### Removed
- `sfm/cache.py` — SQLAlchemy-based cache from the experimental caching branch.
Its logic has been ported to the plain-Python in-memory `_LiveCache` class above.
`sqlalchemy` is no longer a dependency.
---
## v0.11.0 — 2026-04-13 ## v0.11.0 — 2026-04-13
### Added ### Added
+1 -1
View File
@@ -2,7 +2,7 @@
Ground-up Python replacement for **Blastware**, Instantel's Windows-only software for Ground-up Python replacement for **Blastware**, Instantel's Windows-only software for
managing MiniMate Plus seismographs. Connects over direct RS-232 or cellular modem managing MiniMate Plus seismographs. Connects over direct RS-232 or cellular modem
(Sierra Wireless RV50 / RV55). Current version: **v0.10.0**. (Sierra Wireless RV50 / RV55). Current version: **v0.12.0**.
--- ---
+242 -21
View File
@@ -37,6 +37,8 @@ from __future__ import annotations
import datetime import datetime
import logging import logging
import sys import sys
import threading
import time
from pathlib import Path from pathlib import Path
from typing import Optional from typing import Optional
@@ -105,6 +107,136 @@ def _get_db() -> SeismoDb:
return _db return _db
# ── Live device cache ─────────────────────────────────────────────────────────
# In-memory cache for live device data. Avoids re-dialing the device on every
# request when the data hasn't changed.
#
# Keyed by conn_key ("tcp:host:port" or "serial:port:baud").
# Does NOT persist across server restarts — this is purely an in-process cache
# to reduce TCP round-trips and cellular data usage.
#
# Invalidation rules:
# device_info — cached until POST /device/config marks it dirty
# events — cached by (conn_key, device_event_count); re-fetched when
# a quick count_events() probe shows new events on the device
# monitor_status — 30-second TTL (changes frequently during monitoring)
# waveforms — permanent (immutable once recorded; indexed by conn_key+idx)
#
# All endpoints accept ?force=true to bypass the cache and re-read from device.
_MONITOR_STATUS_TTL = 30.0  # seconds


class _LiveCache:
    """
    Thread-safe in-memory cache for live SFM device data.

    One singleton per server process.  Every accessor takes the same lock, so
    individual get/set/invalidate calls are atomic with respect to each other.

    Cached values are stored and returned by reference (no copies are made),
    so callers must treat anything returned from a ``get_*`` method as
    read-only.

    Nothing here is ever evicted by size: device info, events and waveforms
    stay until they are overwritten or (for info/events) marked dirty.
    NOTE(review): the waveform cache assumes an event index always refers to
    the same recording for a given connection — confirm that indices are not
    reused after the device's memory is cleared.
    """

    def __init__(self, monitor_status_ttl: float = _MONITOR_STATUS_TTL) -> None:
        """
        Create an empty cache.

        monitor_status_ttl: maximum age, in seconds, of a monitor-status entry
        before ``get_monitor_status`` treats it as a miss.
        """
        self._lock = threading.Lock()
        self._monitor_status_ttl = monitor_status_ttl
        # conn_key → serialised device info dict
        self._device_info: dict[str, dict] = {}
        # conn_key → (device_event_count_when_cached, [event dicts])
        self._events: dict[str, tuple[int, list]] = {}
        # conn_key → (fetched_at, status_dict); fetched_at is a
        # time.monotonic() reading, NOT a unix timestamp
        self._monitor_status: dict[str, tuple[float, dict]] = {}
        # conn_key → bool (True = treat cached device info AND events as
        # misses until set_device_info() stores a fresh read)
        self._config_dirty: dict[str, bool] = {}
        # (conn_key, event_index) → waveform dict (permanent)
        self._waveforms: dict[tuple, dict] = {}

    # ── Connection key ────────────────────────────────────────────────────────

    @staticmethod
    def make_conn_key(
        host: Optional[str],
        tcp_port: int,
        port: Optional[str],
        baud: int,
    ) -> str:
        """
        Build the cache key for a connection.

        Returns "tcp:<host>:<tcp_port>" when *host* is truthy, otherwise
        "serial:<port>:<baud>".
        """
        if host:
            return f"tcp:{host}:{tcp_port}"
        return f"serial:{port}:{baud}"

    # ── Device info ───────────────────────────────────────────────────────────

    def get_device_info(self, conn_key: str) -> Optional[dict]:
        """Return cached device info, or None if absent or marked dirty."""
        with self._lock:
            if self._config_dirty.get(conn_key):
                return None
            return self._device_info.get(conn_key)

    def set_device_info(self, conn_key: str, info: dict) -> None:
        """Store fresh device info and clear the connection's dirty flag."""
        with self._lock:
            self._device_info[conn_key] = info
            self._config_dirty[conn_key] = False

    # ── Events ────────────────────────────────────────────────────────────────

    def get_events(self, conn_key: str, device_count: int) -> Optional[list]:
        """
        Return cached events if the device's current event count matches what
        we had when we last fetched.  Returns None (cache miss) otherwise,
        including when the connection's config is marked dirty.
        """
        with self._lock:
            if self._config_dirty.get(conn_key):
                return None
            entry = self._events.get(conn_key)
            if entry is None:
                return None
            cached_count, events = entry
            return events if cached_count == device_count else None

    def set_events(self, conn_key: str, device_count: int, events: list) -> None:
        """
        Store *events* together with the event count they correspond to.

        Does not clear the dirty flag — only set_device_info() does that.
        """
        with self._lock:
            self._events[conn_key] = (device_count, events)

    # ── Monitor status ────────────────────────────────────────────────────────

    def get_monitor_status(self, conn_key: str) -> Optional[dict]:
        """
        Return the cached monitor status, or None if there is no entry or the
        entry is older than the configured TTL.  Expired entries are removed.
        """
        with self._lock:
            entry = self._monitor_status.get(conn_key)
            if entry is None:
                return None
            fetched_at, status = entry
            # Monotonic clock: the age can never go negative or jump when the
            # system wall clock is adjusted (NTP step, manual change).
            if time.monotonic() - fetched_at > self._monitor_status_ttl:
                del self._monitor_status[conn_key]
                return None
            return status

    def set_monitor_status(self, conn_key: str, status: dict) -> None:
        """Store *status*, stamped with the current monotonic time."""
        with self._lock:
            self._monitor_status[conn_key] = (time.monotonic(), status)

    def invalidate_monitor_status(self, conn_key: str) -> None:
        """Drop the cached monitor status for *conn_key* (no-op if absent)."""
        with self._lock:
            self._monitor_status.pop(conn_key, None)

    # ── Config dirty flag ─────────────────────────────────────────────────────

    def mark_config_dirty(self, conn_key: str) -> None:
        """
        Called after a successful POST /device/config write.
        Forces next /device/info and /device/events to re-read from the device.
        """
        with self._lock:
            self._config_dirty[conn_key] = True
            self._events.pop(conn_key, None)

    # ── Waveforms (permanent cache) ───────────────────────────────────────────

    def get_waveform(self, conn_key: str, index: int) -> Optional[dict]:
        """Return the cached waveform for (conn_key, index), or None."""
        with self._lock:
            return self._waveforms.get((conn_key, index))

    def set_waveform(self, conn_key: str, index: int, waveform: dict) -> None:
        """Store *waveform* under (conn_key, index); it is never evicted."""
        with self._lock:
            self._waveforms[(conn_key, index)] = waveform


_live_cache = _LiveCache()
# ── Serialisers ──────────────────────────────────────────────────────────────── # ── Serialisers ────────────────────────────────────────────────────────────────
# Plain dict helpers — avoids a Pydantic dependency in the library layer. # Plain dict helpers — avoids a Pydantic dependency in the library layer.
@@ -272,10 +404,11 @@ def webapp():
@app.get("/device/info") @app.get("/device/info")
def device_info( def device_info(
port: Optional[str] = Query(None, description="Serial port (e.g. COM5, /dev/ttyUSB0)"), port: Optional[str] = Query(None, description="Serial port (e.g. COM5, /dev/ttyUSB0)"),
baud: int = Query(38400, description="Serial baud rate (default 38400)"), baud: int = Query(38400, description="Serial baud rate (default 38400)"),
host: Optional[str] = Query(None, description="TCP host — modem IP or ACH relay (e.g. 203.0.113.5)"), host: Optional[str] = Query(None, description="TCP host — modem IP or ACH relay (e.g. 203.0.113.5)"),
tcp_port: int = Query(DEFAULT_TCP_PORT, description=f"TCP port (default {DEFAULT_TCP_PORT})"), tcp_port: int = Query(DEFAULT_TCP_PORT, description=f"TCP port (default {DEFAULT_TCP_PORT})"),
force: bool = Query(False, description="Bypass cache and re-read from device"),
) -> dict: ) -> dict:
""" """
Connect to the device, perform the POLL startup handshake, and return Connect to the device, perform the POLL startup handshake, and return
@@ -283,8 +416,18 @@ def device_info(
Supply either *port* (serial) or *host* (TCP/modem). Supply either *port* (serial) or *host* (TCP/modem).
Equivalent to POST /device/connect — provided as GET for convenience. Equivalent to POST /device/connect — provided as GET for convenience.
Response is cached until a POST /device/config write invalidates it.
Pass ?force=true to bypass the cache.
""" """
log.info("GET /device/info port=%s host=%s tcp_port=%d", port, host, tcp_port) conn_key = _live_cache.make_conn_key(host, tcp_port, port, baud)
log.info("GET /device/info port=%s host=%s tcp_port=%d force=%s", port, host, tcp_port, force)
if not force:
cached = _live_cache.get_device_info(conn_key)
if cached is not None:
log.debug("device_info cache hit for %s", conn_key)
return cached
try: try:
def _do(): def _do():
@@ -304,7 +447,9 @@ def device_info(
except Exception as exc: except Exception as exc:
raise HTTPException(status_code=500, detail=f"Device error: {exc}") from exc raise HTTPException(status_code=500, detail=f"Device error: {exc}") from exc
return _serialise_device_info(info) result = _serialise_device_info(info)
_live_cache.set_device_info(conn_key, result)
return result
@app.post("/device/connect") @app.post("/device/connect")
@@ -323,11 +468,12 @@ def device_connect(
@app.get("/device/events") @app.get("/device/events")
def device_events( def device_events(
port: Optional[str] = Query(None, description="Serial port (e.g. COM5)"), port: Optional[str] = Query(None, description="Serial port (e.g. COM5)"),
baud: int = Query(38400, description="Serial baud rate"), baud: int = Query(38400, description="Serial baud rate"),
host: Optional[str] = Query(None, description="TCP host — modem IP or ACH relay"), host: Optional[str] = Query(None, description="TCP host — modem IP or ACH relay"),
tcp_port: int = Query(DEFAULT_TCP_PORT, description=f"TCP port (default {DEFAULT_TCP_PORT})"), tcp_port: int = Query(DEFAULT_TCP_PORT, description=f"TCP port (default {DEFAULT_TCP_PORT})"),
debug: bool = Query(False, description="Include raw record hex for field-layout inspection"), debug: bool = Query(False, description="Include raw record hex for field-layout inspection"),
force: bool = Query(False, description="Bypass cache and re-download from device"),
) -> dict: ) -> dict:
""" """
Connect to the device, read the event index, and download all stored Connect to the device, read the event index, and download all stored
@@ -335,14 +481,48 @@ def device_events(
Supply either *port* (serial) or *host* (TCP/modem). Supply either *port* (serial) or *host* (TCP/modem).
**Caching:** a quick count_events() probe (~2s) is performed first. If the
device's event count matches the cached count, the cached response is returned
immediately without a full download. Pass ?force=true to skip this and always
re-download.
Pass debug=true to include raw_record_hex in each event — useful for Pass debug=true to include raw_record_hex in each event — useful for
verifying field offsets against the protocol reference. verifying field offsets against the protocol reference.
This does NOT download raw ADC waveform samples — those are large and This does NOT download raw ADC waveform samples — those are large and
fetched separately via GET /device/event/{idx}/waveform (future endpoint). fetched separately via GET /device/event/{idx}/waveform.
""" """
log.info("GET /device/events port=%s host=%s debug=%s", port, host, debug) conn_key = _live_cache.make_conn_key(host, tcp_port, port, baud)
log.info("GET /device/events port=%s host=%s debug=%s force=%s", port, host, debug, force)
# ── Cache fast path ───────────────────────────────────────────────────────
# Do a quick poll + count_events() probe (~2s over cellular) to check if the
# device has any new events. If the count matches the cache, return early.
if not force and not debug:
try:
def _count():
with _build_client(port, baud, host, tcp_port) as client:
try:
client.poll()
except Exception:
client.poll()
return client.count_events()
device_count = _run_with_retry(_count, is_tcp=_is_tcp(host))
cached_events = _live_cache.get_events(conn_key, device_count)
if cached_events is not None:
log.info(" events cache hit (%d events, count=%d)", len(cached_events), device_count)
# Also serve cached device info if available
cached_info = _live_cache.get_device_info(conn_key)
return {
"device": cached_info or {},
"event_count": len(cached_events),
"events": cached_events,
"cached": True,
}
except Exception as exc:
log.warning(" count probe failed (%s) — falling through to full download", exc)
# ── Full download ─────────────────────────────────────────────────────────
try: try:
def _do(): def _do():
with _build_client(port, baud, host, tcp_port) as client: with _build_client(port, baud, host, tcp_port) as client:
@@ -358,15 +538,12 @@ def device_events(
raise HTTPException(status_code=500, detail=f"Device error: {exc}") from exc raise HTTPException(status_code=500, detail=f"Device error: {exc}") from exc
# Fill sample_rate from compliance config where the event record doesn't supply it. # Fill sample_rate from compliance config where the event record doesn't supply it.
# sample_rate is a device-level setting, not stored per-event in the waveform record.
if info.compliance_config and info.compliance_config.sample_rate: if info.compliance_config and info.compliance_config.sample_rate:
for ev in events: for ev in events:
if ev.sample_rate is None: if ev.sample_rate is None:
ev.sample_rate = info.compliance_config.sample_rate ev.sample_rate = info.compliance_config.sample_rate
# Backfill event.project_info fields that the 210-byte waveform record doesn't carry. # Backfill event.project_info fields that the 210-byte waveform record doesn't carry.
# The waveform record only stores "Project:" — client/operator/sensor_location/notes
# live in the SUB 1A compliance config, not in the per-event record.
if info.compliance_config: if info.compliance_config:
cc = info.compliance_config cc = info.compliance_config
for ev in events: for ev in events:
@@ -378,10 +555,19 @@ def device_events(
if pi.sensor_location is None: pi.sensor_location = cc.sensor_location if pi.sensor_location is None: pi.sensor_location = cc.sensor_location
if pi.notes is None: pi.notes = cc.notes if pi.notes is None: pi.notes = cc.notes
serialised_info = _serialise_device_info(info)
serialised_events = [_serialise_event(ev, debug=debug) for ev in events]
# Update cache (skip if debug=True — raw hex blobs shouldn't pollute the cache)
if not debug:
_live_cache.set_device_info(conn_key, serialised_info)
_live_cache.set_events(conn_key, len(events), serialised_events)
return { return {
"device": _serialise_device_info(info), "device": serialised_info,
"event_count": len(events), "event_count": len(events),
"events": [_serialise_event(ev, debug=debug) for ev in events], "events": serialised_events,
"cached": False,
} }
@@ -429,10 +615,11 @@ def device_event(
@app.get("/device/event/{index}/waveform") @app.get("/device/event/{index}/waveform")
def device_event_waveform( def device_event_waveform(
index: int, index: int,
port: Optional[str] = Query(None, description="Serial port (e.g. COM5)"), port: Optional[str] = Query(None, description="Serial port (e.g. COM5)"),
baud: int = Query(38400, description="Serial baud rate"), baud: int = Query(38400, description="Serial baud rate"),
host: Optional[str] = Query(None, description="TCP host — modem IP or ACH relay"), host: Optional[str] = Query(None, description="TCP host — modem IP or ACH relay"),
tcp_port: int = Query(DEFAULT_TCP_PORT, description=f"TCP port (default {DEFAULT_TCP_PORT})"), tcp_port: int = Query(DEFAULT_TCP_PORT, description=f"TCP port (default {DEFAULT_TCP_PORT})"),
force: bool = Query(False, description="Bypass cache and re-download waveform"),
) -> dict: ) -> dict:
""" """
Download the full raw ADC waveform for a single event (0-based index). Download the full raw ADC waveform for a single event (0-based index).
@@ -451,8 +638,18 @@ def device_event_waveform(
- **sample_rate**: samples per second (from compliance config) - **sample_rate**: samples per second (from compliance config)
- **channels**: dict of channel name → list of signed int16 ADC counts - **channels**: dict of channel name → list of signed int16 ADC counts
(keys: "Tran", "Vert", "Long", "Mic") (keys: "Tran", "Vert", "Long", "Mic")
Waveforms are immutable once recorded and are cached permanently per
(connection, event index). Pass ?force=true to re-download.
""" """
log.info("GET /device/event/%d/waveform port=%s host=%s", index, port, host) conn_key = _live_cache.make_conn_key(host, tcp_port, port, baud)
log.info("GET /device/event/%d/waveform port=%s host=%s force=%s", index, port, host, force)
if not force:
cached_waveform = _live_cache.get_waveform(conn_key, index)
if cached_waveform is not None:
log.debug("waveform cache hit: %s event %d", conn_key, index)
return cached_waveform
try: try:
def _do(): def _do():
@@ -486,7 +683,7 @@ def device_event_waveform(
if sample_rate is None and info.compliance_config: if sample_rate is None and info.compliance_config:
sample_rate = info.compliance_config.sample_rate sample_rate = info.compliance_config.sample_rate
return { result = {
"index": ev.index, "index": ev.index,
"record_type": ev.record_type, "record_type": ev.record_type,
"timestamp": _serialise_timestamp(ev.timestamp), "timestamp": _serialise_timestamp(ev.timestamp),
@@ -498,6 +695,8 @@ def device_event_waveform(
"peak_values": _serialise_peak_values(ev.peak_values), "peak_values": _serialise_peak_values(ev.peak_values),
"channels": raw, "channels": raw,
} }
_live_cache.set_waveform(conn_key, index, result)
return result
# ── Write endpoints ─────────────────────────────────────────────────────────── # ── Write endpoints ───────────────────────────────────────────────────────────
@@ -582,6 +781,7 @@ def device_config(
422 if neither port nor host is provided. 422 if neither port nor host is provided.
""" """
changed = body.model_dump(exclude_none=True) changed = body.model_dump(exclude_none=True)
conn_key = _live_cache.make_conn_key(host, tcp_port, port, baud)
log.info("POST /device/config port=%s host=%s fields=%s", port, host, list(changed.keys())) log.info("POST /device/config port=%s host=%s fields=%s", port, host, list(changed.keys()))
try: try:
@@ -612,6 +812,10 @@ def device_config(
except Exception as exc: except Exception as exc:
raise HTTPException(status_code=500, detail=f"Device error: {exc}") from exc raise HTTPException(status_code=500, detail=f"Device error: {exc}") from exc
# Config was written — invalidate cached device info and events so the next
# /device/info or /device/events call re-reads fresh data from the device.
_live_cache.mark_config_dirty(conn_key)
return { return {
"status": "ok", "status": "ok",
"updated_fields": changed, "updated_fields": changed,
@@ -639,6 +843,7 @@ def device_monitor_status(
baud: int = Query(38400, description="Serial baud rate"), baud: int = Query(38400, description="Serial baud rate"),
host: Optional[str] = Query(None, description="TCP host — modem IP or ACH relay"), host: Optional[str] = Query(None, description="TCP host — modem IP or ACH relay"),
tcp_port: int = Query(DEFAULT_TCP_PORT, description=f"TCP port (default {DEFAULT_TCP_PORT})"), tcp_port: int = Query(DEFAULT_TCP_PORT, description=f"TCP port (default {DEFAULT_TCP_PORT})"),
force: bool = Query(False, description="Bypass cache and re-read from device"),
) -> dict: ) -> dict:
""" """
Read monitoring status from the device. Read monitoring status from the device.
@@ -650,7 +855,17 @@ def device_monitor_status(
Returns is_monitoring bool, battery voltage, and memory usage (total + free Returns is_monitoring bool, battery voltage, and memory usage (total + free
bytes). Battery and memory are only present when the unit is idle. bytes). Battery and memory are only present when the unit is idle.
**Caching:** response is cached for 30 seconds. Pass ?force=true to bypass.
""" """
conn_key = _live_cache.make_conn_key(host, tcp_port, port, baud)
if not force:
cached = _live_cache.get_monitor_status(conn_key)
if cached is not None:
log.debug("monitor_status cache hit for %s", conn_key)
return cached
with _build_client(port=port, baud=baud, host=host, tcp_port=tcp_port) as client: with _build_client(port=port, baud=baud, host=host, tcp_port=tcp_port) as client:
try: try:
client.poll() client.poll()
@@ -668,6 +883,8 @@ def device_monitor_status(
if status.memory_free is not None: if status.memory_free is not None:
result["memory_free_bytes"] = status.memory_free result["memory_free_bytes"] = status.memory_free
result["memory_free_kb"] = round(status.memory_free / 1024, 1) result["memory_free_kb"] = round(status.memory_free / 1024, 1)
_live_cache.set_monitor_status(conn_key, result)
return result return result
@@ -683,6 +900,7 @@ def device_monitor_start(
Sends SUB 0x96 and waits for ack SUB 0x69. Sends SUB 0x96 and waits for ack SUB 0x69.
""" """
conn_key = _live_cache.make_conn_key(host, tcp_port, port, baud)
with _build_client(port=port, baud=baud, host=host, tcp_port=tcp_port) as client: with _build_client(port=port, baud=baud, host=host, tcp_port=tcp_port) as client:
try: try:
client.poll() client.poll()
@@ -690,6 +908,7 @@ def device_monitor_start(
log.warning("start monitoring poll retry: %s", exc) log.warning("start monitoring poll retry: %s", exc)
client.poll() client.poll()
client.start_monitoring() client.start_monitoring()
_live_cache.invalidate_monitor_status(conn_key)
return {"status": "started"} return {"status": "started"}
@@ -705,6 +924,7 @@ def device_monitor_stop(
Sends SUB 0x97 and waits for ack SUB 0x68. Sends SUB 0x97 and waits for ack SUB 0x68.
""" """
conn_key = _live_cache.make_conn_key(host, tcp_port, port, baud)
with _build_client(port=port, baud=baud, host=host, tcp_port=tcp_port) as client: with _build_client(port=port, baud=baud, host=host, tcp_port=tcp_port) as client:
try: try:
client.poll() client.poll()
@@ -712,6 +932,7 @@ def device_monitor_stop(
log.warning("stop monitoring poll retry: %s", exc) log.warning("stop monitoring poll retry: %s", exc)
client.poll() client.poll()
client.stop_monitoring() client.stop_monitoring()
_live_cache.invalidate_monitor_status(conn_key)
return {"status": "stopped"} return {"status": "stopped"}