Files
seismo-relay/sfm/cache.py
T
serversdown 2db565ff9c Add intelligent caching layer for SFM device data
Introduces sfm/cache.py — a SQLite-backed cache (via SQLAlchemy) that
sits between the SFM REST endpoints and the device, eliminating redundant
cellular downloads for data that doesn't change.

Cache behaviour by data type:
- Device info / compliance config: cached until a config write occurs;
  POST /device/config now calls mark_config_dirty() to force a fresh read
  on the next /device/info call.
- Event headers + peak values: cached permanently (append-only). On
  subsequent calls to /device/events, the server does a fast count_events()
  (~2s) instead of a full download (~10-30s); only new events are fetched
  from the device and merged into the cache.
- Full waveforms (raw ADC samples): cached permanently — immutable once
  recorded. Repeated requests for the same waveform return instantly with
  zero device contact.
- Monitor status (battery, memory, is_monitoring): 30-second TTL; auto-
  invalidated on start/stop monitoring commands.

All endpoints gain a ?force=true param to bypass the cache when needed.
New endpoints: GET /cache/stats, DELETE /cache/device.
Adds requirements.txt listing fastapi, uvicorn, sqlalchemy, pyserial.

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
2026-04-09 07:14:51 +00:00

377 lines
16 KiB
Python

"""
sfm/cache.py — Persistent SQLite cache for SFM device data.
Caching strategy
----------------
+------------------+----------------------------------+-------------------------+
| Data             | Mutability                       | Invalidation            |
+------------------+----------------------------------+-------------------------+
| Device info      | Effectively immutable (firmware, | Manual clear / force    |
| (serial, model,  | serial never change)             | refresh query param /   |
| compliance cfg)  |                                  | config write (dirty)    |
+------------------+----------------------------------+-------------------------+
| Event headers    | Append-only (new events added,   | Fetch new ones when     |
| (peaks, ts,      | old never modified)              | device event count >    |
| project info)    |                                  | cached count            |
+------------------+----------------------------------+-------------------------+
| Full waveforms   | Immutable once recorded          | Never (permanent cache) |
| (raw ADC samples)|                                  |                         |
+------------------+----------------------------------+-------------------------+
| Monitor status   | Frequently changing              | TTL = 30 seconds        |
| (battery, memory)|                                  |                         |
+------------------+----------------------------------+-------------------------+
Keys
----
All cached rows are keyed by (host, tcp_port) for TCP connections, or (port, baud)
for serial connections. Within a device, events are keyed by index (0-based).
The device serial number is stored once we learn it, and used for display / debugging
only — the network address is the primary routing key (same as how the rest of the SFM
code operates).
"""
from __future__ import annotations
import json
import logging
import time
from pathlib import Path
from typing import Optional
# SQLAlchemy is the only third-party dependency of this module; fail early
# with an actionable message when it cannot be imported.
try:
    import sqlalchemy as sa
    from sqlalchemy import orm
except ImportError as exc:
    # Chain the original error explicitly so a broken (rather than missing)
    # installation still shows its real cause in the traceback.
    raise ImportError(
        "sqlalchemy is required for the SFM cache.\n"
        "Install it with: pip install sqlalchemy"
    ) from exc
# Module logger; all cache messages are emitted under the "sfm.cache" name.
log = logging.getLogger("sfm.cache")
# ── Schema ────────────────────────────────────────────────────────────────────
# Declarative base shared by the cache table models defined below; its
# metadata is what SFMCache.__init__ passes to create_all() to build the tables.
Base = orm.declarative_base()
# Maximum age of a cached monitor-status row before get_monitor_status()
# treats it as expired and returns None.
_MONITOR_STATUS_TTL = 30 # seconds
class CachedDevice(Base):
    """
    Cached /device/info payload for a single device connection.

    One row per connection key.  The complete response is kept as a JSON
    string so a cache hit can be answered verbatim, without opening a
    connection to the device.
    """

    __tablename__ = "cached_devices"

    # Primary key: the connection address, TCP (host+port) or serial
    # (port+baud), e.g. "tcp:1.2.3.4:12345".
    conn_key = sa.Column(sa.String, primary_key=True)

    # Device serial number once known, e.g. "BE11529".
    serial = sa.Column(sa.String, nullable=True)

    # Full /device/info response, serialised as JSON.
    info_json = sa.Column(sa.Text, nullable=False)

    # Unix timestamp of the most recent write to this row.
    updated_at = sa.Column(sa.Float, nullable=False)

    # Raised after a config write so that the next /device/info request goes
    # back to the device rather than returning stale compliance config.
    config_dirty = sa.Column(sa.Boolean, nullable=False, default=False)
