""" sfm/live_cache.py — Thread-safe in-memory cache for live SFM device data. Extracted from sfm/server.py so the cache logic is importable and testable without pulling in fastapi/uvicorn. Caching strategy ---------------- Keyed by `conn_key` ("tcp:host:port" or "serial:port:baud"). Does NOT persist across server restarts. device_info cached until POST /device/config marks it dirty events cached by (conn_key, device_event_count); re-fetched when a quick count_events() probe shows new events on the device monitor_status 30-second TTL (changes frequently during monitoring) waveforms permanent within a process — but auto-evicted at the device level when a (waveform_key, timestamp) mismatch is detected at the same index (post-erase key reuse — the device's event-key counter resets to 0x01110000 after every erase, so the same `(conn_key, index)` slot can refer to a brand-new physical event). All endpoints accept ?force=true to bypass the cache and re-read. """ from __future__ import annotations import threading import time from typing import Optional _MONITOR_STATUS_TTL = 30.0 # seconds class LiveCache: """ Thread-safe in-memory cache for live SFM device data. One singleton per server process. """ def __init__(self) -> None: self._lock = threading.Lock() self._device_info: dict[str, dict] = {} self._events: dict[str, tuple[int, list]] = {} self._monitor_status: dict[str, tuple[float, dict]] = {} self._config_dirty: dict[str, bool] = {} self._waveforms: dict[tuple, dict] = {} # ── Connection key ──────────────────────────────────────────────────────── @staticmethod def make_conn_key( host: Optional[str], tcp_port: int, port: Optional[str], baud: int, ) -> str: if host: return f"tcp:{host}:{tcp_port}" return f"serial:{port}:{baud}" # ── Eviction signature ──────────────────────────────────────────────────── @staticmethod def _event_signature(ev: dict) -> tuple[Optional[str], Optional[str]]: """Return (waveform_key_hex, timestamp_iso) from a serialised event.""" key = ev.get("waveform_key") or ev.get("_waveform_key") if isinstance(key, (bytes, bytearray)): key = bytes(key).hex() ts = ev.get("timestamp") if isinstance(ts, dict): ts = ts.get("iso") or ts.get("string") or None return (key if isinstance(key, str) else None, ts if isinstance(ts, str) else None) def _flush_device(self, conn_key: str) -> None: """Drop all cached events + waveforms for one device. Caller holds lock.""" self._events.pop(conn_key, None) stale_wf_keys = [k for k in self._waveforms if k[0] == conn_key] for k in stale_wf_keys: self._waveforms.pop(k, None) # ── Device info ─────────────────────────────────────────────────────────── def get_device_info(self, conn_key: str) -> Optional[dict]: with self._lock: if self._config_dirty.get(conn_key): return None return self._device_info.get(conn_key) def set_device_info(self, conn_key: str, info: dict) -> None: with self._lock: self._device_info[conn_key] = info self._config_dirty[conn_key] = False # ── Events ──────────────────────────────────────────────────────────────── def get_events(self, conn_key: str, device_count: int) -> Optional[list]: with self._lock: if self._config_dirty.get(conn_key): return None entry = self._events.get(conn_key) if entry is None: return None cached_count, events = entry return events if cached_count == device_count else None def set_events(self, conn_key: str, device_count: int, events: list) -> None: """ Replace the cached events list for `conn_key`. If any incoming event has a different (waveform_key, timestamp) than the cached entry at the same index, flush the entire conn_key's event + waveform cache first. Catches post-erase key reuse. """ with self._lock: cached_entry = self._events.get(conn_key) cached_events = cached_entry[1] if cached_entry else [] cached_by_index = {e.get("index"): e for e in cached_events} evict = False for ev in events: idx = ev.get("index") if idx is None: continue cached = cached_by_index.get(idx) if cached is None: continue new_key, new_ts = self._event_signature(ev) old_key, old_ts = self._event_signature(cached) if (new_key and old_key and new_key != old_key) or \ (new_ts and old_ts and new_ts != old_ts): evict = True break if evict: self._flush_device(conn_key) self._events[conn_key] = (device_count, events) # ── Monitor status ──────────────────────────────────────────────────────── def get_monitor_status(self, conn_key: str) -> Optional[dict]: with self._lock: entry = self._monitor_status.get(conn_key) if entry is None: return None fetched_at, status = entry if time.time() - fetched_at > _MONITOR_STATUS_TTL: return None return status def set_monitor_status(self, conn_key: str, status: dict) -> None: with self._lock: self._monitor_status[conn_key] = (time.time(), status) def invalidate_monitor_status(self, conn_key: str) -> None: with self._lock: self._monitor_status.pop(conn_key, None) # ── Config dirty flag ───────────────────────────────────────────────────── def mark_config_dirty(self, conn_key: str) -> None: with self._lock: self._config_dirty[conn_key] = True self._events.pop(conn_key, None) # ── Waveforms (permanent cache, evicted on (key,ts) mismatch) ───────────── def get_waveform(self, conn_key: str, index: int) -> Optional[dict]: with self._lock: return self._waveforms.get((conn_key, index)) def set_waveform(self, conn_key: str, index: int, waveform: dict) -> None: """ Cache a waveform. Evicts the device's whole cache when the existing entry at the same index has a different (waveform_key, timestamp). """ with self._lock: existing = self._waveforms.get((conn_key, index)) if existing is not None: new_key, new_ts = self._event_signature(waveform) old_key, old_ts = self._event_signature(existing) differs = ( (new_key and old_key and new_key != old_key) or (new_ts and old_ts and new_ts != old_ts) ) if differs: self._flush_device(conn_key) self._waveforms[(conn_key, index)] = waveform