Files
seismo-relay/sfm/live_cache.py
T
serversdown 9afa3484f4 feat(cache): implement integrity checks for cached events and waveforms
- Added `waveform_key` and `event_timestamp` columns to `CachedEvent` and `CachedWaveform` for integrity verification.
- Implemented logic to flush the cache when a mismatch in (waveform_key, event_timestamp) is detected during event and waveform updates.
- Enhanced `set_events` and `set_waveform` methods to check for mismatches and trigger cache eviction as necessary.
- Introduced a new `LiveCache` class to manage in-memory caching of live device data, separating it from the server logic for better testability.
- Added tests to verify the correctness of cache invalidation logic, particularly for post-erase key reuse scenarios.
- Updated web application to include a "Force refresh" toggle, allowing users to bypass the cache and re-fetch data from the device.
2026-05-07 04:42:00 +00:00

190 lines
8.0 KiB
Python

"""
sfm/live_cache.py — Thread-safe in-memory cache for live SFM device data.
Extracted from sfm/server.py so the cache logic is importable and testable
without pulling in fastapi/uvicorn.
Caching strategy
----------------
Keyed by `conn_key` ("tcp:host:port" or "serial:port:baud"). Does NOT
persist across server restarts.
device_info cached until POST /device/config marks it dirty
events cached by (conn_key, device_event_count); re-fetched when
a quick count_events() probe shows new events on the device
monitor_status 30-second TTL (changes frequently during monitoring)
waveforms permanent within a process — but auto-evicted at the device
level when a (waveform_key, timestamp) mismatch is detected
at the same index (post-erase key reuse — the device's
event-key counter resets to 0x01110000 after every erase,
so the same `(conn_key, index)` slot can refer to a
brand-new physical event).
All endpoints accept ?force=true to bypass the cache and re-read.
"""
from __future__ import annotations
import threading
import time
from typing import Optional
_MONITOR_STATUS_TTL = 30.0 # seconds
class LiveCache:
"""
Thread-safe in-memory cache for live SFM device data.
One singleton per server process.
"""
def __init__(self) -> None:
self._lock = threading.Lock()
self._device_info: dict[str, dict] = {}
self._events: dict[str, tuple[int, list]] = {}
self._monitor_status: dict[str, tuple[float, dict]] = {}
self._config_dirty: dict[str, bool] = {}
self._waveforms: dict[tuple, dict] = {}
# ── Connection key ────────────────────────────────────────────────────────
@staticmethod
def make_conn_key(
host: Optional[str],
tcp_port: int,
port: Optional[str],
baud: int,
) -> str:
if host:
return f"tcp:{host}:{tcp_port}"
return f"serial:{port}:{baud}"
# ── Eviction signature ────────────────────────────────────────────────────
@staticmethod
def _event_signature(ev: dict) -> tuple[Optional[str], Optional[str]]:
"""Return (waveform_key_hex, timestamp_iso) from a serialised event."""
key = ev.get("waveform_key") or ev.get("_waveform_key")
if isinstance(key, (bytes, bytearray)):
key = bytes(key).hex()
ts = ev.get("timestamp")
if isinstance(ts, dict):
ts = ts.get("iso") or ts.get("string") or None
return (key if isinstance(key, str) else None,
ts if isinstance(ts, str) else None)
def _flush_device(self, conn_key: str) -> None:
"""Drop all cached events + waveforms for one device. Caller holds lock."""
self._events.pop(conn_key, None)
stale_wf_keys = [k for k in self._waveforms if k[0] == conn_key]
for k in stale_wf_keys:
self._waveforms.pop(k, None)
# ── Device info ───────────────────────────────────────────────────────────
def get_device_info(self, conn_key: str) -> Optional[dict]:
with self._lock:
if self._config_dirty.get(conn_key):
return None
return self._device_info.get(conn_key)
def set_device_info(self, conn_key: str, info: dict) -> None:
with self._lock:
self._device_info[conn_key] = info
self._config_dirty[conn_key] = False
# ── Events ────────────────────────────────────────────────────────────────
def get_events(self, conn_key: str, device_count: int) -> Optional[list]:
with self._lock:
if self._config_dirty.get(conn_key):
return None
entry = self._events.get(conn_key)
if entry is None:
return None
cached_count, events = entry
return events if cached_count == device_count else None
def set_events(self, conn_key: str, device_count: int, events: list) -> None:
"""
Replace the cached events list for `conn_key`. If any incoming event
has a different (waveform_key, timestamp) than the cached entry at
the same index, flush the entire conn_key's event + waveform cache
first. Catches post-erase key reuse.
"""
with self._lock:
cached_entry = self._events.get(conn_key)
cached_events = cached_entry[1] if cached_entry else []
cached_by_index = {e.get("index"): e for e in cached_events}
evict = False
for ev in events:
idx = ev.get("index")
if idx is None:
continue
cached = cached_by_index.get(idx)
if cached is None:
continue
new_key, new_ts = self._event_signature(ev)
old_key, old_ts = self._event_signature(cached)
if (new_key and old_key and new_key != old_key) or \
(new_ts and old_ts and new_ts != old_ts):
evict = True
break
if evict:
self._flush_device(conn_key)
self._events[conn_key] = (device_count, events)
# ── Monitor status ────────────────────────────────────────────────────────
def get_monitor_status(self, conn_key: str) -> Optional[dict]:
with self._lock:
entry = self._monitor_status.get(conn_key)
if entry is None:
return None
fetched_at, status = entry
if time.time() - fetched_at > _MONITOR_STATUS_TTL:
return None
return status
def set_monitor_status(self, conn_key: str, status: dict) -> None:
with self._lock:
self._monitor_status[conn_key] = (time.time(), status)
def invalidate_monitor_status(self, conn_key: str) -> None:
with self._lock:
self._monitor_status.pop(conn_key, None)
# ── Config dirty flag ─────────────────────────────────────────────────────
def mark_config_dirty(self, conn_key: str) -> None:
with self._lock:
self._config_dirty[conn_key] = True
self._events.pop(conn_key, None)
# ── Waveforms (permanent cache, evicted on (key,ts) mismatch) ─────────────
def get_waveform(self, conn_key: str, index: int) -> Optional[dict]:
with self._lock:
return self._waveforms.get((conn_key, index))
def set_waveform(self, conn_key: str, index: int, waveform: dict) -> None:
"""
Cache a waveform. Evicts the device's whole cache when the existing
entry at the same index has a different (waveform_key, timestamp).
"""
with self._lock:
existing = self._waveforms.get((conn_key, index))
if existing is not None:
new_key, new_ts = self._event_signature(waveform)
old_key, old_ts = self._event_signature(existing)
differs = (
(new_key and old_key and new_key != old_key)
or (new_ts and old_ts and new_ts != old_ts)
)
if differs:
self._flush_device(conn_key)
self._waveforms[(conn_key, index)] = waveform