class CachedEvent(Base):
    """
    One event's header, peak values and project info.

    Rows are identified by the composite key (conn_key, index).  An event
    does not change after the device records it, so a cached row stays valid
    until a refresh is explicitly requested.
    """

    __tablename__ = "cached_events"

    # Composite primary key — the declaration order (conn_key, then index)
    # is the order lookups by primary-key tuple must use.
    conn_key = sa.Column(sa.String, primary_key=True)
    index = sa.Column(sa.Integer, primary_key=True)

    # Serialised Event dict.
    event_json = sa.Column(sa.Text, nullable=False)

    # Unix timestamp.
    cached_at = sa.Column(sa.Float, nullable=False)
class CachedWaveform(Base):
    """
    Complete raw ADC waveform of one event (SUB 5A full download).

    Waveforms can reach several MB and are costly to pull over a cellular
    link.  They never change after download, so they are kept permanently.
    """

    __tablename__ = "cached_waveforms"

    # Composite primary key — the declaration order (conn_key, then index)
    # is the order lookups by primary-key tuple must use.
    conn_key = sa.Column(sa.String, primary_key=True)
    index = sa.Column(sa.Integer, primary_key=True)

    # Full /device/event/{idx}/waveform response, serialised as JSON.
    waveform_json = sa.Column(sa.Text, nullable=False)

    # Unix timestamp of when the waveform was stored.
    cached_at = sa.Column(sa.Float, nullable=False)
class CachedMonitorStatus(Base):
    """
    Short-lived snapshot of monitor status (battery, memory, is_monitoring).

    These values move frequently during field operations, so a row is only
    served for _MONITOR_STATUS_TTL seconds before the device is queried again.
    """

    __tablename__ = "cached_monitor_status"

    # Primary key: the connection address of the device.
    conn_key = sa.Column(sa.String, primary_key=True)

    # Monitor status response, serialised as JSON.
    status_json = sa.Column(sa.Text, nullable=False)

    # Unix timestamp of when the status was stored; used for the TTL check.
    cached_at = sa.Column(sa.Float, nullable=False)
