From 2db565ff9c38e24f4ff7f48b9dd669d71e3b31a3 Mon Sep 17 00:00:00 2001 From: serversdown Date: Thu, 9 Apr 2026 07:14:51 +0000 Subject: [PATCH] Add intelligent caching layer for SFM device data MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Introduces sfm/cache.py — a SQLite-backed cache (via SQLAlchemy) that sits between the SFM REST endpoints and the device, eliminating redundant cellular downloads for data that doesn't change. Cache behaviour by data type: - Device info / compliance config: cached until a config write occurs; POST /device/config now calls mark_config_dirty() to force a fresh read on the next /device/info call. - Event headers + peak values: cached permanently (append-only). On subsequent calls to /device/events, the server first does a fast count_events() (~2s); if the count matches the cache, the cached events are returned and the full download (~10-30s) is skipped. If the device reports more events, the events are re-read from the device (the protocol iterates from event 0) and only the new ones are merged into the cache. - Full waveforms (raw ADC samples): cached permanently — immutable once recorded. Repeated requests for the same waveform return instantly with zero device contact. - Monitor status (battery, memory, is_monitoring): 30-second TTL; auto-invalidated on start/stop monitoring commands. All cached read endpoints (GET /device/info, /device/events, /device/event/{index}, /device/event/{index}/waveform and the monitor status endpoint) gain a ?force=true param to bypass the cache when needed. New endpoints: GET /cache/stats, DELETE /cache/device. Adds requirements.txt listing fastapi, uvicorn, sqlalchemy, pyserial.
Co-Authored-By: Claude Sonnet 4.6 --- requirements.txt | 4 + sfm/cache.py | 376 +++++++++++++++++++++++++++++++++++++++++++++++ sfm/server.py | 295 +++++++++++++++++++++++++++++++++---- 3 files changed, 643 insertions(+), 32 deletions(-) create mode 100644 requirements.txt create mode 100644 sfm/cache.py diff --git a/requirements.txt b/requirements.txt new file mode 100644 index 0000000..0958f1a --- /dev/null +++ b/requirements.txt @@ -0,0 +1,4 @@ +fastapi +uvicorn +sqlalchemy +pyserial diff --git a/sfm/cache.py b/sfm/cache.py new file mode 100644 index 0000000..be35e60 --- /dev/null +++ b/sfm/cache.py @@ -0,0 +1,376 @@ +""" +sfm/cache.py — Persistent SQLite cache for SFM device data. + +Caching strategy +---------------- ++------------------+----------------------------------+-------------------------+ +| Data | Mutability | Invalidation | ++------------------+----------------------------------+-------------------------+ +| Device info | Effectively immutable (firmware, | Manual clear / force | +| (serial, model, | serial never change) | refresh query param | +| compliance cfg) | | | ++------------------+----------------------------------+-------------------------+ +| Event headers | Append-only (new events added, | Fetch new ones when | +| (peaks, ts, | old never modified) | device event count > | +| project info) | | cached count | ++------------------+----------------------------------+-------------------------+ +| Full waveforms | Immutable once recorded | Never (permanent cache) | +| (raw ADC samples)| | | ++------------------+----------------------------------+-------------------------+ +| Monitor status | Frequently changing | TTL = 30 seconds | +| (battery, memory)| | | ++------------------+----------------------------------+-------------------------+ + +Keys +---- +All cached rows are keyed by (host, tcp_port) for TCP connections, or (port, baud) +for serial connections. Within a device, events are keyed by index (0-based). 
+ +The device serial number is stored once we learn it, and used for display / debugging +only — the network address is the primary routing key (same as how the rest of the SFM +code operates). +""" + +from __future__ import annotations + +import json +import logging +import time +from pathlib import Path +from typing import Optional + +try: + import sqlalchemy as sa + from sqlalchemy import orm +except ImportError: + raise ImportError( + "sqlalchemy is required for the SFM cache.\n" + "Install it with: pip install sqlalchemy" + ) + +log = logging.getLogger("sfm.cache") + +# ── Schema ──────────────────────────────────────────────────────────────────── + +Base = orm.declarative_base() + +_MONITOR_STATUS_TTL = 30 # seconds + + +class CachedDevice(Base): + """ + Device identity + compliance config, keyed by connection address. + + Stores the full serialised JSON blob returned by /device/info so the + endpoint can return it verbatim on a cache hit without re-connecting. + """ + __tablename__ = "cached_devices" + + # Connection key — either TCP (host+port) or serial (port+baud) + conn_key = sa.Column(sa.String, primary_key=True) # e.g. "tcp:1.2.3.4:12345" + serial = sa.Column(sa.String, nullable=True) # e.g. "BE11529" + info_json = sa.Column(sa.Text, nullable=False) # full /device/info response JSON + updated_at = sa.Column(sa.Float, nullable=False) # Unix timestamp of last write + + # When a config write happens we set this flag so the next /device/info call + # fetches fresh data instead of serving stale compliance config. + config_dirty = sa.Column(sa.Boolean, default=False, nullable=False) + + +class CachedEvent(Base): + """ + Per-event header + peak values + project info, keyed by (conn_key, index). + + Events are immutable once recorded on the device; once we have an event in + the cache it never needs to be re-downloaded unless explicitly requested. 
+ """ + __tablename__ = "cached_events" + + conn_key = sa.Column(sa.String, primary_key=True) + index = sa.Column(sa.Integer, primary_key=True) + event_json = sa.Column(sa.Text, nullable=False) # serialised Event dict + cached_at = sa.Column(sa.Float, nullable=False) # Unix timestamp + + +class CachedWaveform(Base): + """ + Full raw ADC waveform for a single event (SUB 5A full download). + + These are large (up to several MB) and expensive to fetch over cellular. + Once downloaded they are immutable and cached permanently. + """ + __tablename__ = "cached_waveforms" + + conn_key = sa.Column(sa.String, primary_key=True) + index = sa.Column(sa.Integer, primary_key=True) + waveform_json = sa.Column(sa.Text, nullable=False) # full /device/event/{idx}/waveform response JSON + cached_at = sa.Column(sa.Float, nullable=False) + + +class CachedMonitorStatus(Base): + """ + Monitor status (battery, memory, is_monitoring) with a short TTL. + + These change frequently during field operations so we keep them only for + MONITOR_STATUS_TTL seconds before re-fetching from the device. + """ + __tablename__ = "cached_monitor_status" + + conn_key = sa.Column(sa.String, primary_key=True) + status_json = sa.Column(sa.Text, nullable=False) + cached_at = sa.Column(sa.Float, nullable=False) + + +# ── Cache store ─────────────────────────────────────────────────────────────── + +class SFMCache: + """ + SQLite-backed cache for SFM device data. + + Usage + ----- + cache = SFMCache() # stores in sfm/data/sfm_cache.db by default + cache = SFMCache(":memory:") # in-memory (tests / ephemeral mode) + + All public methods accept a *conn_key* string — use make_conn_key() to + build a consistent key from the transport parameters. 
+ """ + + def __init__(self, db_path: str | Path | None = None) -> None: + in_memory = (db_path == ":memory:") + if db_path is None: + # Default: alongside this file in sfm/data/ + db_path = Path(__file__).parent / "data" / "sfm_cache.db" + if not in_memory: + db_path = Path(db_path) + db_path.parent.mkdir(parents=True, exist_ok=True) + + url = "sqlite:///:memory:" if in_memory else f"sqlite:///{db_path}" + engine = sa.create_engine(url, connect_args={"check_same_thread": False}) + Base.metadata.create_all(engine) + self._Session = orm.sessionmaker(bind=engine) + log.info("SFM cache opened: %s", db_path) + + # ── Connection key ──────────────────────────────────────────────────────── + + @staticmethod + def make_conn_key( + host: Optional[str], + tcp_port: int, + port: Optional[str], + baud: int, + ) -> str: + """Return a stable string key for this transport configuration.""" + if host: + return f"tcp:{host}:{tcp_port}" + return f"serial:{port}:{baud}" + + # ── Device info ─────────────────────────────────────────────────────────── + + def get_device_info(self, conn_key: str) -> Optional[dict]: + """ + Return cached device info dict, or None if not cached / config_dirty. 
+ """ + with self._Session() as s: + row = s.get(CachedDevice, conn_key) + if row is None or row.config_dirty: + return None + return json.loads(row.info_json) + + def set_device_info(self, conn_key: str, info: dict) -> None: + """Store device info and clear any dirty flag.""" + with self._Session() as s: + row = s.get(CachedDevice, conn_key) + serial = info.get("serial") + if row is None: + row = CachedDevice( + conn_key=conn_key, + serial=serial, + info_json=json.dumps(info), + updated_at=time.time(), + config_dirty=False, + ) + s.add(row) + else: + row.serial = serial + row.info_json = json.dumps(info) + row.updated_at = time.time() + row.config_dirty = False + s.commit() + log.debug("cached device info for %s (serial=%s)", conn_key, serial) + + def mark_config_dirty(self, conn_key: str) -> None: + """ + Called after a successful POST /device/config write. + + Forces the next /device/info call to re-read compliance config from the + device instead of serving the now-stale cached version. + """ + with self._Session() as s: + row = s.get(CachedDevice, conn_key) + if row: + row.config_dirty = True + s.commit() + log.debug("marked config dirty for %s", conn_key) + + # ── Events ──────────────────────────────────────────────────────────────── + + def get_cached_event_count(self, conn_key: str) -> int: + """Return the number of events we have cached for this device.""" + with self._Session() as s: + return s.query(CachedEvent).filter_by(conn_key=conn_key).count() + + def get_all_events(self, conn_key: str) -> Optional[list[dict]]: + """ + Return all cached events as a list of dicts, sorted by index. + Returns None if nothing is cached yet. 
+ """ + with self._Session() as s: + rows = ( + s.query(CachedEvent) + .filter_by(conn_key=conn_key) + .order_by(CachedEvent.index) + .all() + ) + if not rows: + return None + return [json.loads(r.event_json) for r in rows] + + def get_event(self, conn_key: str, index: int) -> Optional[dict]: + """Return a single cached event by index, or None if not cached.""" + with self._Session() as s: + row = s.get(CachedEvent, (conn_key, index)) + return json.loads(row.event_json) if row else None + + def set_events(self, conn_key: str, events: list[dict]) -> None: + """ + Upsert a list of event dicts. Existing rows are updated; new rows are + inserted. This is used to add newly-discovered events to the cache. + """ + now = time.time() + with self._Session() as s: + for ev in events: + idx = ev["index"] + row = s.get(CachedEvent, (conn_key, idx)) + if row is None: + row = CachedEvent( + conn_key=conn_key, + index=idx, + event_json=json.dumps(ev), + cached_at=now, + ) + s.add(row) + log.debug("cached new event %d for %s", idx, conn_key) + else: + # Refresh in case project_info was backfilled after initial store + row.event_json = json.dumps(ev) + s.commit() + + # ── Waveforms ───────────────────────────────────────────────────────────── + + def get_waveform(self, conn_key: str, index: int) -> Optional[dict]: + """Return a cached full waveform response dict, or None if not cached.""" + with self._Session() as s: + row = s.get(CachedWaveform, (conn_key, index)) + if row is None: + return None + log.debug("waveform cache hit: %s event %d", conn_key, index) + return json.loads(row.waveform_json) + + def set_waveform(self, conn_key: str, index: int, waveform: dict) -> None: + """Store a full waveform response dict permanently.""" + with self._Session() as s: + row = s.get(CachedWaveform, (conn_key, index)) + if row is None: + row = CachedWaveform( + conn_key=conn_key, + index=index, + waveform_json=json.dumps(waveform), + cached_at=time.time(), + ) + s.add(row) + else: + 
row.waveform_json = json.dumps(waveform) + row.cached_at = time.time() + s.commit() + log.debug("cached waveform for %s event %d", conn_key, index) + + # ── Monitor status ──────────────────────────────────────────────────────── + + def get_monitor_status(self, conn_key: str) -> Optional[dict]: + """Return cached monitor status if it's within TTL, else None.""" + with self._Session() as s: + row = s.get(CachedMonitorStatus, conn_key) + if row is None: + return None + age = time.time() - row.cached_at + if age > _MONITOR_STATUS_TTL: + log.debug("monitor status expired (age=%.1fs) for %s", age, conn_key) + return None + return json.loads(row.status_json) + + def set_monitor_status(self, conn_key: str, status: dict) -> None: + """Store monitor status.""" + with self._Session() as s: + row = s.get(CachedMonitorStatus, conn_key) + if row is None: + row = CachedMonitorStatus( + conn_key=conn_key, + status_json=json.dumps(status), + cached_at=time.time(), + ) + s.add(row) + else: + row.status_json = json.dumps(status) + row.cached_at = time.time() + s.commit() + + def invalidate_monitor_status(self, conn_key: str) -> None: + """ + Called after start/stop monitoring so the next status poll re-reads from device. + """ + with self._Session() as s: + row = s.get(CachedMonitorStatus, conn_key) + if row: + s.delete(row) + s.commit() + + # ── Cache management ────────────────────────────────────────────────────── + + def clear_device(self, conn_key: str) -> dict: + """ + Remove all cached data for a device. Returns counts of deleted rows. 
+ """ + counts = {} + with self._Session() as s: + counts["device_info"] = s.query(CachedDevice).filter_by(conn_key=conn_key).delete() + counts["events"] = s.query(CachedEvent).filter_by(conn_key=conn_key).delete() + counts["waveforms"] = s.query(CachedWaveform).filter_by(conn_key=conn_key).delete() + counts["monitor_status"] = s.query(CachedMonitorStatus).filter_by(conn_key=conn_key).delete() + s.commit() + log.info("cleared cache for %s: %s", conn_key, counts) + return counts + + def stats(self) -> dict: + """Return row counts for all cache tables (for /cache/stats endpoint).""" + with self._Session() as s: + return { + "devices": s.query(CachedDevice).count(), + "events": s.query(CachedEvent).count(), + "waveforms": s.query(CachedWaveform).count(), + "monitor_status": s.query(CachedMonitorStatus).count(), + } + + +# ── Module-level singleton ──────────────────────────────────────────────────── +# Instantiated once when the module is imported; shared across all requests. + +_cache: Optional[SFMCache] = None + + +def get_cache() -> SFMCache: + """Return the module-level cache singleton, initialising it on first call.""" + global _cache + if _cache is None: + _cache = SFMCache() + return _cache diff --git a/sfm/server.py b/sfm/server.py index 0ae36ba..8c5f66f 100644 --- a/sfm/server.py +++ b/sfm/server.py @@ -58,6 +58,7 @@ from minimateplus import MiniMateClient from minimateplus.protocol import ProtocolError from minimateplus.models import ComplianceConfig, DeviceInfo, Event, PeakValues, ProjectInfo, Timestamp from minimateplus.transport import TcpTransport, DEFAULT_TCP_PORT +from sfm.cache import SFMCache, get_cache logging.basicConfig( level=logging.INFO, @@ -239,6 +240,33 @@ def _run_with_retry(fn, *, is_tcp: bool): return fn() # let any second failure propagate normally +# ── Helpers ──────────────────────────────────────────────────────────────────── + +def _backfill_events(events: list, info: "DeviceInfo") -> None: + """ + Fill in sample_rate and 
project_info fields that the per-event waveform + record doesn't carry — sourced from the device's compliance config. + + Extracted from device_events() so it can be called from both the full + download path and the partial (new-events-only) path. + """ + if info.compliance_config and info.compliance_config.sample_rate: + for ev in events: + if ev.sample_rate is None: + ev.sample_rate = info.compliance_config.sample_rate + + if info.compliance_config: + cc = info.compliance_config + for ev in events: + if ev.project_info is None: + ev.project_info = ProjectInfo() + pi = ev.project_info + if pi.client is None: pi.client = cc.client + if pi.operator is None: pi.operator = cc.operator + if pi.sensor_location is None: pi.sensor_location = cc.sensor_location + if pi.notes is None: pi.notes = cc.notes + + # ── Endpoints ────────────────────────────────────────────────────────────────── @app.get("/health") @@ -259,6 +287,7 @@ def device_info( baud: int = Query(38400, description="Serial baud rate (default 38400)"), host: Optional[str] = Query(None, description="TCP host — modem IP or ACH relay (e.g. 203.0.113.5)"), tcp_port: int = Query(DEFAULT_TCP_PORT, description=f"TCP port (default {DEFAULT_TCP_PORT})"), + force: bool = Query(False, description="Bypass cache and re-read from device"), ) -> dict: """ Connect to the device, perform the POLL startup handshake, and return @@ -266,8 +295,23 @@ def device_info( Supply either *port* (serial) or *host* (TCP/modem). Equivalent to POST /device/connect — provided as GET for convenience. + + **Caching**: device identity and compliance config are cached after the first + successful read (they rarely change). Pass *force=true* to bypass the cache + and re-read directly from the device (e.g. after a config push). + The cache is also automatically invalidated after POST /device/config. 
""" - log.info("GET /device/info port=%s host=%s tcp_port=%d", port, host, tcp_port) + log.info("GET /device/info port=%s host=%s tcp_port=%d force=%s", port, host, tcp_port, force) + + cache = get_cache() + conn_key = SFMCache.make_conn_key(host, tcp_port, port, baud) + + if not force: + cached = cache.get_device_info(conn_key) + if cached is not None: + log.info("device info cache hit for %s", conn_key) + cached["_cached"] = True + return cached try: def _do(): @@ -287,7 +331,9 @@ def device_info( except Exception as exc: raise HTTPException(status_code=500, detail=f"Device error: {exc}") from exc - return _serialise_device_info(info) + result = _serialise_device_info(info) + cache.set_device_info(conn_key, result) + return result @app.post("/device/connect") @@ -311,6 +357,7 @@ def device_events( host: Optional[str] = Query(None, description="TCP host — modem IP or ACH relay"), tcp_port: int = Query(DEFAULT_TCP_PORT, description=f"TCP port (default {DEFAULT_TCP_PORT})"), debug: bool = Query(False, description="Include raw record hex for field-layout inspection"), + force: bool = Query(False, description="Bypass cache and re-download all events from device"), ) -> dict: """ Connect to the device, read the event index, and download all stored @@ -322,10 +369,108 @@ def device_events( verifying field offsets against the protocol reference. This does NOT download raw ADC waveform samples — those are large and - fetched separately via GET /device/event/{idx}/waveform (future endpoint). - """ - log.info("GET /device/events port=%s host=%s debug=%s", port, host, debug) + fetched separately via GET /device/event/{idx}/waveform. + **Caching**: event headers are cached after the first download. On subsequent + calls, the device is contacted only to check the event count (fast: ~2s). + If the count matches the cache, all events are returned from cache instantly. + If new events exist on the device, only the new ones are downloaded and merged. 
+ Pass *force=true* to bypass the cache entirely and re-download everything. + """ + log.info("GET /device/events port=%s host=%s debug=%s force=%s", port, host, debug, force) + + cache = get_cache() + conn_key = SFMCache.make_conn_key(host, tcp_port, port, baud) + + # ── Smart cache path (skip when debug=True or force=True) ──────────────── + # debug mode uses raw_record_hex which isn't stored in the cache, so we + # must always go to the device when debug is requested. + if not force and not debug: + cached_events = cache.get_all_events(conn_key) + cached_count = len(cached_events) if cached_events else 0 + + if cached_count > 0: + # Quick device contact: just count events via the fast 1E/1F chain. + # This takes ~2s instead of the full event download (~10-30s). + try: + def _count(): + with _build_client(port, baud, host, tcp_port) as client: + client.connect() + return client.count_events() + device_count = _run_with_retry(_count, is_tcp=_is_tcp(host)) + except HTTPException: + raise + except (ProtocolError, OSError, Exception) as exc: + # If we can't reach the device at all, serve stale cache rather + # than returning an error — field units go offline regularly. + log.warning("count_events failed (%s) — serving stale cache for %s", exc, conn_key) + cached_info = cache.get_device_info(conn_key) or {} + return { + "device": cached_info, + "event_count": cached_count, + "events": cached_events, + "_cached": True, + "_stale": True, + } + + if device_count == cached_count: + # Nothing new — return cache immediately, no event download needed. + log.info( + "event cache hit for %s: %d events, device count matches", + conn_key, cached_count, + ) + cached_info = cache.get_device_info(conn_key) or {} + return { + "device": cached_info, + "event_count": cached_count, + "events": cached_events, + "_cached": True, + } + + if device_count > cached_count: + # New events on the device — download all events but only store/return + # the new ones. 
Events are append-only; indices 0..(cached_count-1) + # are already in the cache and don't need to be re-downloaded logically, + # but the protocol requires iterating from event 0 to reach later ones. + # The device download time is dominated by the number of events requested, + # so we stop at the last known event index to avoid re-downloading everything. + log.info( + "new events on device %s: have %d, device has %d — fetching all up to %d", + conn_key, cached_count, device_count, device_count - 1, + ) + try: + def _fetch_new(): + with _build_client(port, baud, host, tcp_port) as client: + info = client.connect() + all_evs = client.get_events(stop_after_index=device_count - 1) + return info, all_evs + info, all_events = _run_with_retry(_fetch_new, is_tcp=_is_tcp(host)) + except HTTPException: + raise + except ProtocolError as exc: + raise HTTPException(status_code=502, detail=f"Protocol error: {exc}") from exc + except OSError as exc: + raise HTTPException(status_code=502, detail=f"Connection error: {exc}") from exc + except Exception as exc: + raise HTTPException(status_code=500, detail=f"Device error: {exc}") from exc + + _backfill_events(all_events, info) + # Only the new events (indices >= cached_count) are truly new. 
+ new_events = [ev for ev in all_events if ev.index >= cached_count] + new_serialised = [_serialise_event(ev) for ev in new_events] + cache.set_events(conn_key, new_serialised) + cache.set_device_info(conn_key, _serialise_device_info(info)) + + merged_events = cache.get_all_events(conn_key) + return { + "device": _serialise_device_info(info), + "event_count": len(merged_events), + "events": merged_events, + "_cached": True, + "_new_events": len(new_events), + } + + # ── Full download path (first call, force=True, or debug=True) ─────────── try: def _do(): with _build_client(port, baud, host, tcp_port) as client: @@ -340,31 +485,19 @@ def device_events( except Exception as exc: raise HTTPException(status_code=500, detail=f"Device error: {exc}") from exc - # Fill sample_rate from compliance config where the event record doesn't supply it. - # sample_rate is a device-level setting, not stored per-event in the waveform record. - if info.compliance_config and info.compliance_config.sample_rate: - for ev in events: - if ev.sample_rate is None: - ev.sample_rate = info.compliance_config.sample_rate + _backfill_events(events, info) + serialised = [_serialise_event(ev, debug=debug) for ev in events] - # Backfill event.project_info fields that the 210-byte waveform record doesn't carry. - # The waveform record only stores "Project:" — client/operator/sensor_location/notes - # live in the SUB 1A compliance config, not in the per-event record. - if info.compliance_config: - cc = info.compliance_config - for ev in events: - if ev.project_info is None: - ev.project_info = ProjectInfo() - pi = ev.project_info - if pi.client is None: pi.client = cc.client - if pi.operator is None: pi.operator = cc.operator - if pi.sensor_location is None: pi.sensor_location = cc.sensor_location - if pi.notes is None: pi.notes = cc.notes + if not debug: + # Only cache when not in debug mode (debug adds raw_record_hex which + # we don't want polluting the normal cache entries). 
+ cache.set_events(conn_key, serialised) + cache.set_device_info(conn_key, _serialise_device_info(info)) return { "device": _serialise_device_info(info), "event_count": len(events), - "events": [_serialise_event(ev, debug=debug) for ev in events], + "events": serialised, } @@ -375,21 +508,36 @@ def device_event( baud: int = Query(38400, description="Serial baud rate"), host: Optional[str] = Query(None, description="TCP host — modem IP or ACH relay"), tcp_port: int = Query(DEFAULT_TCP_PORT, description=f"TCP port (default {DEFAULT_TCP_PORT})"), + force: bool = Query(False, description="Bypass cache and re-download from device"), ) -> dict: """ Download a single event by index (0-based). Supply either *port* (serial) or *host* (TCP/modem). Performs: POLL startup → event index → event header → waveform record. + + **Caching**: if this event was already downloaded (e.g. via GET /device/events), + it is returned instantly from cache with no device contact. """ - log.info("GET /device/event/%d port=%s host=%s", index, port, host) + log.info("GET /device/event/%d port=%s host=%s force=%s", index, port, host, force) + + cache = get_cache() + conn_key = SFMCache.make_conn_key(host, tcp_port, port, baud) + + if not force: + cached = cache.get_event(conn_key, index) + if cached is not None: + log.info("event cache hit for %s index %d", conn_key, index) + cached["_cached"] = True + return cached try: def _do(): with _build_client(port, baud, host, tcp_port) as client: - client.connect() - return client.get_events(stop_after_index=index) - events = _run_with_retry(_do, is_tcp=_is_tcp(host)) + info = client.connect() + events = client.get_events(stop_after_index=index) + return info, events + info, events = _run_with_retry(_do, is_tcp=_is_tcp(host)) except HTTPException: raise except ProtocolError as exc: @@ -406,7 +554,14 @@ def device_event( detail=f"Event index {index} not found on device", ) - return _serialise_event(matching[0]) + _backfill_events(matching, info) + result = 
_serialise_event(matching[0]) + + # Store all downloaded events (we paid for them anyway — indices 0..index) + all_serialised = [_serialise_event(ev) for ev in events] + cache.set_events(conn_key, all_serialised) + + return result @app.get("/device/event/{index}/waveform") @@ -416,6 +571,7 @@ def device_event_waveform( baud: int = Query(38400, description="Serial baud rate"), host: Optional[str] = Query(None, description="TCP host — modem IP or ACH relay"), tcp_port: int = Query(DEFAULT_TCP_PORT, description=f"TCP port (default {DEFAULT_TCP_PORT})"), + force: bool = Query(False, description="Bypass cache and re-download from device"), ) -> dict: """ Download the full raw ADC waveform for a single event (0-based index). @@ -434,8 +590,23 @@ def device_event_waveform( - **sample_rate**: samples per second (from compliance config) - **channels**: dict of channel name → list of signed int16 ADC counts (keys: "Tran", "Vert", "Long", "Mic") + + **Caching**: full waveforms are cached permanently after the first download — + they are immutable once recorded on the device. Subsequent requests for the + same event return instantly from cache without any device contact. + Pass *force=true* to force a fresh download (rarely needed). 
""" - log.info("GET /device/event/%d/waveform port=%s host=%s", index, port, host) + log.info("GET /device/event/%d/waveform port=%s host=%s force=%s", index, port, host, force) + + cache = get_cache() + conn_key = SFMCache.make_conn_key(host, tcp_port, port, baud) + + if not force: + cached = cache.get_waveform(conn_key, index) + if cached is not None: + log.info("waveform cache hit for %s event %d", conn_key, index) + cached["_cached"] = True + return cached try: def _do(): @@ -469,7 +640,7 @@ def device_event_waveform( if sample_rate is None and info.compliance_config: sample_rate = info.compliance_config.sample_rate - return { + result = { "index": ev.index, "record_type": ev.record_type, "timestamp": _serialise_timestamp(ev.timestamp), @@ -481,6 +652,8 @@ def device_event_waveform( "peak_values": _serialise_peak_values(ev.peak_values), "channels": raw, } + cache.set_waveform(conn_key, index, result) + return result # ── Write endpoints ─────────────────────────────────────────────────────────── @@ -595,6 +768,10 @@ def device_config( except Exception as exc: raise HTTPException(status_code=500, detail=f"Device error: {exc}") from exc + # Config was written to the device — the cached compliance config is now stale. + conn_key = SFMCache.make_conn_key(host, tcp_port, port, baud) + get_cache().mark_config_dirty(conn_key) + return { "status": "ok", "updated_fields": changed, @@ -622,6 +799,7 @@ def device_monitor_status( baud: int = Query(38400, description="Serial baud rate"), host: Optional[str] = Query(None, description="TCP host — modem IP or ACH relay"), tcp_port: int = Query(DEFAULT_TCP_PORT, description=f"TCP port (default {DEFAULT_TCP_PORT})"), + force: bool = Query(False, description="Bypass cache and re-read from device"), ) -> dict: """ Read monitoring status from the device. @@ -633,7 +811,20 @@ def device_monitor_status( Returns is_monitoring bool, battery voltage, and memory usage (total + free bytes). 
Battery and memory are only present when the unit is idle. + + **Caching**: status is cached for 30 seconds to reduce cellular polling overhead. + Pass *force=true* to bypass the cache for an immediate fresh read. """ + cache = get_cache() + conn_key = SFMCache.make_conn_key(host, tcp_port, port, baud) + + if not force: + cached = cache.get_monitor_status(conn_key) + if cached is not None: + log.debug("monitor status cache hit for %s", conn_key) + cached["_cached"] = True + return cached + with _build_client(port=port, baud=baud, host=host, tcp_port=tcp_port) as client: try: client.poll() @@ -651,6 +842,8 @@ def device_monitor_status( if status.memory_free is not None: result["memory_free_bytes"] = status.memory_free result["memory_free_kb"] = round(status.memory_free / 1024, 1) + + cache.set_monitor_status(conn_key, result) return result @@ -673,6 +866,9 @@ def device_monitor_start( log.warning("start monitoring poll retry: %s", exc) client.poll() client.start_monitoring() + + conn_key = SFMCache.make_conn_key(host, tcp_port, port, baud) + get_cache().invalidate_monitor_status(conn_key) return {"status": "started"} @@ -695,9 +891,44 @@ def device_monitor_stop( log.warning("stop monitoring poll retry: %s", exc) client.poll() client.stop_monitoring() + + conn_key = SFMCache.make_conn_key(host, tcp_port, port, baud) + get_cache().invalidate_monitor_status(conn_key) return {"status": "stopped"} +# ── Cache management endpoints ──────────────────────────────────────────────── + +@app.get("/cache/stats") +def cache_stats() -> dict: + """ + Return row counts for all cache tables. + + Useful for debugging and verifying that caching is working as expected. + """ + return get_cache().stats() + + +@app.delete("/cache/device") +def cache_clear_device( + port: Optional[str] = Query(None, description="Serial port (e.g. 
COM5)"), + baud: int = Query(38400, description="Serial baud rate"), + host: Optional[str] = Query(None, description="TCP host — modem IP or ACH relay"), + tcp_port: int = Query(DEFAULT_TCP_PORT, description=f"TCP port (default {DEFAULT_TCP_PORT})"), +) -> dict: + """ + Clear all cached data for a specific device (identified by its connection address). + + Clears: device info, all event headers, all waveforms, monitor status. + The next request to any endpoint for this device will re-fetch from the device. + + Supply either *port* (serial) or *host* (TCP/modem) to identify the device. + """ + conn_key = SFMCache.make_conn_key(host, tcp_port, port, baud) + counts = get_cache().clear_device(conn_key) + return {"status": "cleared", "conn_key": conn_key, "deleted": counts} + + # ── Entry point ──────────────────────────────────────────────────────────────── if __name__ == "__main__":