Merge branch 'feature/intelligent-caching'

This commit is contained in:
2026-04-17 03:46:22 +00:00
7 changed files with 658 additions and 92 deletions
+26
@@ -4,6 +4,32 @@ All notable changes to seismo-relay are documented here.
---
## v0.12.1 — 2026-04-16
### Added
- **`sfm/server.py`: `_LiveCache`** — in-memory live device cache that eliminates
redundant TCP round-trips between web requests. Plain Python dict +
`threading.Lock`, no extra dependencies.
Cache strategy per endpoint:
| Endpoint | Strategy |
|---|---|
| `GET /device/info` | Indefinite; invalidated by `POST /device/config` |
| `GET /device/events` | Count-probe fast path — `poll()+count_events()` (~2 s); returns cached data if event count is unchanged; full download only when new events are detected |
| `GET /device/monitor/status` | 30-second TTL; invalidated immediately on monitor start/stop |
| `GET /device/event/{idx}/waveform` | Permanent per-index (waveforms are immutable once recorded) |
- **`?force=true`** query param on all cached endpoints — bypasses cache and forces
a fresh read from the device.
- **Cache invalidation hooks** — `POST /device/config` marks device info and events
stale; `POST /device/monitor/start` and `/stop` evict the monitor status entry
immediately so the next status poll reflects the actual device state.
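
The strategies listed above (indefinite entries, a 30-second TTL, explicit invalidation, and a `force` bypass) can be sketched with a plain dict guarded by a `threading.Lock`. This is a minimal illustration and not the actual `_LiveCache` code: the class name `TinyLiveCache`, its method names, and the injectable clock are assumptions made for the example.

```python
import threading
import time
from typing import Any, Callable, Optional


class TinyLiveCache:
    """Hypothetical sketch: dict + lock with an optional per-entry TTL."""

    def __init__(self, clock: Callable[[], float] = time.monotonic) -> None:
        self._lock = threading.Lock()
        self._clock = clock  # injectable so expiry can be tested without sleeping
        # key -> (value, expiry time, or None for "never expires")
        self._entries: dict = {}

    def get(self, key: str, force: bool = False) -> Optional[Any]:
        """Return the cached value, or None on a miss, an expiry, or force=True."""
        if force:
            return None
        with self._lock:
            entry = self._entries.get(key)
            if entry is None:
                return None
            value, expires_at = entry
            if expires_at is not None and self._clock() >= expires_at:
                del self._entries[key]  # drop the expired entry
                return None
            return value

    def set(self, key: str, value: Any, ttl: Optional[float] = None) -> None:
        """Store a value; ttl=None keeps it until invalidate() is called."""
        with self._lock:
            expires_at = None if ttl is None else self._clock() + ttl
            self._entries[key] = (value, expires_at)

    def invalidate(self, key: str) -> None:
        """Evict one entry, e.g. after a config write or monitor start/stop."""
        with self._lock:
            self._entries.pop(key, None)
```

In this sketch a monitor status entry would be stored with `ttl=30`, device info with no TTL, and the write endpoints would call `invalidate()` once the device operation succeeds.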
---
## v0.12.0 — 2026-04-13
### Added
+4 -3
File diff suppressed because one or more lines are too long
+10 -10
@@ -36,7 +36,7 @@
| 2026-03-02 | §7.4 Event Index Block | **NEW:** `Monitoring LCD Cycle` identified at offsets +84/+85 as uint16 BE. Default value = 65500 (0xFFDC) = effectively disabled / maximum. Confirmed from operator manual §3.13.1g. |
| 2026-03-02 | §7.4 Event Index Block | **UPDATED:** Backlight confirmed as uint8 range 0–255 seconds per operator manual §3.13.1e ("adjustable timer, 0 to 255 seconds"). Power save unit confirmed as minutes per operator manual §3.13.1f. |
| 2026-03-02 | Global | **NEW SOURCE:** Operator manual (716U0101 Rev 15) added as reference. Cross-referencing settings definitions, ranges, and units. Header updated. |
| 2026-03-02 | §14 Open Questions | Float 6.2061 in/s mystery: manual confirms only two geo ranges (1.25 in/s and 10.0 in/s). 6.2061 is NOT a user-selectable range → likely internal ADC full-scale calibration constant or hardware range ceiling. Downgraded to LOW priority. |
| 2026-03-02 | §14 Open Questions | Float 6.2061 in/s mystery: manual confirms only two geo ranges (1.25 in/s and 10.0 in/s). 6.2061 is NOT a user-selectable range → originally speculated as internal ADC full-scale constant, but this is NOT confirmed. Using it as ADC full-scale produces ~9× PPV overread. Meaning unknown. Downgraded to LOW 2026-03-02, re-escalated to HIGH 2026-04-16. |
| 2026-03-02 | §14 Open Questions | `0x082A` hypothesis refined: 2090 decimal. At 1024 sps, 2 sec record = 2048 samples. Possible that 0x082A = total samples including 0.25s pre-trigger (256 samples) at some adjusted rate. Needs capture with different record time. |
| 2026-03-02 | §14 Open Questions | **NEW items added:** Trigger sample width (default=2), Auto Window (1-9 sec), Aux Trigger (enabled/disabled) — all confirmed settings from operator manual not yet mapped in protocol. |
| 2026-03-02 | §14 Open Questions | Monitoring LCD Cycle resolved — removed from open questions. |
@@ -92,7 +92,7 @@
| 2026-04-06 | §7.8.4 | **NEW — 5A end-of-stream signalling confirmed.** After streaming all waveform chunks, the device sends exactly **1 raw byte** in response to the next chunk request, then goes silent for the full recv timeout. This byte is NOT a complete DLE-framed A5 response — the frame parser accumulates it as `bytes_fed=1` and never assembles a frame. This is the device's natural end-of-stream signal. Handling: on TimeoutError, if `bytes_fed > 0` AND prior chunks were received, treat as graceful end and proceed to the termination frame. A `bytes_fed=0` timeout with no prior chunks is a genuine transport failure and must still raise. |
| 2026-04-06 | §7.8.4 | **NEW — 5A chunk timing and count (empirical, BE11529 at 1024 sps).** Each chunk response arrives within ~1 second over TCP/cellular. A 9,306-sample event (≈9.1 s at 1024 sps) produces **35 chunks** before end-of-stream. Chunks 1–16 have varying data lengths (1036–1123 bytes); chunks 17–35 are uniformly 1036 bytes each (post-event silence, all-zero ADC samples). Safe recv timeout for chunk loop: **10 s** (10× typical response time). Default transport timeout (120 s) results in a ~2-minute stall per event at end-of-stream. |
| 2026-04-06 | §7.8.3 | **KNOWN ISSUE — `_decode_a5_waveform` hardcoded fi==9 skip.** The decoder contains `elif fi == 9: continue` which was written for the 9-frame original blast capture where frame 9 was a device terminator. For streams with >9 frames (current device produces 35+), frame index 9 is live waveform data — this skip discards ~1,070 bytes (~133 sample-sets) per event. The terminator is now detected via `page_key == 0x0000`, not by frame index. The fi==9 skip should be removed. |
| 2026-04-06 | §7.8 | **CONFIRMED — ADC count-to-physical-unit conversion.** Raw waveform samples are signed 16-bit integers (counts). Conversion: `value = counts × (range / 32767)`. For geo channels: range = 10.000 in/s (from the device's compliance config geo range field). For the mic channel: range is in psi (device-specific). Near-full-scale counts (≈32,700) on all four channels simultaneously indicate ADC saturation (clipping) from a high-amplitude event. |
| 2026-04-06 | §7.8 | **⚠ PARTIALLY INVALIDATED — ADC count-to-physical-unit conversion.** Raw waveform samples are signed 16-bit integers (counts). Conversion formula `value = counts × (range / 32767)` is believed correct, but the `range` value is UNKNOWN. The compliance config field labeled `max_range_geo` reads 6.206053 (bytes `40 C6 97 FD`), which does NOT match either user-selectable range shown in Blastware UI (1.25 or 10.000 in/s). The meaning and units of the 6.206053 value are unresolved — it may not be the ADC full-scale at all. See open question in §14. |
| 2026-04-08 | §5.1, §7.10, §12 | **NEW — Monitoring commands confirmed.** SUB 0x1C (monitor status), 0x96 (start monitoring), 0x97 (stop monitoring) all confirmed from 4-8-26/2ndtry capture. SESSION_RESET (`41 03`) required before POLL to wake a monitoring unit. |
| 2026-04-09 | §7.10 | **CORRECTED — monitoring flag and battery/memory offsets.** `section[1] == 0x10` is the monitoring flag (100% accurate across 144 data frames in 2ndtry capture). Previous note claiming `section[6]` was wrong — section[6] has device-specific non-binary values (0xea/0x07). Battery/memory offsets corrected: `section[-10:-8]` (battery×100), `section[-8:-4]` (memory_total), `section[-4:]` (memory_free). NOTE: `frame.data` has checksum stripped by parser — earlier offsets of `[-11:-9]`/`[-9:-5]`/`[-5:-1]` were wrong because they assumed a trailing checksum byte that isn't there. |
| 2026-04-08 | §7.10 | **NEW — SUBs 0x0E (channel sensor data) and 0x98 (trigger test) observed** in 4-8-26/sensor-check capture (Blastware "Unit Channel Test" comms check). SUB 0x0E: 2-step read with channel selector in `params[6:8]`, data length 0x0A per channel, RSP SUB = 0xF1. SUB 0x98: single probe frame with `params[0] = 0xFF`, RSP SUB = 0x67; sent twice per test cycle. Not yet implemented in SFM. |
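
The corrected §7.10 offsets above (monitoring flag at `section[1]`, battery/memory trailer in the last 10 bytes) could be applied roughly as follows. This is a sketch only: the function and type names are hypothetical, the fields are assumed to be unsigned and big-endian (the entry does not state the byte order), and the battery unit is left unnamed because the source only says the raw value is scaled by 100.

```python
import struct
from typing import NamedTuple


class MonitorStatus(NamedTuple):
    is_monitoring: bool
    battery: float       # raw value / 100; unit not stated in the source
    memory_total: int
    memory_free: int


def parse_monitor_section(section: bytes) -> MonitorStatus:
    """Decode the monitoring flag and the 10-byte battery/memory trailer.

    `section` is expected to be frame data with the checksum already
    stripped by the parser, as the changelog entry describes.
    """
    if len(section) < 10:
        raise ValueError("section too short for battery/memory trailer")
    # ASSUMPTION: big-endian unsigned fields (uint16, uint32, uint32).
    battery_x100, memory_total, memory_free = struct.unpack(">HII", section[-10:])
    return MonitorStatus(
        is_monitoring=(section[1] == 0x10),
        battery=battery_x100 / 100.0,
        memory_total=memory_total,
        memory_free=memory_free,
    )
```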
@@ -528,7 +528,7 @@ The SUB `1A` read response (`E5`) and SUB `71` write block contain per-channel t
| Field | Example bytes | Decoded | Certainty |
|---|---|---|---|
| `[00 00]` | `00 00` | Separator / padding | 🔶 INFERRED |
| Max range float | `40 C6 97 FD` | 6.206 — full-scale range in in/s | 🔶 INFERRED |
| Max range float | `40 C6 97 FD` | 6.206 — **value confirmed, meaning and units UNKNOWN** (does NOT match UI range options 1.25/10.000 in/s; not confirmed as ADC full-scale) | ❓ UNKNOWN |
| `[00 00]` | `00 00` | Separator / padding | 🔶 INFERRED |
| **Trigger level** | `3F 19 99 9A` | **0.600 in/s** — IEEE 754 BE float | ✅ CONFIRMED |
| Unit string | `69 6E 2E 00` | `"in.\0"` | ✅ CONFIRMED |
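
The example bytes in this table can be checked with the standard `struct` module. The sketch below only decodes the two float fields shown above as big-endian IEEE 754 values; the helper name `be_float32` is made up for the example, and decoding the first value says nothing about what it means.

```python
import struct


def be_float32(raw: bytes) -> float:
    """Decode 4 bytes as a big-endian IEEE 754 single-precision float."""
    if len(raw) != 4:
        raise ValueError("expected exactly 4 bytes")
    return struct.unpack(">f", raw)[0]


# Bytes copied from the table rows above.
unknown_field = be_float32(bytes.fromhex("40C697FD"))   # the ~6.206 value
trigger_level = be_float32(bytes.fromhex("3F19999A"))   # the ~0.600 in/s value

print(f"unknown field = {unknown_field:.6f}")
print(f"trigger level = {trigger_level:.3f} in/s")
```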
@@ -655,7 +655,7 @@ offset size type value (Tran example) meaning
+10 2 uint16 0x0015 = 21 unknown
+12 4 bytes 03 02 04 01 flags (recording mode etc.)
+16 4 uint32 0x00000003 record time in seconds ✅ CONFIRMED
+1A 4 float32 6.2061 max range (in/s for geo, psi for mic)
+1A 4 float32 6.2061 ❓ UNKNOWN field — value 6.2061 confirmed; meaning/units unresolved (NOT confirmed as max range or ADC full-scale)
+1E 2 00 00 padding
+20 4 float32 0.6000 trigger level ✅ CONFIRMED
+24 4 char[4] "in.\0" / "psi\0" unit string (geo vs mic)
@@ -1235,13 +1235,13 @@ TimeoutError caught:
Chunks with uniform 1,036-byte payload (chunks 17–35 in the observed event) contain all-zero ADC samples — the device continues recording silence until the configured record time expires before terminating the stream.
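
The timeout rule from the §7.8.4 changelog entry (a timeout with `bytes_fed > 0` after at least one received chunk is a graceful end of stream) can be expressed as a small predicate. This is a sketch with hypothetical function names, not the client's actual code. The changelog only names the graceful case and the "no bytes, no chunks" failure case; treating every other combination as a failure is an assumption made here.

```python
def is_graceful_end_of_stream(bytes_fed: int, chunks_received: int) -> bool:
    """True when a recv timeout should be read as the device's end-of-stream signal."""
    return bytes_fed > 0 and chunks_received > 0


def handle_chunk_timeout(bytes_fed: int, chunks_received: int) -> None:
    """Call from the TimeoutError handler of the chunk loop.

    Returns normally when the caller should proceed to the termination frame;
    re-raises as TimeoutError for anything that looks like a transport failure.
    """
    if is_graceful_end_of_stream(bytes_fed, chunks_received):
        return
    # ASSUMPTION: any combination other than the graceful one is an error.
    raise TimeoutError(
        f"chunk timeout (bytes_fed={bytes_fed}, chunks_received={chunks_received})"
    )
```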
**ADC count-to-physical conversion:**
**ADC count-to-physical conversion — ⚠ SCALING UNKNOWN:**
Raw samples are signed 16-bit integers (-32,768 to +32,767). To convert to physical units:
Raw samples are signed 16-bit integers (-32,768 to +32,767). The conversion formula is believed to be:
```
value_in_s (in/s) = counts × (geo_range / 32767)
```
where `geo_range` is from the compliance config (typically 10.000 in/s). Mic channel uses psi units with its own range. Near-full-scale values on all channels simultaneously indicate ADC saturation (clipping).
However, the correct value of `geo_range` is **unknown**. The compliance config field `max_range_geo` reads 6.206053 (`40 C6 97 FD`), which does NOT match either user-selectable range (1.25 or 10.000 in/s) and yields PPV values roughly 9× too large compared to the on-device 0C record. Do not use 6.206053 or 10.000 as the scale factor until this is resolved. See §14 open question. Mic channel uses psi units with its own range (also unresolved).
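
Because the scale factor is unresolved, a conversion helper is probably safest when it refuses to guess. The sketch below implements the formula exactly as written above and makes the range a required argument. The second helper illustrates the back-calculation idea from the §14 open question (deriving the range from a sample whose physical value is known, for example from a Blastware CSV export). Both function names are hypothetical.

```python
ADC_FULL_SCALE_COUNTS = 32767  # positive full scale of a signed 16-bit sample


def counts_to_physical(counts: int, full_scale_range: float) -> float:
    """Apply value = counts * (range / 32767).

    full_scale_range is deliberately required: the correct value is an
    open question, so no default is offered.
    """
    if not -32768 <= counts <= 32767:
        raise ValueError(f"counts out of int16 range: {counts}")
    if full_scale_range <= 0:
        raise ValueError("full_scale_range must be positive")
    return counts * (full_scale_range / ADC_FULL_SCALE_COUNTS)


def back_calculate_range(counts: int, known_value: float) -> float:
    """Infer the range from one sample whose physical value is already known."""
    if counts == 0:
        raise ValueError("cannot back-calculate from a zero sample")
    return known_value * ADC_FULL_SCALE_COUNTS / counts
```

Running `back_calculate_range` over several samples of one event and checking that the results agree would be one way to test whether the linear formula holds at all.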
**Known decoder issue — fi==9 hardcoded skip:**
@@ -1267,7 +1267,7 @@ Fields visible in the Blastware "Compliance Setup" dialog. ✅ = byte offset co
| Geophone — Enable all | bool | ❓ |
| Geophone — Trigger Source | bool | ❓ |
| Chan 1-3 Trigger Level | float, in/s | ✅ `trigger_level_geo` |
| Chan 1-3 Maximum Range | Normal 10.000 / 1.25 in/s | `max_range_geo` |
| Chan 1-3 Maximum Range | Normal 10.000 / 1.25 in/s | `max_range_geo` offset found, value=6.206053 — does NOT match UI values; meaning unknown |
| Microphone — Enable all | bool | ❓ |
| Microphone — Trigger Source | bool | ❓ |
| Chan 4 Trigger Level | float, dB or psi | ❓ |
@@ -1933,7 +1933,7 @@ The `.bin` files produced by `s3_bridge` are **not raw wire bytes**. The logger
| **Auxiliary Trigger read location** – **RESOLVED:** SUB `FE` offset `0x0109`, uint8, `0x00`=disabled, `0x01`=enabled. Confirmed 2026-03-11 via controlled toggle capture. | RESOLVED | 2026-03-02 | Resolved 2026-03-11 |
| **Auxiliary Trigger write path** — Write command not yet captured in a clean session. Inner frame handshake visible in A4 (multiple WRITE_CONFIRM_RESPONSE SUBs appear, TRIGGER_CONFIG_RESPONSE removed), but the BW→S3 write command itself was in a partial session. Likely SUB `15` or similar. Deferred for clean capture. | LOW | 2026-03-11 | NEW |
| ~~**SUB `6E` response to SUB `1C`**~~ – ~~RESOLVED 2026-04-08: This was a misidentification.~~ The `1C → 6E` "exception" was misread — likely an inner A4 sub-frame. Confirmed from 4-8-26 capture (338 frames): SUB 0x1C always → 0xE3. No exceptions to the `0xFF − SUB` rule are known. | RESOLVED | 2026-04-08 | CLOSED |
| **Max Geo Range float 6.2061 in/s** – NOT a user-selectable range (manual only shows 1.25 and 10.0 in/s). Likely internal ADC full-scale constant or hardware range ceiling. Not worth capturing. | LOW | 2026-02-26 | Downgraded 2026-03-02 |
| **Max Geo Range float 6.2061** – offset confirmed in channel block (`+1A`, `40 C6 97 FD`). Meaning and units are UNKNOWN. Value does NOT match either user-selectable range (1.25 / 10.0 in/s). Using it as ADC full-scale produces ~9× PPV overread vs on-device 0C values. Not simply metric vs imperial (25.4 factor doesn't reconcile). Needs investigation: examine surrounding channel block bytes, compare with a Blastware waveform CSV export to back-calculate the correct scale. Upgraded to HIGH priority. | HIGH | 2026-02-26 | Upgraded 2026-04-16 |
| MicL channel units — **RESOLVED: psi**, confirmed from `.set` file unit string `"psi\0"` | RESOLVED | 2026-03-01 | |
| Backlight offset — **RESOLVED: +4B in event index data**, uint8, seconds | RESOLVED | 2026-03-02 | |
| Power save offset — **RESOLVED: +53 in event index data**, uint8, minutes | RESOLVED | 2026-03-02 | |
@@ -1962,7 +1962,7 @@ The `.bin` files produced by `s3_bridge` are **not raw wire bytes**. The logger
| Trigger Level (Mic) | §3.8.6 | Channel block, float | float32 BE | 100–148 dB in 1 dB steps |
| Alarm Level (Mic) | §3.9.10 | Channel block, float | float32 BE | higher than mic trigger |
| Record Time | §3.8.9 | cfg anchor+10, float32 BE (wire); `.set` +16, uint32 LE (file) | float32 BE (wire) | 1–105 s; confirmed 3→`40400000`, 5→`40A00000`, 8→`41000000`, 13→`41500000`. Use anchor §7.6.1/§7.6.3 — NOT fixed offset. |
| Max Geo Range | §3.8.4 | Channel block, float | float32 BE | 1.25 or 10.0 in/s (user); 6.2061 in protocol = internal constant |
| Max Geo Range | §3.8.4 | Channel block, float | float32 BE | ❓ UNKNOWN — value 6.2061 confirmed at offset, but meaning/units unresolved. Does NOT equal 1.25 or 10.0 in/s. Do NOT use as ADC full-scale. |
| Microphone Units | §3.9.7 | Inline unit string | char[4] | `"psi\0"`, `"pa.\0"`, `"dB\0\0"` |
| Sample Rate | §3.8.2 | cfg anchor2, uint16 BE — anchor=`\x01\x2c\x00\x00\xbe\x80\x00\x00\x00\x00` in cfg[40:100] | uint16 BE | Normal=1024, Fast=2048, Faster=4096 ✅ CONFIRMED 2026-04-01 (BE11529 S338.17). Anchor required — see §7.6.3 DLE jitter. |
| Record Mode | §3.8.1 | Unknown | — | Single Shot, Continuous, Manual, Histogram, Histogram Combo |
+1
@@ -11,6 +11,7 @@ dependencies = [
"fastapi>=0.104",
"uvicorn[standard]>=0.24",
"pyserial>=3.5",
"sqlalchemy>=2.0",
]
[tool.setuptools.packages.find]
+4
@@ -0,0 +1,4 @@
fastapi
uvicorn
sqlalchemy
pyserial
+376
@@ -0,0 +1,376 @@
"""
sfm/cache.py: Persistent SQLite cache for SFM device data.
Caching strategy
----------------
+------------------+----------------------------------+-------------------------+
| Data | Mutability | Invalidation |
+------------------+----------------------------------+-------------------------+
| Device info | Effectively immutable (firmware, | Manual clear / force |
| (serial, model, | serial never change) | refresh query param |
| compliance cfg) | | |
+------------------+----------------------------------+-------------------------+
| Event headers | Append-only (new events added, | Fetch new ones when |
| (peaks, ts, | old never modified) | device event count > |
| project info) | | cached count |
+------------------+----------------------------------+-------------------------+
| Full waveforms | Immutable once recorded | Never (permanent cache) |
| (raw ADC samples)| | |
+------------------+----------------------------------+-------------------------+
| Monitor status | Frequently changing | TTL = 30 seconds |
| (battery, memory)| | |
+------------------+----------------------------------+-------------------------+
Keys
----
All cached rows are keyed by (host, tcp_port) for TCP connections, or (port, baud)
for serial connections. Within a device, events are keyed by index (0-based).
The device serial number is stored once we learn it, and used for display / debugging
only; the network address is the primary routing key (same as how the rest of the SFM
code operates).
"""
from __future__ import annotations
import json
import logging
import time
from pathlib import Path
from typing import Optional
try:
    import sqlalchemy as sa
    from sqlalchemy import orm
except ImportError as exc:
    # Chain the original error so the real import failure stays visible.
    raise ImportError(
        "sqlalchemy is required for the SFM cache.\n"
        "Install it with: pip install sqlalchemy"
    ) from exc
log = logging.getLogger("sfm.cache")
# ── Schema ────────────────────────────────────────────────────────────────────
Base = orm.declarative_base()
_MONITOR_STATUS_TTL = 30 # seconds
class CachedDevice(Base):
    """
    Device identity + compliance config, keyed by connection address.
    Stores the full serialised JSON blob returned by /device/info so the
    endpoint can return it verbatim on a cache hit without re-connecting.
    """
    __tablename__ = "cached_devices"
    # Connection key — either TCP (host+port) or serial (port+baud)
    conn_key = sa.Column(sa.String, primary_key=True)  # e.g. "tcp:1.2.3.4:12345"
    serial = sa.Column(sa.String, nullable=True)  # e.g. "BE11529"
    info_json = sa.Column(sa.Text, nullable=False)  # full /device/info response JSON
    updated_at = sa.Column(sa.Float, nullable=False)  # Unix timestamp of last write
    # When a config write happens we set this flag so the next /device/info call
    # fetches fresh data instead of serving stale compliance config.
    config_dirty = sa.Column(sa.Boolean, default=False, nullable=False)
class CachedEvent(Base):
    """
    Per-event header + peak values + project info, keyed by (conn_key, index).
    Events are immutable once recorded on the device; once we have an event in
    the cache it never needs to be re-downloaded unless explicitly requested.
    """
    __tablename__ = "cached_events"
    conn_key = sa.Column(sa.String, primary_key=True)
    index = sa.Column(sa.Integer, primary_key=True)
    event_json = sa.Column(sa.Text, nullable=False)  # serialised Event dict
    cached_at = sa.Column(sa.Float, nullable=False)  # Unix timestamp


class CachedWaveform(Base):
    """
    Full raw ADC waveform for a single event (SUB 5A full download).
    These are large (up to several MB) and expensive to fetch over cellular.
    Once downloaded they are immutable and cached permanently.
    """
    __tablename__ = "cached_waveforms"
    conn_key = sa.Column(sa.String, primary_key=True)
    index = sa.Column(sa.Integer, primary_key=True)
    waveform_json = sa.Column(sa.Text, nullable=False)  # full /device/event/{idx}/waveform response JSON
    cached_at = sa.Column(sa.Float, nullable=False)
class CachedMonitorStatus(Base):
    """
    Monitor status (battery, memory, is_monitoring) with a short TTL.
    These change frequently during field operations so we keep them only for
    _MONITOR_STATUS_TTL seconds before re-fetching from the device.
    """
    __tablename__ = "cached_monitor_status"
    conn_key = sa.Column(sa.String, primary_key=True)
    status_json = sa.Column(sa.Text, nullable=False)
    cached_at = sa.Column(sa.Float, nullable=False)
# ── Cache store ───────────────────────────────────────────────────────────────
class SFMCache:
    """
    SQLite-backed cache for SFM device data.
    Usage
    -----
        cache = SFMCache()            # stores in sfm/data/sfm_cache.db by default
        cache = SFMCache(":memory:")  # in-memory (tests / ephemeral mode)
    All public methods accept a *conn_key* string; use make_conn_key() to
    build a consistent key from the transport parameters.
    """

    def __init__(self, db_path: str | Path | None = None) -> None:
        in_memory = (db_path == ":memory:")
        if db_path is None:
            # Default: alongside this file in sfm/data/
            db_path = Path(__file__).parent / "data" / "sfm_cache.db"
        if not in_memory:
            db_path = Path(db_path)
            db_path.parent.mkdir(parents=True, exist_ok=True)
        url = "sqlite:///:memory:" if in_memory else f"sqlite:///{db_path}"
        engine = sa.create_engine(url, connect_args={"check_same_thread": False})
        Base.metadata.create_all(engine)
        self._Session = orm.sessionmaker(bind=engine)
        log.info("SFM cache opened: %s", db_path)
    # ── Connection key ────────────────────────────────────────────────────────
    @staticmethod
    def make_conn_key(
        host: Optional[str],
        tcp_port: int,
        port: Optional[str],
        baud: int,
    ) -> str:
        """Return a stable string key for this transport configuration."""
        if host:
            return f"tcp:{host}:{tcp_port}"
        return f"serial:{port}:{baud}"
    # ── Device info ───────────────────────────────────────────────────────────
    def get_device_info(self, conn_key: str) -> Optional[dict]:
        """
        Return cached device info dict, or None if not cached / config_dirty.
        """
        with self._Session() as s:
            row = s.get(CachedDevice, conn_key)
            if row is None or row.config_dirty:
                return None
            return json.loads(row.info_json)

    def set_device_info(self, conn_key: str, info: dict) -> None:
        """Store device info and clear any dirty flag."""
        with self._Session() as s:
            row = s.get(CachedDevice, conn_key)
            serial = info.get("serial")
            if row is None:
                row = CachedDevice(
                    conn_key=conn_key,
                    serial=serial,
                    info_json=json.dumps(info),
                    updated_at=time.time(),
                    config_dirty=False,
                )
                s.add(row)
            else:
                row.serial = serial
                row.info_json = json.dumps(info)
                row.updated_at = time.time()
                row.config_dirty = False
            s.commit()
        log.debug("cached device info for %s (serial=%s)", conn_key, serial)

    def mark_config_dirty(self, conn_key: str) -> None:
        """
        Called after a successful POST /device/config write.
        Forces the next /device/info call to re-read compliance config from the
        device instead of serving the now-stale cached version.
        """
        with self._Session() as s:
            row = s.get(CachedDevice, conn_key)
            if row:
                row.config_dirty = True
                s.commit()
        log.debug("marked config dirty for %s", conn_key)
    # ── Events ────────────────────────────────────────────────────────────────
    def get_cached_event_count(self, conn_key: str) -> int:
        """Return the number of events we have cached for this device."""
        with self._Session() as s:
            return s.query(CachedEvent).filter_by(conn_key=conn_key).count()

    def get_all_events(self, conn_key: str) -> Optional[list[dict]]:
        """
        Return all cached events as a list of dicts, sorted by index.
        Returns None if nothing is cached yet.
        """
        with self._Session() as s:
            rows = (
                s.query(CachedEvent)
                .filter_by(conn_key=conn_key)
                .order_by(CachedEvent.index)
                .all()
            )
            if not rows:
                return None
            return [json.loads(r.event_json) for r in rows]

    def get_event(self, conn_key: str, index: int) -> Optional[dict]:
        """Return a single cached event by index, or None if not cached."""
        with self._Session() as s:
            row = s.get(CachedEvent, (conn_key, index))
            return json.loads(row.event_json) if row else None

    def set_events(self, conn_key: str, events: list[dict]) -> None:
        """
        Upsert a list of event dicts. Existing rows are updated; new rows are
        inserted. This is used to add newly-discovered events to the cache.
        """
        now = time.time()
        with self._Session() as s:
            for ev in events:
                idx = ev["index"]
                row = s.get(CachedEvent, (conn_key, idx))
                if row is None:
                    row = CachedEvent(
                        conn_key=conn_key,
                        index=idx,
                        event_json=json.dumps(ev),
                        cached_at=now,
                    )
                    s.add(row)
                    log.debug("cached new event %d for %s", idx, conn_key)
                else:
                    # Refresh in case project_info was backfilled after initial store
                    row.event_json = json.dumps(ev)
            s.commit()
    # ── Waveforms ─────────────────────────────────────────────────────────────
    def get_waveform(self, conn_key: str, index: int) -> Optional[dict]:
        """Return a cached full waveform response dict, or None if not cached."""
        with self._Session() as s:
            row = s.get(CachedWaveform, (conn_key, index))
            if row is None:
                return None
            log.debug("waveform cache hit: %s event %d", conn_key, index)
            return json.loads(row.waveform_json)

    def set_waveform(self, conn_key: str, index: int, waveform: dict) -> None:
        """Store a full waveform response dict permanently."""
        with self._Session() as s:
            row = s.get(CachedWaveform, (conn_key, index))
            if row is None:
                row = CachedWaveform(
                    conn_key=conn_key,
                    index=index,
                    waveform_json=json.dumps(waveform),
                    cached_at=time.time(),
                )
                s.add(row)
            else:
                row.waveform_json = json.dumps(waveform)
                row.cached_at = time.time()
            s.commit()
        log.debug("cached waveform for %s event %d", conn_key, index)
    # ── Monitor status ────────────────────────────────────────────────────────
    def get_monitor_status(self, conn_key: str) -> Optional[dict]:
        """Return cached monitor status if it's within TTL, else None."""
        with self._Session() as s:
            row = s.get(CachedMonitorStatus, conn_key)
            if row is None:
                return None
            age = time.time() - row.cached_at
            if age > _MONITOR_STATUS_TTL:
                log.debug("monitor status expired (age=%.1fs) for %s", age, conn_key)
                return None
            return json.loads(row.status_json)

    def set_monitor_status(self, conn_key: str, status: dict) -> None:
        """Store monitor status."""
        with self._Session() as s:
            row = s.get(CachedMonitorStatus, conn_key)
            if row is None:
                row = CachedMonitorStatus(
                    conn_key=conn_key,
                    status_json=json.dumps(status),
                    cached_at=time.time(),
                )
                s.add(row)
            else:
                row.status_json = json.dumps(status)
                row.cached_at = time.time()
            s.commit()

    def invalidate_monitor_status(self, conn_key: str) -> None:
        """
        Called after start/stop monitoring so the next status poll re-reads from device.
        """
        with self._Session() as s:
            row = s.get(CachedMonitorStatus, conn_key)
            if row:
                s.delete(row)
                s.commit()
    # ── Cache management ──────────────────────────────────────────────────────
    def clear_device(self, conn_key: str) -> dict:
        """
        Remove all cached data for a device. Returns counts of deleted rows.
        """
        counts = {}
        with self._Session() as s:
            counts["device_info"] = s.query(CachedDevice).filter_by(conn_key=conn_key).delete()
            counts["events"] = s.query(CachedEvent).filter_by(conn_key=conn_key).delete()
            counts["waveforms"] = s.query(CachedWaveform).filter_by(conn_key=conn_key).delete()
            counts["monitor_status"] = s.query(CachedMonitorStatus).filter_by(conn_key=conn_key).delete()
            s.commit()
        log.info("cleared cache for %s: %s", conn_key, counts)
        return counts

    def stats(self) -> dict:
        """Return row counts for all cache tables (for /cache/stats endpoint)."""
        with self._Session() as s:
            return {
                "devices": s.query(CachedDevice).count(),
                "events": s.query(CachedEvent).count(),
                "waveforms": s.query(CachedWaveform).count(),
                "monitor_status": s.query(CachedMonitorStatus).count(),
            }
# ── Module-level singleton ────────────────────────────────────────────────────
# Created lazily on the first get_cache() call; shared across all requests.
_cache: Optional[SFMCache] = None


def get_cache() -> SFMCache:
    """Return the module-level cache singleton, initialising it on first call."""
    global _cache
    if _cache is None:
        _cache = SFMCache()
    return _cache
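
The append-only event strategy from the module docstring comes down to comparing the cached event count with the count reported by the device. The sketch below isolates that decision as a pure function so it can be reasoned about without a device. The enum, the function name, and the parameter names are hypothetical. The handling of a device count *lower* than the cached count does not appear in this change, so that branch is an assumption.

```python
from enum import Enum
from typing import Optional


class EventCacheAction(Enum):
    FULL_DOWNLOAD = "full_download"  # ignore the cache and download everything
    SERVE_CACHE = "serve_cache"      # counts match: return cached events
    FETCH_NEW = "fetch_new"          # device has more events than the cache
    SERVE_STALE = "serve_stale"      # device unreachable: return cache, flagged stale


def decide_event_action(
    cached_count: int,
    device_count: Optional[int],
    bypass_cache: bool = False,
) -> EventCacheAction:
    """Pick a strategy for GET /device/events.

    device_count is None when the count probe failed (device offline).
    bypass_cache stands in for force=true or debug=true on the request.
    """
    if bypass_cache or cached_count == 0:
        return EventCacheAction.FULL_DOWNLOAD
    if device_count is None:
        return EventCacheAction.SERVE_STALE
    if device_count == cached_count:
        return EventCacheAction.SERVE_CACHE
    if device_count > cached_count:
        return EventCacheAction.FETCH_NEW
    # ASSUMPTION: fewer events than cached suggests device memory was
    # cleared, so the cache can no longer be trusted.
    return EventCacheAction.FULL_DOWNLOAD
```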
+237 -79
@@ -61,6 +61,7 @@ from minimateplus import MiniMateClient
from minimateplus.protocol import ProtocolError
from minimateplus.models import ComplianceConfig, DeviceInfo, Event, PeakValues, ProjectInfo, Timestamp
from minimateplus.transport import TcpTransport, DEFAULT_TCP_PORT
from sfm.cache import SFMCache, get_cache
from sfm.database import SeismoDb
logging.basicConfig(
@@ -388,6 +389,33 @@ def _run_with_retry(fn, *, is_tcp: bool):
return fn() # let any second failure propagate normally
# ── Helpers ────────────────────────────────────────────────────────────────────
def _backfill_events(events: list, info: "DeviceInfo") -> None:
    """
    Fill in sample_rate and project_info fields that the per-event waveform
    record doesn't carry — sourced from the device's compliance config.
    Extracted from device_events() so it can be called from both the full
    download path and the partial (new-events-only) path.
    """
    if info.compliance_config and info.compliance_config.sample_rate:
        for ev in events:
            if ev.sample_rate is None:
                ev.sample_rate = info.compliance_config.sample_rate
    if info.compliance_config:
        cc = info.compliance_config
        for ev in events:
            if ev.project_info is None:
                ev.project_info = ProjectInfo()
            pi = ev.project_info
            if pi.client is None: pi.client = cc.client
            if pi.operator is None: pi.operator = cc.operator
            if pi.sensor_location is None: pi.sensor_location = cc.sensor_location
            if pi.notes is None: pi.notes = cc.notes
# ── Endpoints ──────────────────────────────────────────────────────────────────
@app.get("/health")
@@ -414,7 +442,7 @@ def device_info(
baud: int = Query(38400, description="Serial baud rate (default 38400)"),
host: Optional[str] = Query(None, description="TCP host — modem IP or ACH relay (e.g. 203.0.113.5)"),
tcp_port: int = Query(DEFAULT_TCP_PORT, description=f"TCP port (default {DEFAULT_TCP_PORT})"),
force: bool = Query(False, description="Bypass cache and re-read from device"),
force: bool = Query(False, description="Bypass cache and re-read from device"),
) -> dict:
"""
Connect to the device, perform the POLL startup handshake, and return
@@ -423,16 +451,21 @@ def device_info(
Supply either *port* (serial) or *host* (TCP/modem).
Equivalent to POST /device/connect, provided as GET for convenience.
Response is cached until a POST /device/config write invalidates it.
Pass ?force=true to bypass the cache.
**Caching**: device identity and compliance config are cached after the first
successful read (they rarely change). Pass *force=true* to bypass the cache
and re-read directly from the device (e.g. after a config push).
The cache is also automatically invalidated after POST /device/config.
"""
conn_key = _live_cache.make_conn_key(host, tcp_port, port, baud)
log.info("GET /device/info port=%s host=%s tcp_port=%d force=%s", port, host, tcp_port, force)
cache = get_cache()
conn_key = SFMCache.make_conn_key(host, tcp_port, port, baud)
if not force:
cached = _live_cache.get_device_info(conn_key)
cached = cache.get_device_info(conn_key)
if cached is not None:
log.debug("device_info cache hit for %s", conn_key)
log.info("device info cache hit for %s", conn_key)
cached["_cached"] = True
return cached
try:
@@ -454,7 +487,7 @@ def device_info(
raise HTTPException(status_code=500, detail=f"Device error: {exc}") from exc
result = _serialise_device_info(info)
_live_cache.set_device_info(conn_key, result)
cache.set_device_info(conn_key, result)
return result
@@ -478,8 +511,8 @@ def device_events(
baud: int = Query(38400, description="Serial baud rate"),
host: Optional[str] = Query(None, description="TCP host — modem IP or ACH relay"),
tcp_port: int = Query(DEFAULT_TCP_PORT, description=f"TCP port (default {DEFAULT_TCP_PORT})"),
debug: bool = Query(False, description="Include raw record hex for field-layout inspection"),
force: bool = Query(False, description="Bypass cache and re-download from device"),
debug: bool = Query(False, description="Include raw record hex for field-layout inspection"),
force: bool = Query(False, description="Bypass cache and re-download all events from device"),
) -> dict:
"""
Connect to the device, read the event index, and download all stored
@@ -497,38 +530,107 @@ def device_events(
This does NOT download raw ADC waveform samples; those are large and
fetched separately via GET /device/event/{idx}/waveform.
**Caching**: event headers are cached after the first download. On subsequent
calls, the device is contacted only to check the event count (fast: ~2s).
If the count matches the cache, all events are returned from cache instantly.
If new events exist on the device, only the new ones are downloaded and merged.
Pass *force=true* to bypass the cache entirely and re-download everything.
"""
conn_key = _live_cache.make_conn_key(host, tcp_port, port, baud)
log.info("GET /device/events port=%s host=%s debug=%s force=%s", port, host, debug, force)
# ── Cache fast path ───────────────────────────────────────────────────────
# Do a quick poll + count_events() probe (~2s over cellular) to check if the
# device has any new events. If the count matches the cache, return early.
if not force and not debug:
try:
def _count():
with _build_client(port, baud, host, tcp_port) as client:
try:
client.poll()
except Exception:
client.poll()
return client.count_events()
device_count = _run_with_retry(_count, is_tcp=_is_tcp(host))
cached_events = _live_cache.get_events(conn_key, device_count)
if cached_events is not None:
log.info(" events cache hit (%d events, count=%d)", len(cached_events), device_count)
# Also serve cached device info if available
cached_info = _live_cache.get_device_info(conn_key)
return {
"device": cached_info or {},
"event_count": len(cached_events),
"events": cached_events,
"cached": True,
}
except Exception as exc:
log.warning(" count probe failed (%s) — falling through to full download", exc)
cache = get_cache()
conn_key = SFMCache.make_conn_key(host, tcp_port, port, baud)
# ── Full download ─────────────────────────────────────────────────────────
# ── Smart cache path (skip when debug=True or force=True) ────────────────
# debug mode uses raw_record_hex which isn't stored in the cache, so we
# must always go to the device when debug is requested.
if not force and not debug:
cached_events = cache.get_all_events(conn_key)
cached_count = len(cached_events) if cached_events else 0
if cached_count > 0:
# Quick device contact: just count events via the fast 1E/1F chain.
# This takes ~2s instead of the full event download (~10-30s).
try:
def _count():
with _build_client(port, baud, host, tcp_port) as client:
client.connect()
return client.count_events()
device_count = _run_with_retry(_count, is_tcp=_is_tcp(host))
except HTTPException:
raise
except Exception as exc:  # also covers ProtocolError and OSError
# If we can't reach the device at all, serve stale cache rather
# than returning an error — field units go offline regularly.
log.warning("count_events failed (%s) — serving stale cache for %s", exc, conn_key)
cached_info = cache.get_device_info(conn_key) or {}
return {
"device": cached_info,
"event_count": cached_count,
"events": cached_events,
"_cached": True,
"_stale": True,
}
if device_count == cached_count:
# Nothing new — return cache immediately, no event download needed.
log.info(
"event cache hit for %s: %d events, device count matches",
conn_key, cached_count,
)
cached_info = cache.get_device_info(conn_key) or {}
return {
"device": cached_info,
"event_count": cached_count,
"events": cached_events,
"_cached": True,
}
if device_count > cached_count:
# New events on the device. Events are append-only, so indices
# 0..(cached_count-1) are already in the cache, but the protocol requires
# iterating from event 0 to reach later ones. We therefore fetch everything
# up to the last device index (device_count - 1) and store only the events
# whose index is >= cached_count.
log.info(
"new events on device %s: have %d, device has %d — fetching all up to %d",
conn_key, cached_count, device_count, device_count - 1,
)
try:
def _fetch_new():
with _build_client(port, baud, host, tcp_port) as client:
info = client.connect()
all_evs = client.get_events(stop_after_index=device_count - 1)
return info, all_evs
info, all_events = _run_with_retry(_fetch_new, is_tcp=_is_tcp(host))
except HTTPException:
raise
except ProtocolError as exc:
raise HTTPException(status_code=502, detail=f"Protocol error: {exc}") from exc
except OSError as exc:
raise HTTPException(status_code=502, detail=f"Connection error: {exc}") from exc
except Exception as exc:
raise HTTPException(status_code=500, detail=f"Device error: {exc}") from exc
_backfill_events(all_events, info)
# Only the new events (indices >= cached_count) are truly new.
new_events = [ev for ev in all_events if ev.index >= cached_count]
new_serialised = [_serialise_event(ev) for ev in new_events]
cache.set_events(conn_key, new_serialised)
cache.set_device_info(conn_key, _serialise_device_info(info))
merged_events = cache.get_all_events(conn_key)
return {
"device": _serialise_device_info(info),
"event_count": len(merged_events),
"events": merged_events,
"_cached": True,
"_new_events": len(new_events),
}
# ── Full download path (first call, force=True, or debug=True) ───────────
try:
def _do():
with _build_client(port, baud, host, tcp_port) as client:
@@ -543,23 +645,14 @@ def device_events(
except Exception as exc:
raise HTTPException(status_code=500, detail=f"Device error: {exc}") from exc
# Fill sample_rate from compliance config where the event record doesn't supply it.
if info.compliance_config and info.compliance_config.sample_rate:
for ev in events:
if ev.sample_rate is None:
ev.sample_rate = info.compliance_config.sample_rate
_backfill_events(events, info)
serialised = [_serialise_event(ev, debug=debug) for ev in events]
# Backfill event.project_info fields that the 210-byte waveform record doesn't carry.
if info.compliance_config:
cc = info.compliance_config
for ev in events:
if ev.project_info is None:
ev.project_info = ProjectInfo()
pi = ev.project_info
if pi.client is None: pi.client = cc.client
if pi.operator is None: pi.operator = cc.operator
if pi.sensor_location is None: pi.sensor_location = cc.sensor_location
if pi.notes is None: pi.notes = cc.notes
if not debug:
# Only cache when not in debug mode (debug adds raw_record_hex which
# we don't want polluting the normal cache entries).
cache.set_events(conn_key, serialised)
cache.set_device_info(conn_key, _serialise_device_info(info))
serialised_info = _serialise_device_info(info)
serialised_events = [_serialise_event(ev, debug=debug) for ev in events]
@@ -572,8 +665,7 @@ def device_events(
return {
"device": serialised_info,
"event_count": len(events),
"events": serialised_events,
"cached": False,
"events": serialised,
}
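Review note: the count-probe branches in `device_events` above can be summarised as one small pure function. This is a minimal sketch, not code from this commit. `decide_events_action` and its string labels are hypothetical names chosen for illustration, and the sketch assumes events are append-only, as the comments in the diff state.

```python
from typing import Optional


def decide_events_action(cached_count: int, device_count: Optional[int]) -> str:
    """Pick a cache action from the cached and device event counts.

    device_count is None when the count probe could not reach the device.
    The function name and the returned labels are hypothetical.
    """
    if cached_count == 0:
        # Nothing cached yet: only a full download can populate the cache.
        return "full_download"
    if device_count is None:
        # Device unreachable: serve what we have and flag it as stale.
        return "serve_stale"
    if device_count == cached_count:
        # Counts match: no new events, so the cache can be returned as-is.
        return "serve_cache"
    if device_count > cached_count:
        # Append-only growth: fetch, then keep only indices >= cached_count.
        return "fetch_and_merge"
    # Device reports fewer events than the cache holds, so the cache can no
    # longer be trusted and the full download path takes over.
    return "full_download"
```

Keeping the decision separate from the device I/O would make each branch testable without a device, for example `decide_events_action(3, 5)` yields `"fetch_and_merge"`.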
@@ -584,21 +676,36 @@ def device_event(
baud: int = Query(38400, description="Serial baud rate"),
host: Optional[str] = Query(None, description="TCP host — modem IP or ACH relay"),
tcp_port: int = Query(DEFAULT_TCP_PORT, description=f"TCP port (default {DEFAULT_TCP_PORT})"),
force: bool = Query(False, description="Bypass cache and re-download from device"),
) -> dict:
"""
Download a single event by index (0-based).
Supply either *port* (serial) or *host* (TCP/modem).
Performs: POLL startup → event index → event header → waveform record.
**Caching**: if this event was already downloaded (e.g. via GET /device/events),
it is returned instantly from cache with no device contact.
"""
log.info("GET /device/event/%d port=%s host=%s", index, port, host)
log.info("GET /device/event/%d port=%s host=%s force=%s", index, port, host, force)
cache = get_cache()
conn_key = SFMCache.make_conn_key(host, tcp_port, port, baud)
if not force:
cached = cache.get_event(conn_key, index)
if cached is not None:
log.info("event cache hit for %s index %d", conn_key, index)
cached["_cached"] = True
return cached
try:
def _do():
with _build_client(port, baud, host, tcp_port) as client:
client.connect()
return client.get_events(stop_after_index=index)
events = _run_with_retry(_do, is_tcp=_is_tcp(host))
info = client.connect()
events = client.get_events(stop_after_index=index)
return info, events
info, events = _run_with_retry(_do, is_tcp=_is_tcp(host))
except HTTPException:
raise
except ProtocolError as exc:
@@ -615,7 +722,14 @@ def device_event(
detail=f"Event index {index} not found on device",
)
return _serialise_event(matching[0])
_backfill_events(matching, info)
result = _serialise_event(matching[0])
# Store all downloaded events (we paid for them anyway — indices 0..index)
all_serialised = [_serialise_event(ev) for ev in events]
cache.set_events(conn_key, all_serialised)
return result
@app.get("/device/event/{index}/waveform")
@@ -625,7 +739,7 @@ def device_event_waveform(
baud: int = Query(38400, description="Serial baud rate"),
host: Optional[str] = Query(None, description="TCP host — modem IP or ACH relay"),
tcp_port: int = Query(DEFAULT_TCP_PORT, description=f"TCP port (default {DEFAULT_TCP_PORT})"),
force: bool = Query(False, description="Bypass cache and re-download waveform"),
force: bool = Query(False, description="Bypass cache and re-download from device"),
) -> dict:
"""
Download the full raw ADC waveform for a single event (0-based index).
@@ -645,17 +759,22 @@ def device_event_waveform(
- **channels**: dict of channel name → list of signed int16 ADC counts
(keys: "Tran", "Vert", "Long", "Mic")
Waveforms are immutable once recorded and are cached permanently per
(connection, event index). Pass ?force=true to re-download.
**Caching**: full waveforms are cached permanently after the first download because
they are immutable once recorded on the device. Subsequent requests for the
same event return instantly from cache without any device contact.
Pass *force=true* to force a fresh download (rarely needed).
"""
conn_key = _live_cache.make_conn_key(host, tcp_port, port, baud)
log.info("GET /device/event/%d/waveform port=%s host=%s force=%s", index, port, host, force)
cache = get_cache()
conn_key = SFMCache.make_conn_key(host, tcp_port, port, baud)
if not force:
cached_waveform = _live_cache.get_waveform(conn_key, index)
if cached_waveform is not None:
log.debug("waveform cache hit: %s event %d", conn_key, index)
return cached_waveform
cached = cache.get_waveform(conn_key, index)
if cached is not None:
log.info("waveform cache hit for %s event %d", conn_key, index)
cached["_cached"] = True
return cached
try:
def _do():
@@ -701,7 +820,7 @@ def device_event_waveform(
"peak_values": _serialise_peak_values(ev.peak_values),
"channels": raw,
}
_live_cache.set_waveform(conn_key, index, result)
cache.set_waveform(conn_key, index, result)
return result
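Review note: the cache-hit branches above set `cached["_cached"] = True` directly on the object returned by the cache. Whether that touches the stored entry depends on how the cache hands values back, which this diff does not show. A minimal sketch of a copy-based alternative, assuming the payload is a plain dict; `mark_cached` is a hypothetical helper, not part of the commit:

```python
from typing import Optional


def mark_cached(payload: Optional[dict]) -> Optional[dict]:
    """Return a shallow copy of payload with "_cached": True added.

    Hypothetical helper: the stored dict is left untouched, so the flag
    never leaks into the cache entry itself.
    """
    if payload is None:
        return None
    return {**payload, "_cached": True}
```

A shallow copy is enough here because only a top-level key is added; nested values are still shared with the original.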
@@ -818,9 +937,9 @@ def device_config(
except Exception as exc:
raise HTTPException(status_code=500, detail=f"Device error: {exc}") from exc
# Config was written — invalidate cached device info and events so the next
# /device/info or /device/events call re-reads fresh data from the device.
_live_cache.mark_config_dirty(conn_key)
# Config was written to the device — the cached compliance config is now stale.
conn_key = SFMCache.make_conn_key(host, tcp_port, port, baud)
get_cache().mark_config_dirty(conn_key)
return {
"status": "ok",
@@ -862,14 +981,17 @@ def device_monitor_status(
Returns is_monitoring bool, battery voltage, and memory usage (total + free
bytes). Battery and memory are only present when the unit is idle.
**Caching:** response is cached for 30 seconds. Pass ?force=true to bypass.
**Caching**: status is cached for 30 seconds to reduce cellular polling overhead.
Pass *force=true* to bypass the cache for an immediate fresh read.
"""
conn_key = _live_cache.make_conn_key(host, tcp_port, port, baud)
cache = get_cache()
conn_key = SFMCache.make_conn_key(host, tcp_port, port, baud)
if not force:
cached = _live_cache.get_monitor_status(conn_key)
cached = cache.get_monitor_status(conn_key)
if cached is not None:
log.debug("monitor_status cache hit for %s", conn_key)
log.debug("monitor status cache hit for %s", conn_key)
cached["_cached"] = True
return cached
with _build_client(port=port, baud=baud, host=host, tcp_port=tcp_port) as client:
@@ -890,7 +1012,7 @@ def device_monitor_status(
result["memory_free_bytes"] = status.memory_free
result["memory_free_kb"] = round(status.memory_free / 1024, 1)
_live_cache.set_monitor_status(conn_key, result)
cache.set_monitor_status(conn_key, result)
return result
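Review note: the 30-second monitor status cache with explicit invalidation can be illustrated with a small in-memory TTL cache. This is a sketch under stated assumptions, not the `SFMCache` or `_LiveCache` implementation. The class name `TTLCache`, its methods, and the injectable `clock` parameter are all hypothetical.

```python
import threading
import time
from typing import Callable, Dict, Optional, Tuple


class TTLCache:
    """Tiny thread-safe TTL cache (hypothetical, for illustration only)."""

    def __init__(self, ttl_seconds: float = 30.0,
                 clock: Callable[[], float] = time.monotonic) -> None:
        self._ttl = ttl_seconds
        self._clock = clock  # injectable so tests do not need to sleep
        self._lock = threading.Lock()
        self._entries: Dict[str, Tuple[float, dict]] = {}

    def get(self, key: str) -> Optional[dict]:
        with self._lock:
            entry = self._entries.get(key)
            if entry is None:
                return None
            stored_at, value = entry
            if self._clock() - stored_at >= self._ttl:
                # Expired: drop the entry so the caller re-reads the device.
                del self._entries[key]
                return None
            # Return a copy so callers can add flags without changing the entry.
            return dict(value)

    def set(self, key: str, value: dict) -> None:
        with self._lock:
            self._entries[key] = (self._clock(), dict(value))

    def invalidate(self, key: str) -> None:
        # Called after monitor start/stop so the next poll hits the device.
        with self._lock:
            self._entries.pop(key, None)
```

Using `time.monotonic` rather than wall-clock time keeps the TTL unaffected by system clock adjustments.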
@@ -914,7 +1036,9 @@ def device_monitor_start(
log.warning("start monitoring poll retry: %s", exc)
client.poll()
client.start_monitoring()
_live_cache.invalidate_monitor_status(conn_key)
conn_key = SFMCache.make_conn_key(host, tcp_port, port, baud)
get_cache().invalidate_monitor_status(conn_key)
return {"status": "started"}
@@ -938,10 +1062,44 @@ def device_monitor_stop(
log.warning("stop monitoring poll retry: %s", exc)
client.poll()
client.stop_monitoring()
_live_cache.invalidate_monitor_status(conn_key)
conn_key = SFMCache.make_conn_key(host, tcp_port, port, baud)
get_cache().invalidate_monitor_status(conn_key)
return {"status": "stopped"}
# ── Cache management endpoints ────────────────────────────────────────────────
@app.get("/cache/stats")
def cache_stats() -> dict:
"""
Return row counts for all cache tables.
Useful for debugging and verifying that caching is working as expected.
"""
return get_cache().stats()
@app.delete("/cache/device")
def cache_clear_device(
port: Optional[str] = Query(None, description="Serial port (e.g. COM5)"),
baud: int = Query(38400, description="Serial baud rate"),
host: Optional[str] = Query(None, description="TCP host — modem IP or ACH relay"),
tcp_port: int = Query(DEFAULT_TCP_PORT, description=f"TCP port (default {DEFAULT_TCP_PORT})"),
) -> dict:
"""
Clear all cached data for a specific device (identified by its connection address).
Clears: device info, all event headers, all waveforms, monitor status.
The next request to any endpoint for this device will re-fetch from the device.
Supply either *port* (serial) or *host* (TCP/modem) to identify the device.
"""
conn_key = SFMCache.make_conn_key(host, tcp_port, port, baud)
counts = get_cache().clear_device(conn_key)
return {"status": "cleared", "conn_key": conn_key, "deleted": counts}
# ── DB read endpoints ─────────────────────────────────────────────────────────
#
# These endpoints expose the seismo-relay SQLite DB written by ach_server.py.