Add intelligent caching layer for SFM device data

Introduces sfm/cache.py — a SQLite-backed cache (via SQLAlchemy) that
sits between the SFM REST endpoints and the device, eliminating redundant
cellular downloads for data that doesn't change.

Cache behaviour by data type:
- Device info / compliance config: cached until a config write occurs;
  POST /device/config now calls mark_config_dirty() to force a fresh read
  on the next /device/info call.
- Event headers + peak values: cached permanently (append-only). On
  subsequent calls to /device/events, the server does a fast count_events()
  (~2s) instead of a full download (~10-30s); only new events are fetched
  from the device and merged into the cache.
- Full waveforms (raw ADC samples): cached permanently — immutable once
  recorded. Repeated requests for the same waveform return instantly with
  zero device contact.
- Monitor status (battery, memory, is_monitoring): 30-second TTL; auto-
  invalidated on start/stop monitoring commands.

All endpoints gain a ?force=true param to bypass the cache when needed.
New endpoints: GET /cache/stats, DELETE /cache/device.
Adds requirements.txt listing fastapi, uvicorn, sqlalchemy, pyserial.

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
This commit is contained in:
2026-04-09 07:14:51 +00:00
parent 990cb8850e
commit 2db565ff9c
3 changed files with 643 additions and 32 deletions
+263 -32
View File
@@ -58,6 +58,7 @@ from minimateplus import MiniMateClient
from minimateplus.protocol import ProtocolError
from minimateplus.models import ComplianceConfig, DeviceInfo, Event, PeakValues, ProjectInfo, Timestamp
from minimateplus.transport import TcpTransport, DEFAULT_TCP_PORT
from sfm.cache import SFMCache, get_cache
logging.basicConfig(
level=logging.INFO,
@@ -239,6 +240,33 @@ def _run_with_retry(fn, *, is_tcp: bool):
return fn() # let any second failure propagate normally
# ── Helpers ────────────────────────────────────────────────────────────────────
def _backfill_events(events: list, info: "DeviceInfo") -> None:
"""
Fill in sample_rate and project_info fields that the per-event waveform
record doesn't carry — sourced from the device's compliance config.
Extracted from device_events() so it can be called from both the full
download path and the partial (new-events-only) path.
"""
if info.compliance_config and info.compliance_config.sample_rate:
for ev in events:
if ev.sample_rate is None:
ev.sample_rate = info.compliance_config.sample_rate
if info.compliance_config:
cc = info.compliance_config
for ev in events:
if ev.project_info is None:
ev.project_info = ProjectInfo()
pi = ev.project_info
if pi.client is None: pi.client = cc.client
if pi.operator is None: pi.operator = cc.operator
if pi.sensor_location is None: pi.sensor_location = cc.sensor_location
if pi.notes is None: pi.notes = cc.notes
# ── Endpoints ──────────────────────────────────────────────────────────────────
@app.get("/health")
@@ -259,6 +287,7 @@ def device_info(
baud: int = Query(38400, description="Serial baud rate (default 38400)"),
host: Optional[str] = Query(None, description="TCP host — modem IP or ACH relay (e.g. 203.0.113.5)"),
tcp_port: int = Query(DEFAULT_TCP_PORT, description=f"TCP port (default {DEFAULT_TCP_PORT})"),
force: bool = Query(False, description="Bypass cache and re-read from device"),
) -> dict:
"""
Connect to the device, perform the POLL startup handshake, and return
@@ -266,8 +295,23 @@ def device_info(
Supply either *port* (serial) or *host* (TCP/modem).
Equivalent to POST /device/connect — provided as GET for convenience.
**Caching**: device identity and compliance config are cached after the first
successful read (they rarely change). Pass *force=true* to bypass the cache
and re-read directly from the device (e.g. after a config push).
The cache is also automatically invalidated after POST /device/config.
"""
log.info("GET /device/info port=%s host=%s tcp_port=%d", port, host, tcp_port)
log.info("GET /device/info port=%s host=%s tcp_port=%d force=%s", port, host, tcp_port, force)
cache = get_cache()
conn_key = SFMCache.make_conn_key(host, tcp_port, port, baud)
if not force:
cached = cache.get_device_info(conn_key)
if cached is not None:
log.info("device info cache hit for %s", conn_key)
cached["_cached"] = True
return cached
try:
def _do():
@@ -287,7 +331,9 @@ def device_info(
except Exception as exc:
raise HTTPException(status_code=500, detail=f"Device error: {exc}") from exc
return _serialise_device_info(info)
result = _serialise_device_info(info)
cache.set_device_info(conn_key, result)
return result
@app.post("/device/connect")
@@ -311,6 +357,7 @@ def device_events(
host: Optional[str] = Query(None, description="TCP host — modem IP or ACH relay"),
tcp_port: int = Query(DEFAULT_TCP_PORT, description=f"TCP port (default {DEFAULT_TCP_PORT})"),
debug: bool = Query(False, description="Include raw record hex for field-layout inspection"),
force: bool = Query(False, description="Bypass cache and re-download all events from device"),
) -> dict:
"""
Connect to the device, read the event index, and download all stored
@@ -322,10 +369,108 @@ def device_events(
verifying field offsets against the protocol reference.
This does NOT download raw ADC waveform samples — those are large and
fetched separately via GET /device/event/{idx}/waveform (future endpoint).
"""
log.info("GET /device/events port=%s host=%s debug=%s", port, host, debug)
fetched separately via GET /device/event/{idx}/waveform.
**Caching**: event headers are cached after the first download. On subsequent
calls, the device is contacted only to check the event count (fast: ~2s).
If the count matches the cache, all events are returned from cache instantly.
If new events exist on the device, only the new ones are downloaded and merged.
Pass *force=true* to bypass the cache entirely and re-download everything.
"""
log.info("GET /device/events port=%s host=%s debug=%s force=%s", port, host, debug, force)
cache = get_cache()
conn_key = SFMCache.make_conn_key(host, tcp_port, port, baud)
# ── Smart cache path (skip when debug=True or force=True) ────────────────
# debug mode uses raw_record_hex which isn't stored in the cache, so we
# must always go to the device when debug is requested.
if not force and not debug:
cached_events = cache.get_all_events(conn_key)
cached_count = len(cached_events) if cached_events else 0
if cached_count > 0:
# Quick device contact: just count events via the fast 1E/1F chain.
# This takes ~2s instead of the full event download (~10-30s).
try:
def _count():
with _build_client(port, baud, host, tcp_port) as client:
client.connect()
return client.count_events()
device_count = _run_with_retry(_count, is_tcp=_is_tcp(host))
except HTTPException:
raise
except (ProtocolError, OSError, Exception) as exc:
# If we can't reach the device at all, serve stale cache rather
# than returning an error — field units go offline regularly.
log.warning("count_events failed (%s) — serving stale cache for %s", exc, conn_key)
cached_info = cache.get_device_info(conn_key) or {}
return {
"device": cached_info,
"event_count": cached_count,
"events": cached_events,
"_cached": True,
"_stale": True,
}
if device_count == cached_count:
# Nothing new — return cache immediately, no event download needed.
log.info(
"event cache hit for %s: %d events, device count matches",
conn_key, cached_count,
)
cached_info = cache.get_device_info(conn_key) or {}
return {
"device": cached_info,
"event_count": cached_count,
"events": cached_events,
"_cached": True,
}
if device_count > cached_count:
# New events on the device — download all events but only store/return
# the new ones. Events are append-only; indices 0..(cached_count-1)
# are already in the cache and don't need to be re-downloaded logically,
# but the protocol requires iterating from event 0 to reach later ones.
# The device download time is dominated by the number of events requested,
# so we stop at the last known event index to avoid re-downloading everything.
log.info(
"new events on device %s: have %d, device has %d — fetching all up to %d",
conn_key, cached_count, device_count, device_count - 1,
)
try:
def _fetch_new():
with _build_client(port, baud, host, tcp_port) as client:
info = client.connect()
all_evs = client.get_events(stop_after_index=device_count - 1)
return info, all_evs
info, all_events = _run_with_retry(_fetch_new, is_tcp=_is_tcp(host))
except HTTPException:
raise
except ProtocolError as exc:
raise HTTPException(status_code=502, detail=f"Protocol error: {exc}") from exc
except OSError as exc:
raise HTTPException(status_code=502, detail=f"Connection error: {exc}") from exc
except Exception as exc:
raise HTTPException(status_code=500, detail=f"Device error: {exc}") from exc
_backfill_events(all_events, info)
# Only the new events (indices >= cached_count) are truly new.
new_events = [ev for ev in all_events if ev.index >= cached_count]
new_serialised = [_serialise_event(ev) for ev in new_events]
cache.set_events(conn_key, new_serialised)
cache.set_device_info(conn_key, _serialise_device_info(info))
merged_events = cache.get_all_events(conn_key)
return {
"device": _serialise_device_info(info),
"event_count": len(merged_events),
"events": merged_events,
"_cached": True,
"_new_events": len(new_events),
}
# ── Full download path (first call, force=True, or debug=True) ───────────
try:
def _do():
with _build_client(port, baud, host, tcp_port) as client:
@@ -340,31 +485,19 @@ def device_events(
except Exception as exc:
raise HTTPException(status_code=500, detail=f"Device error: {exc}") from exc
# Fill sample_rate from compliance config where the event record doesn't supply it.
# sample_rate is a device-level setting, not stored per-event in the waveform record.
if info.compliance_config and info.compliance_config.sample_rate:
for ev in events:
if ev.sample_rate is None:
ev.sample_rate = info.compliance_config.sample_rate
_backfill_events(events, info)
serialised = [_serialise_event(ev, debug=debug) for ev in events]
# Backfill event.project_info fields that the 210-byte waveform record doesn't carry.
# The waveform record only stores "Project:" — client/operator/sensor_location/notes
# live in the SUB 1A compliance config, not in the per-event record.
if info.compliance_config:
cc = info.compliance_config
for ev in events:
if ev.project_info is None:
ev.project_info = ProjectInfo()
pi = ev.project_info
if pi.client is None: pi.client = cc.client
if pi.operator is None: pi.operator = cc.operator
if pi.sensor_location is None: pi.sensor_location = cc.sensor_location
if pi.notes is None: pi.notes = cc.notes
if not debug:
# Only cache when not in debug mode (debug adds raw_record_hex which
# we don't want polluting the normal cache entries).
cache.set_events(conn_key, serialised)
cache.set_device_info(conn_key, _serialise_device_info(info))
return {
"device": _serialise_device_info(info),
"event_count": len(events),
"events": [_serialise_event(ev, debug=debug) for ev in events],
"events": serialised,
}
@@ -375,21 +508,36 @@ def device_event(
baud: int = Query(38400, description="Serial baud rate"),
host: Optional[str] = Query(None, description="TCP host — modem IP or ACH relay"),
tcp_port: int = Query(DEFAULT_TCP_PORT, description=f"TCP port (default {DEFAULT_TCP_PORT})"),
force: bool = Query(False, description="Bypass cache and re-download from device"),
) -> dict:
"""
Download a single event by index (0-based).
Supply either *port* (serial) or *host* (TCP/modem).
Performs: POLL startup → event index → event header → waveform record.
**Caching**: if this event was already downloaded (e.g. via GET /device/events),
it is returned instantly from cache with no device contact.
"""
log.info("GET /device/event/%d port=%s host=%s", index, port, host)
log.info("GET /device/event/%d port=%s host=%s force=%s", index, port, host, force)
cache = get_cache()
conn_key = SFMCache.make_conn_key(host, tcp_port, port, baud)
if not force:
cached = cache.get_event(conn_key, index)
if cached is not None:
log.info("event cache hit for %s index %d", conn_key, index)
cached["_cached"] = True
return cached
try:
def _do():
with _build_client(port, baud, host, tcp_port) as client:
client.connect()
return client.get_events(stop_after_index=index)
events = _run_with_retry(_do, is_tcp=_is_tcp(host))
info = client.connect()
events = client.get_events(stop_after_index=index)
return info, events
info, events = _run_with_retry(_do, is_tcp=_is_tcp(host))
except HTTPException:
raise
except ProtocolError as exc:
@@ -406,7 +554,14 @@ def device_event(
detail=f"Event index {index} not found on device",
)
return _serialise_event(matching[0])
_backfill_events(matching, info)
result = _serialise_event(matching[0])
# Store all downloaded events (we paid for them anyway — indices 0..index)
all_serialised = [_serialise_event(ev) for ev in events]
cache.set_events(conn_key, all_serialised)
return result
@app.get("/device/event/{index}/waveform")
@@ -416,6 +571,7 @@ def device_event_waveform(
baud: int = Query(38400, description="Serial baud rate"),
host: Optional[str] = Query(None, description="TCP host — modem IP or ACH relay"),
tcp_port: int = Query(DEFAULT_TCP_PORT, description=f"TCP port (default {DEFAULT_TCP_PORT})"),
force: bool = Query(False, description="Bypass cache and re-download from device"),
) -> dict:
"""
Download the full raw ADC waveform for a single event (0-based index).
@@ -434,8 +590,23 @@ def device_event_waveform(
- **sample_rate**: samples per second (from compliance config)
- **channels**: dict of channel name → list of signed int16 ADC counts
(keys: "Tran", "Vert", "Long", "Mic")
**Caching**: full waveforms are cached permanently after the first download —
they are immutable once recorded on the device. Subsequent requests for the
same event return instantly from cache without any device contact.
Pass *force=true* to force a fresh download (rarely needed).
"""
log.info("GET /device/event/%d/waveform port=%s host=%s", index, port, host)
log.info("GET /device/event/%d/waveform port=%s host=%s force=%s", index, port, host, force)
cache = get_cache()
conn_key = SFMCache.make_conn_key(host, tcp_port, port, baud)
if not force:
cached = cache.get_waveform(conn_key, index)
if cached is not None:
log.info("waveform cache hit for %s event %d", conn_key, index)
cached["_cached"] = True
return cached
try:
def _do():
@@ -469,7 +640,7 @@ def device_event_waveform(
if sample_rate is None and info.compliance_config:
sample_rate = info.compliance_config.sample_rate
return {
result = {
"index": ev.index,
"record_type": ev.record_type,
"timestamp": _serialise_timestamp(ev.timestamp),
@@ -481,6 +652,8 @@ def device_event_waveform(
"peak_values": _serialise_peak_values(ev.peak_values),
"channels": raw,
}
cache.set_waveform(conn_key, index, result)
return result
# ── Write endpoints ───────────────────────────────────────────────────────────
@@ -595,6 +768,10 @@ def device_config(
except Exception as exc:
raise HTTPException(status_code=500, detail=f"Device error: {exc}") from exc
# Config was written to the device — the cached compliance config is now stale.
conn_key = SFMCache.make_conn_key(host, tcp_port, port, baud)
get_cache().mark_config_dirty(conn_key)
return {
"status": "ok",
"updated_fields": changed,
@@ -622,6 +799,7 @@ def device_monitor_status(
baud: int = Query(38400, description="Serial baud rate"),
host: Optional[str] = Query(None, description="TCP host — modem IP or ACH relay"),
tcp_port: int = Query(DEFAULT_TCP_PORT, description=f"TCP port (default {DEFAULT_TCP_PORT})"),
force: bool = Query(False, description="Bypass cache and re-read from device"),
) -> dict:
"""
Read monitoring status from the device.
@@ -633,7 +811,20 @@ def device_monitor_status(
Returns is_monitoring bool, battery voltage, and memory usage (total + free
bytes). Battery and memory are only present when the unit is idle.
**Caching**: status is cached for 30 seconds to reduce cellular polling overhead.
Pass *force=true* to bypass the cache for an immediate fresh read.
"""
cache = get_cache()
conn_key = SFMCache.make_conn_key(host, tcp_port, port, baud)
if not force:
cached = cache.get_monitor_status(conn_key)
if cached is not None:
log.debug("monitor status cache hit for %s", conn_key)
cached["_cached"] = True
return cached
with _build_client(port=port, baud=baud, host=host, tcp_port=tcp_port) as client:
try:
client.poll()
@@ -651,6 +842,8 @@ def device_monitor_status(
if status.memory_free is not None:
result["memory_free_bytes"] = status.memory_free
result["memory_free_kb"] = round(status.memory_free / 1024, 1)
cache.set_monitor_status(conn_key, result)
return result
@@ -673,6 +866,9 @@ def device_monitor_start(
log.warning("start monitoring poll retry: %s", exc)
client.poll()
client.start_monitoring()
conn_key = SFMCache.make_conn_key(host, tcp_port, port, baud)
get_cache().invalidate_monitor_status(conn_key)
return {"status": "started"}
@@ -695,9 +891,44 @@ def device_monitor_stop(
log.warning("stop monitoring poll retry: %s", exc)
client.poll()
client.stop_monitoring()
conn_key = SFMCache.make_conn_key(host, tcp_port, port, baud)
get_cache().invalidate_monitor_status(conn_key)
return {"status": "stopped"}
# ── Cache management endpoints ────────────────────────────────────────────────
@app.get("/cache/stats")
def cache_stats() -> dict:
"""
Return row counts for all cache tables.
Useful for debugging and verifying that caching is working as expected.
"""
return get_cache().stats()
@app.delete("/cache/device")
def cache_clear_device(
port: Optional[str] = Query(None, description="Serial port (e.g. COM5)"),
baud: int = Query(38400, description="Serial baud rate"),
host: Optional[str] = Query(None, description="TCP host — modem IP or ACH relay"),
tcp_port: int = Query(DEFAULT_TCP_PORT, description=f"TCP port (default {DEFAULT_TCP_PORT})"),
) -> dict:
"""
Clear all cached data for a specific device (identified by its connection address).
Clears: device info, all event headers, all waveforms, monitor status.
The next request to any endpoint for this device will re-fetch from the device.
Supply either *port* (serial) or *host* (TCP/modem) to identify the device.
"""
conn_key = SFMCache.make_conn_key(host, tcp_port, port, baud)
counts = get_cache().clear_device(conn_key)
return {"status": "cleared", "conn_key": conn_key, "deleted": counts}
# ── Entry point ────────────────────────────────────────────────────────────────
if __name__ == "__main__":