# ── Cache store ───────────────────────────────────────────────────────────────
class SFMCache:
    """
    SQLite-backed cache for SFM device data.

    Usage
    -----
        cache = SFMCache()            # stores in sfm/data/sfm_cache.db by default
        cache = SFMCache(":memory:")  # in-memory (tests / ephemeral mode)

    All public methods accept a *conn_key* string — use make_conn_key() to
    build a consistent key from the transport parameters.
    """

    def __init__(self, db_path: str | Path | None = None) -> None:
        """
        Open (creating if necessary) the cache database and its tables.

        db_path
            Path of the SQLite file, ":memory:" for an in-memory database,
            or None for the default location ``data/sfm_cache.db`` next to
            this file.  Missing parent directories are created.
        """
        # Compare as a string so Path(":memory:") is recognised as well
        # instead of silently creating a file literally named ":memory:".
        in_memory = db_path is not None and str(db_path) == ":memory:"
        engine_kwargs: dict = {"connect_args": {"check_same_thread": False}}
        if in_memory:
            # An in-memory SQLite database lives inside a single connection.
            # The default pool hands every thread its own connection (and
            # therefore its own empty database), so share one connection
            # across all threads instead.
            from sqlalchemy.pool import StaticPool

            url = "sqlite:///:memory:"
            engine_kwargs["poolclass"] = StaticPool
        else:
            if db_path is None:
                # Default: alongside this file in sfm/data/
                db_path = Path(__file__).parent / "data" / "sfm_cache.db"
            db_path = Path(db_path)
            db_path.parent.mkdir(parents=True, exist_ok=True)
            url = f"sqlite:///{db_path}"
        engine = sa.create_engine(url, **engine_kwargs)
        Base.metadata.create_all(engine)
        self._Session = orm.sessionmaker(bind=engine)
        log.info("SFM cache opened: %s", db_path)

    # ── Connection key ────────────────────────────────────────────────────────
    @staticmethod
    def make_conn_key(
        host: Optional[str],
        tcp_port: int,
        port: Optional[str],
        baud: int,
    ) -> str:
        """
        Return a stable string key for this transport configuration.

        A non-empty *host* selects the TCP form ``tcp:<host>:<tcp_port>``;
        otherwise the serial form ``serial:<port>:<baud>`` is returned.
        """
        if host:
            return f"tcp:{host}:{tcp_port}"
        return f"serial:{port}:{baud}"

    # ── Device info ───────────────────────────────────────────────────────────
    def get_device_info(self, conn_key: str) -> Optional[dict]:
        """
        Return cached device info dict, or None if not cached / config_dirty.
        """
        with self._Session() as s:
            row = s.get(CachedDevice, conn_key)
            if row is None or row.config_dirty:
                return None
            return json.loads(row.info_json)

    def set_device_info(self, conn_key: str, info: dict) -> None:
        """
        Store device info and clear any dirty flag.

        The ``"serial"`` entry of *info*, when present, is also stored in
        its own column.
        """
        serial = info.get("serial")
        payload = json.dumps(info)
        now = time.time()
        with self._Session() as s:
            row = s.get(CachedDevice, conn_key)
            if row is None:
                s.add(
                    CachedDevice(
                        conn_key=conn_key,
                        serial=serial,
                        info_json=payload,
                        updated_at=now,
                        config_dirty=False,
                    )
                )
            else:
                row.serial = serial
                row.info_json = payload
                row.updated_at = now
                row.config_dirty = False
            s.commit()
        log.debug("cached device info for %s (serial=%s)", conn_key, serial)

    def mark_config_dirty(self, conn_key: str) -> None:
        """
        Called after a successful POST /device/config write.

        Forces the next /device/info call to re-read compliance config from the
        device instead of serving the now-stale cached version.  Does nothing
        when no device info is cached for *conn_key* (a lookup would miss
        anyway).
        """
        with self._Session() as s:
            row = s.get(CachedDevice, conn_key)
            if row is None:
                return
            row.config_dirty = True
            s.commit()
        log.debug("marked config dirty for %s", conn_key)

    # ── Events ────────────────────────────────────────────────────────────────
    def get_cached_event_count(self, conn_key: str) -> int:
        """Return the number of events we have cached for this device."""
        with self._Session() as s:
            return s.query(CachedEvent).filter_by(conn_key=conn_key).count()

    def get_all_events(self, conn_key: str) -> Optional[list[dict]]:
        """
        Return all cached events as a list of dicts, sorted by index.

        Returns None if nothing is cached yet.
        """
        with self._Session() as s:
            rows = (
                s.query(CachedEvent)
                .filter_by(conn_key=conn_key)
                .order_by(CachedEvent.index)
                .all()
            )
            if not rows:
                return None
            return [json.loads(r.event_json) for r in rows]

    def get_event(self, conn_key: str, index: int) -> Optional[dict]:
        """Return a single cached event by index, or None if not cached."""
        with self._Session() as s:
            row = s.get(CachedEvent, (conn_key, index))
            return json.loads(row.event_json) if row else None

    def set_events(self, conn_key: str, events: list[dict]) -> None:
        """
        Upsert a list of event dicts.

        Existing rows are updated; new rows are inserted.  This is used to
        add newly-discovered events to the cache.  Every dict must carry an
        ``"index"`` entry (KeyError otherwise); all rows are written in a
        single transaction.
        """
        now = time.time()
        with self._Session() as s:
            for ev in events:
                idx = ev["index"]
                payload = json.dumps(ev)
                row = s.get(CachedEvent, (conn_key, idx))
                if row is None:
                    s.add(
                        CachedEvent(
                            conn_key=conn_key,
                            index=idx,
                            event_json=payload,
                            cached_at=now,
                        )
                    )
                    log.debug("cached new event %d for %s", idx, conn_key)
                else:
                    # Refresh in case project_info was backfilled after the
                    # initial store.  The timestamp moves with the payload,
                    # matching set_waveform() / set_monitor_status().
                    row.event_json = payload
                    row.cached_at = now
            s.commit()

    # ── Waveforms ─────────────────────────────────────────────────────────────
    def get_waveform(self, conn_key: str, index: int) -> Optional[dict]:
        """Return a cached full waveform response dict, or None if not cached."""
        with self._Session() as s:
            row = s.get(CachedWaveform, (conn_key, index))
            if row is None:
                return None
            log.debug("waveform cache hit: %s event %d", conn_key, index)
            return json.loads(row.waveform_json)

    def set_waveform(self, conn_key: str, index: int, waveform: dict) -> None:
        """Store a full waveform response dict permanently (insert or overwrite)."""
        payload = json.dumps(waveform)
        now = time.time()
        with self._Session() as s:
            row = s.get(CachedWaveform, (conn_key, index))
            if row is None:
                s.add(
                    CachedWaveform(
                        conn_key=conn_key,
                        index=index,
                        waveform_json=payload,
                        cached_at=now,
                    )
                )
            else:
                row.waveform_json = payload
                row.cached_at = now
            s.commit()
        log.debug("cached waveform for %s event %d", conn_key, index)

    # ── Monitor status ────────────────────────────────────────────────────────
    def get_monitor_status(self, conn_key: str) -> Optional[dict]:
        """Return cached monitor status if it's within TTL, else None."""
        with self._Session() as s:
            row = s.get(CachedMonitorStatus, conn_key)
            if row is None:
                return None
            age = time.time() - row.cached_at
            # A negative age means the wall clock stepped backwards since the
            # row was written; treat that as expired rather than serving the
            # entry until the clock catches up.
            if age < 0 or age > _MONITOR_STATUS_TTL:
                log.debug("monitor status expired (age=%.1fs) for %s", age, conn_key)
                return None
            return json.loads(row.status_json)

    def set_monitor_status(self, conn_key: str, status: dict) -> None:
        """Store monitor status and restart its TTL."""
        payload = json.dumps(status)
        now = time.time()
        with self._Session() as s:
            row = s.get(CachedMonitorStatus, conn_key)
            if row is None:
                s.add(
                    CachedMonitorStatus(
                        conn_key=conn_key,
                        status_json=payload,
                        cached_at=now,
                    )
                )
            else:
                row.status_json = payload
                row.cached_at = now
            s.commit()

    def invalidate_monitor_status(self, conn_key: str) -> None:
        """
        Called after start/stop monitoring so the next status poll re-reads from device.
        """
        with self._Session() as s:
            row = s.get(CachedMonitorStatus, conn_key)
            if row is not None:
                s.delete(row)
                s.commit()

    # ── Cache management ──────────────────────────────────────────────────────
    def clear_device(self, conn_key: str) -> dict:
        """
        Remove all cached data for a device.  Returns counts of deleted rows,
        keyed by "device_info", "events", "waveforms" and "monitor_status".
        """
        counts: dict[str, int] = {}
        with self._Session() as s:
            counts["device_info"] = s.query(CachedDevice).filter_by(conn_key=conn_key).delete()
            counts["events"] = s.query(CachedEvent).filter_by(conn_key=conn_key).delete()
            counts["waveforms"] = s.query(CachedWaveform).filter_by(conn_key=conn_key).delete()
            counts["monitor_status"] = (
                s.query(CachedMonitorStatus).filter_by(conn_key=conn_key).delete()
            )
            s.commit()
        log.info("cleared cache for %s: %s", conn_key, counts)
        return counts

    def stats(self) -> dict:
        """Return row counts for all cache tables (for /cache/stats endpoint)."""
        with self._Session() as s:
            return {
                "devices": s.query(CachedDevice).count(),
                "events": s.query(CachedEvent).count(),
                "waveforms": s.query(CachedWaveform).count(),
                "monitor_status": s.query(CachedMonitorStatus).count(),
            }
# ── Module-level singleton ────────────────────────────────────────────────────
# Created lazily on the first get_cache() call, then shared across all requests.
_cache: Optional[SFMCache] = None


def get_cache() -> SFMCache:
    """
    Return the shared module-level SFMCache.

    The instance is built with default arguments the first time this
    function is called and reused on every later call.
    """
    global _cache
    instance = _cache
    if instance is None:
        instance = SFMCache()
        _cache = instance
    return instance