feat(cache): implement integrity checks for cached events and waveforms

- Added `waveform_key` and `event_timestamp` columns to `CachedEvent` and `CachedWaveform` for integrity verification.
- Implemented logic to flush the cache when a mismatch in (waveform_key, event_timestamp) is detected during event and waveform updates.
- Enhanced `set_events` and `set_waveform` methods to check for mismatches and trigger cache eviction as necessary.
- Introduced a new `LiveCache` class to manage in-memory caching of live device data, separating it from the server logic for better testability.
- Added tests to verify the correctness of cache invalidation logic, particularly for post-erase key reuse scenarios.
- Updated web application to include a "Force refresh" toggle, allowing users to bypass the cache and re-fetch data from the device.
This commit is contained in:
2026-05-07 04:42:00 +00:00
parent 0484680c89
commit 9afa3484f4
7 changed files with 890 additions and 323 deletions
+169 -114
View File
@@ -74,39 +74,73 @@ from sfm.waveform_store import WaveformStore
log = logging.getLogger("ach_server")
# ── Per-unit state (downloaded-key set) ───────────────────────────────────────
# ── Per-unit state (downloaded events index) ──────────────────────────────────
# Persisted as <output_dir>/ach_state.json
# Format:
# Format (current — v2):
# {
# "BE11529": {
# "downloaded_keys": ["01110000", "0111245a"], # hex keys already on disk
# "max_downloaded_key": "0111245a", # highest key ever seen
# "last_seen": "2026-04-11T01:04:36"
# "downloaded_events": { # key_hex → ISO timestamp string
# "01110000": "2026-04-11T00:42:17",
# "0111245a": "2026-04-11T01:04:30"
# },
# "max_downloaded_key": "0111245a",
# "last_seen": "2026-04-11T01:04:36",
# "serial": "BE11529",
# "peer": "63.43.212.232:51920"
# }
# }
#
# Key-based deduplication works well within a single "key generation" (between
# erases). After the device memory is erased the event counter resets to
# 0x01110000, so the first new event has the SAME key as the very first event
# we ever downloaded. We detect this situation with max_downloaded_key:
# Why (key, timestamp) and not key alone:
# The device's event-key counter resets to 0x01110000 after every memory
# erase (internal or external). A bare-key dedup (the v1 format) cannot
# distinguish a re-recorded event with the same key from one we already
# downloaded. The 0C waveform record's timestamp IS unique per physical
# event, so we pair (key, timestamp) and treat a key with a different
# timestamp as a new event regardless of `max_downloaded_key`.
#
# if max(current_device_keys) < max_downloaded_key
# → device was wiped and keys have restarted → treat all device keys as new
#
# After our own erase (--clear-after-download) we also explicitly clear
# downloaded_keys and max_downloaded_key so the next session starts fresh.
# Legacy v1 format (`downloaded_keys: list[str]` only) is auto-migrated on
# read: the keys are kept under a sentinel of "" (empty string) timestamp so
# the (key, timestamp) compare always sees a mismatch and forces a one-time
# re-download. After that pass the state is rewritten in v2 form.
_state_lock = threading.Lock()
def _load_state(state_path: Path) -> dict:
if state_path.exists():
try:
with open(state_path) as f:
return json.load(f)
except Exception:
pass
return {}
"""
Load ach_state.json, transparently migrating any legacy
`downloaded_keys: list` entries into the v2 `downloaded_events: dict`
schema. Returns the migrated state.
"""
if not state_path.exists():
return {}
try:
with open(state_path) as f:
state = json.load(f)
except Exception:
return {}
# Per-unit migration: legacy list → dict-with-empty-timestamps
for unit_key, unit_state in list(state.items()):
if not isinstance(unit_state, dict):
continue
if "downloaded_events" in unit_state:
continue
legacy_keys = unit_state.get("downloaded_keys")
if isinstance(legacy_keys, list):
unit_state["downloaded_events"] = {k: "" for k in legacy_keys}
log.info(
"ach_state: migrated %s from v1 (downloaded_keys list) → v2 "
"(downloaded_events dict, %d keys with empty timestamps; "
"they will re-validate on next session)",
unit_key, len(legacy_keys),
)
else:
unit_state["downloaded_events"] = {}
# keep legacy field for one cycle; cleared on next save
unit_state.pop("downloaded_keys", None)
return state
def _save_state(state_path: Path, state: dict) -> None:
@@ -143,6 +177,7 @@ class AchSession:
store: "WaveformStore",
clear_after_download: bool = False,
restart_monitoring: bool = False,
force_redownload: bool = False,
) -> None:
self.sock = sock
self.peer = peer
@@ -155,6 +190,11 @@ class AchSession:
self.store = store
self.clear_after_download = clear_after_download
self.restart_monitoring = restart_monitoring
# `force_redownload` tells this session to ignore ach_state and
# re-download every event currently on the device, regardless of any
# (key, timestamp) match. Useful as a manual override when state has
# become inconsistent with what's actually on disk / in the DB.
self.force_redownload = force_redownload
def run(self) -> None:
ts = datetime.datetime.now().strftime("%Y%m%d_%H%M%S")
@@ -276,11 +316,20 @@ class AchSession:
state = _load_state(self.state_path)
unit_key = serial or self.peer # fall back to IP if no serial
unit_state = state.get(unit_key, {})
seen_keys: set[str] = set(unit_state.get("downloaded_keys", []))
# Highest event key ever downloaded from this unit (hex string, 8 chars).
# Used to detect post-erase key reuse — see comment block above.
# downloaded_events is the v2 (key_hex → timestamp_iso) dict.
# Empty-string timestamps are migrated v1 entries — they force a
# one-time re-download because the (key, timestamp) compare always
# mismatches against any non-empty timestamp from a fresh 0C read.
seen_events: dict[str, str] = dict(unit_state.get("downloaded_events", {}))
max_seen_key: str = unit_state.get("max_downloaded_key", "00000000")
if self.force_redownload:
log.info(" --force-redownload-all set — ignoring %d cached "
"(key, timestamp) entries for this session",
len(seen_events))
seen_events = {}
# Walk the event index (browse-mode, no 5A) to get the actual current
# key list. The SUB 08 event_count field is a lifetime "total events
# ever recorded" counter that does NOT decrement on erase — confirmed
@@ -293,11 +342,10 @@ class AchSession:
log.warning(" list_event_keys failed: %s -- falling back to full download", exc)
device_keys = None
# Use the walk result as our authoritative current count.
current_count = len(device_keys) if device_keys is not None else 0
log.info(" Unit has %d stored event(s); %d key(s) previously downloaded",
current_count, len(seen_keys))
log.info(" Unit has %d stored event(s); %d (key, ts) entr(ies) previously downloaded",
current_count, len(seen_events))
if device_keys is not None and current_count == 0:
log.info(" [OK] No events on device -- nothing to download")
@@ -305,75 +353,29 @@ class AchSession:
return
if device_keys is not None:
# ── Post-erase detection ──────────────────────────────────────
# After the device memory is erased, new events start from key
# 01110000 again — the same keys we already downloaded. Detect
# this by comparing the device's current highest key against the
# historical maximum. If the device has rolled back below our
# high-water mark, its counter was reset and we must treat all
# its keys as new, regardless of what seen_keys contains.
# ── Post-erase detection (best-effort, key-only signal) ───────
# After erase the device's key counter resets to 01110000.
# If the device's current max key is below our high-water mark
# we know erase happened. This catches the cleanest case but
# does NOT catch erase-then-record-many-events (where the new
# max may climb past the old max). The (key, timestamp) check
# in get_events() is what handles those.
if device_keys and max_seen_key != "00000000":
max_device_key = max(device_keys) # lexicographic; safe because
# keys share the same 4-char prefix
max_device_key = max(device_keys)
if max_device_key < max_seen_key:
log.info(
" Post-erase reset detected: "
"device max key %s < historical max %s "
"-- treating all device keys as new",
"-- discarding stale (key, ts) state for this session",
max_device_key, max_seen_key,
)
seen_keys = set() # discard stale dedup info for this session
seen_events = {}
new_key_set = set(device_keys) - seen_keys
log.info(" Device has %d key(s): %d new, %d already seen",
len(device_keys), len(new_key_set), len(device_keys) - len(new_key_set))
if not new_key_set:
log.info(" [OK] All events already downloaded -- nothing to do")
# Refresh state timestamp; preserve max_seen_key unchanged.
state[unit_key] = {
"downloaded_keys": sorted(seen_keys | set(device_keys)),
"max_downloaded_key": max_seen_key,
"last_seen": datetime.datetime.now().isoformat(),
"serial": serial,
"peer": self.peer,
}
_save_state(self.state_path, state)
# ── Erase even when no new events (if requested) ──────────
# Blastware ACH always erases after every session — even when
# nothing new was downloaded. Without the erase the device
# still sees stored events in its memory and immediately
# retries the call-home, causing the looping we observed.
# Only erase when device actually has events stored; skip
# the erase if device_keys is empty (nothing to erase).
if self.clear_after_download and device_keys:
log.info(
" Clearing device memory (--clear-after-download, "
"no new events but device has %d stored)...",
len(device_keys),
)
try:
client.delete_all_events()
log.info(" [OK] Device memory cleared")
# Reset state so the next session starts fresh.
state[unit_key] = {
"downloaded_keys": [],
"max_downloaded_key": "00000000",
"last_seen": datetime.datetime.now().isoformat(),
"serial": serial,
"peer": self.peer,
}
_save_state(self.state_path, state)
except Exception as exc:
log.error(
" [WARN] Event deletion failed: %s -- events NOT cleared",
exc,
)
log.info("Session complete (no new events) -> %s", session_dir)
return
else:
new_key_set = None # unknown; proceed with full download
# Note: no early-exit "all already downloaded" short-circuit
# here. Without per-event timestamps we cannot tell whether
# device_keys ⊆ seen_events.keys() actually means we have
# those physical events. get_events() will read 0C on its
# skip path and decide per event.
# Apply max_events cap
# stop_idx: when we know the count from list_event_keys, use it as
@@ -391,24 +393,35 @@ class AchSession:
)
try:
# Pass `seen_events` (key → ISO timestamp) so the client can
# read 0C on its skip path and only skip 5A when the per-event
# timestamp matches what we already have on disk. When force_-
# redownload is set, seen_events was already cleared above.
#
# Filter out empty-string timestamps (legacy v1 entries) — the
# client's 0C-on-skip-path only trusts entries with a
# populated timestamp; otherwise it falls through to a full
# 5A download.
skip_dict = {k: ts for k, ts in seen_events.items() if ts}
all_events = client.get_events(
full_waveform=True,
stop_after_index=stop_idx,
skip_waveform_for_keys=seen_keys if seen_keys else None,
skip_waveform_for_events=skip_dict if skip_dict else None,
)
# Filter to events whose keys we haven't saved before.
# New events are those that came back with _a5_frames populated
# (= 5A actually ran on this session). Skipped events have
# _a5_frames = None because the client matched (key, timestamp)
# against skip_dict and bypassed 5A.
new_events = [
e for e in all_events
if e._waveform_key is None
or e._waveform_key.hex() not in seen_keys
if getattr(e, "_a5_frames", None)
]
skipped = len(all_events) - len(new_events)
log.info(" [OK] Downloaded %d event(s): %d new, %d skipped (already seen)",
log.info(" [OK] Walked %d event(s): %d downloaded, %d skipped (matched (key, ts) in state)",
len(all_events), len(new_events), skipped)
if skipped:
log.info(" (skipped %d already-downloaded event(s))", skipped)
# ── Persist event file + A5 sidecar to the waveform store ──
# Saves ride alongside the existing JSON dump so the on-disk
@@ -537,35 +550,64 @@ class AchSession:
)
# ── Update persistent state ───────────────────────────────────
# Include both triggered-event keys and monitor-log keys in the
# downloaded set so they are not re-processed on the next call-home.
current_event_keys = [
e._waveform_key.hex()
for e in all_events
if e._waveform_key is not None
]
current_monitor_keys = [e.key for e in new_monitor_entries]
current_keys = current_event_keys + current_monitor_keys
# Build a fresh (key → ISO timestamp) map from THIS session's
# results. For each event currently on the device, prefer the
# timestamp we just observed (from 0C); fall back to whatever
# was already in seen_events for that key (so we don't lose an
# entry just because get_events skipped it on the (key, ts)
# match path).
def _ts_iso(ev) -> str:
ts = getattr(ev, "timestamp", None)
if ts is None:
return ""
try:
return datetime.datetime(
ts.year, ts.month, ts.day,
ts.hour or 0, ts.minute or 0, ts.second or 0,
).isoformat()
except Exception:
return str(ts)
current_events_map: dict[str, str] = {}
for ev in all_events:
if ev._waveform_key is None:
continue
key_hex = ev._waveform_key.hex()
ts_iso = _ts_iso(ev) or seen_events.get(key_hex, "")
current_events_map[key_hex] = ts_iso
# Monitor-log entries don't have a 0C-style timestamp, but
# they DO have a start_time; use that so the monitor-log keys
# are properly entered into the (key, ts) map.
for ml in new_monitor_entries:
key_hex = ml.key
ts = ml.start_time
ts_iso = ts.isoformat() if ts else seen_events.get(key_hex, "")
# If a triggered event already populated this key, keep
# whichever has a non-empty timestamp.
if key_hex not in current_events_map or not current_events_map[key_hex]:
current_events_map[key_hex] = ts_iso
if erased_successfully:
# Device memory is clear. Reset downloaded_keys and the
# high-water mark so the next call-home starts fresh and
# doesn't mis-identify the recycled key 01110000 as "seen".
updated_keys = []
updated_events: dict[str, str] = {}
new_max_key = "00000000"
log.info(
" State reset after erase -- next session will download "
"from key 0 (device counter resets after erase)"
)
else:
# Normal (no erase): union of previously-seen + all keys on
# device now. Includes already-seen survivors so we never
# re-download them if the device somehow keeps old records.
updated_keys = sorted(set(seen_keys) | set(current_keys))
new_max_key = updated_keys[-1] if updated_keys else max_seen_key
# Merge: keep prior (key, ts) entries we still have evidence
# of (for survivors of any partial failure), plus this
# session's authoritative (key, ts) pairs.
updated_events = dict(seen_events)
updated_events.update(current_events_map)
new_max_key = (
max(updated_events.keys())
if updated_events else max_seen_key
)
state[unit_key] = {
"downloaded_keys": updated_keys,
"downloaded_events": updated_events,
"max_downloaded_key": new_max_key,
"last_seen": datetime.datetime.now().isoformat(),
"serial": serial,
@@ -704,6 +746,7 @@ def serve(args: argparse.Namespace) -> None:
print(f" Max events per session: {max_ev if max_ev else 'unlimited'}")
print(f" Clear device after download: {'YES' if args.clear_after_download else 'no'}")
print(f" Restart monitoring after download: {'YES' if args.restart_monitoring else 'no'}")
print(f" Force re-download all (ignore state): {'YES' if args.force_redownload_all else 'no'}")
print(f"{'='*60}")
print(f"\n Point your test unit's ACEmanager call-home settings to:")
print(f" Remote Host: <this machine's LAN IP>")
@@ -744,6 +787,7 @@ def serve(args: argparse.Namespace) -> None:
store=store,
clear_after_download=args.clear_after_download,
restart_monitoring=args.restart_monitoring,
force_redownload=args.force_redownload_all,
)
t = threading.Thread(target=session.run, daemon=True, name=f"ach-{peer}")
t.start()
@@ -828,6 +872,17 @@ def parse_args() -> argparse.Namespace:
"This mirrors the standard Blastware ACH workflow."
),
)
p.add_argument(
"--force-redownload-all",
action="store_true",
default=False,
help=(
"Manual override: ignore ach_state.json's downloaded_events map "
"for this session and re-download every event currently on the "
"device, regardless of (key, timestamp) match. Useful when state "
"has become inconsistent with the on-disk waveform store / DB."
),
)
p.add_argument(
"--verbose", "-v",
action="store_true",