diff --git a/bridges/ach_server.py b/bridges/ach_server.py index 209ec0e..c048d4c 100644 --- a/bridges/ach_server.py +++ b/bridges/ach_server.py @@ -74,39 +74,73 @@ from sfm.waveform_store import WaveformStore log = logging.getLogger("ach_server") -# ── Per-unit state (downloaded-key set) ─────────────────────────────────────── +# ── Per-unit state (downloaded events index) ────────────────────────────────── # Persisted as /ach_state.json -# Format: +# Format (current — v2): # { # "BE11529": { -# "downloaded_keys": ["01110000", "0111245a"], # hex keys already on disk -# "max_downloaded_key": "0111245a", # highest key ever seen -# "last_seen": "2026-04-11T01:04:36" +# "downloaded_events": { # key_hex → ISO timestamp string +# "01110000": "2026-04-11T00:42:17", +# "0111245a": "2026-04-11T01:04:30" +# }, +# "max_downloaded_key": "0111245a", +# "last_seen": "2026-04-11T01:04:36", +# "serial": "BE11529", +# "peer": "63.43.212.232:51920" # } # } # -# Key-based deduplication works well within a single "key generation" (between -# erases). After the device memory is erased the event counter resets to -# 0x01110000, so the first new event has the SAME key as the very first event -# we ever downloaded. We detect this situation with max_downloaded_key: +# Why (key, timestamp) and not key alone: +# The device's event-key counter resets to 0x01110000 after every memory +# erase (internal or external). A bare-key dedup (the v1 format) cannot +# distinguish a re-recorded event with the same key from one we already +# downloaded. The 0C waveform record's timestamp IS unique per physical +# event, so we pair (key, timestamp) and treat a key with a different +# timestamp as a new event regardless of `max_downloaded_key`. 
def _load_state(state_path: Path) -> dict:
    """
    Load ach_state.json and return it in the v2 (`downloaded_events`) schema.

    Each top-level entry maps a unit key to a per-unit dict.  Legacy v1 unit
    entries (`downloaded_keys: list[str]`, no `downloaded_events`) are
    migrated in memory: every legacy key is mapped to an empty-string
    timestamp and the `downloaded_keys` field is removed from the returned
    dict, so the next save writes the unit in v2 form only.

    Args:
        state_path: Location of the JSON state file.

    Returns:
        The (possibly migrated) state dict.  An empty dict is returned when
        the file is missing, cannot be read or parsed, or does not contain a
        JSON object at the top level.  Loading is deliberately best-effort:
        a bad state file must not prevent a session from running, but the
        failure is logged because the caller will later overwrite the file.
    """
    if not state_path.exists():
        return {}

    try:
        with open(state_path) as f:
            state = json.load(f)
    except Exception as exc:
        # Best-effort boundary: fall back to an empty state, but say so --
        # losing the dedup state silently makes re-downloads hard to explain.
        log.warning(
            "ach_state: could not read %s (%s) -- starting with empty state",
            state_path, exc,
        )
        return {}

    if not isinstance(state, dict):
        # Valid JSON but not an object (e.g. a list): nothing usable here.
        log.warning(
            "ach_state: %s holds a %s at top level, expected an object "
            "-- starting with empty state",
            state_path, type(state).__name__,
        )
        return {}

    # Per-unit migration: legacy list -> dict with empty timestamps.
    # Only values are mutated, never the key set, so iterating the live
    # dict is safe.
    for unit_key, unit_state in state.items():
        if not isinstance(unit_state, dict):
            continue
        if isinstance(unit_state.get("downloaded_events"), dict):
            continue  # already v2

        legacy_keys = unit_state.get("downloaded_keys")
        if isinstance(legacy_keys, list):
            # Non-string entries cannot be valid hex keys; drop them rather
            # than fail on an unhashable value.
            migrated = {k: "" for k in legacy_keys if isinstance(k, str)}
            unit_state["downloaded_events"] = migrated
            log.info(
                "ach_state: migrated %s from v1 (downloaded_keys list) → v2 "
                "(downloaded_events dict, %d keys with empty timestamps; "
                "they will re-validate on next session)",
                unit_key, len(migrated),
            )
        else:
            unit_state["downloaded_events"] = {}

        # The legacy field is dropped immediately; it is not carried into
        # the returned state and therefore never written back.
        unit_state.pop("downloaded_keys", None)

    return state
restart_monitoring: bool = False, + force_redownload: bool = False, ) -> None: self.sock = sock self.peer = peer @@ -155,6 +190,11 @@ class AchSession: self.store = store self.clear_after_download = clear_after_download self.restart_monitoring = restart_monitoring + # `force_redownload` tells this session to ignore ach_state and + # re-download every event currently on the device, regardless of any + # (key, timestamp) match. Useful as a manual override when state has + # become inconsistent with what's actually on disk / in the DB. + self.force_redownload = force_redownload def run(self) -> None: ts = datetime.datetime.now().strftime("%Y%m%d_%H%M%S") @@ -276,11 +316,20 @@ class AchSession: state = _load_state(self.state_path) unit_key = serial or self.peer # fall back to IP if no serial unit_state = state.get(unit_key, {}) - seen_keys: set[str] = set(unit_state.get("downloaded_keys", [])) - # Highest event key ever downloaded from this unit (hex string, 8 chars). - # Used to detect post-erase key reuse — see comment block above. + + # downloaded_events is the v2 (key_hex → timestamp_iso) dict. + # Empty-string timestamps are migrated v1 entries — they force a + # one-time re-download because the (key, timestamp) compare always + # mismatches against any non-empty timestamp from a fresh 0C read. + seen_events: dict[str, str] = dict(unit_state.get("downloaded_events", {})) max_seen_key: str = unit_state.get("max_downloaded_key", "00000000") + if self.force_redownload: + log.info(" --force-redownload-all set — ignoring %d cached " + "(key, timestamp) entries for this session", + len(seen_events)) + seen_events = {} + # Walk the event index (browse-mode, no 5A) to get the actual current # key list. 
The SUB 08 event_count field is a lifetime "total events # ever recorded" counter that does NOT decrement on erase — confirmed @@ -293,11 +342,10 @@ class AchSession: log.warning(" list_event_keys failed: %s -- falling back to full download", exc) device_keys = None - # Use the walk result as our authoritative current count. current_count = len(device_keys) if device_keys is not None else 0 - log.info(" Unit has %d stored event(s); %d key(s) previously downloaded", - current_count, len(seen_keys)) + log.info(" Unit has %d stored event(s); %d (key, ts) entr(ies) previously downloaded", + current_count, len(seen_events)) if device_keys is not None and current_count == 0: log.info(" [OK] No events on device -- nothing to download") @@ -305,75 +353,29 @@ class AchSession: return if device_keys is not None: - # ── Post-erase detection ────────────────────────────────────── - # After the device memory is erased, new events start from key - # 01110000 again — the same keys we already downloaded. Detect - # this by comparing the device's current highest key against the - # historical maximum. If the device has rolled back below our - # high-water mark, its counter was reset and we must treat all - # its keys as new, regardless of what seen_keys contains. + # ── Post-erase detection (best-effort, key-only signal) ─────── + # After erase the device's key counter resets to 01110000. + # If the device's current max key is below our high-water mark + # we know erase happened. This catches the cleanest case but + # does NOT catch erase-then-record-many-events (where the new + # max may climb past the old max). The (key, timestamp) check + # in get_events() is what handles those. 
if device_keys and max_seen_key != "00000000": - max_device_key = max(device_keys) # lexicographic; safe because - # keys share the same 4-char prefix + max_device_key = max(device_keys) if max_device_key < max_seen_key: log.info( " Post-erase reset detected: " "device max key %s < historical max %s " - "-- treating all device keys as new", + "-- discarding stale (key, ts) state for this session", max_device_key, max_seen_key, ) - seen_keys = set() # discard stale dedup info for this session + seen_events = {} - new_key_set = set(device_keys) - seen_keys - log.info(" Device has %d key(s): %d new, %d already seen", - len(device_keys), len(new_key_set), len(device_keys) - len(new_key_set)) - if not new_key_set: - log.info(" [OK] All events already downloaded -- nothing to do") - # Refresh state timestamp; preserve max_seen_key unchanged. - state[unit_key] = { - "downloaded_keys": sorted(seen_keys | set(device_keys)), - "max_downloaded_key": max_seen_key, - "last_seen": datetime.datetime.now().isoformat(), - "serial": serial, - "peer": self.peer, - } - _save_state(self.state_path, state) - - # ── Erase even when no new events (if requested) ────────── - # Blastware ACH always erases after every session — even when - # nothing new was downloaded. Without the erase the device - # still sees stored events in its memory and immediately - # retries the call-home, causing the looping we observed. - # Only erase when device actually has events stored; skip - # the erase if device_keys is empty (nothing to erase). - if self.clear_after_download and device_keys: - log.info( - " Clearing device memory (--clear-after-download, " - "no new events but device has %d stored)...", - len(device_keys), - ) - try: - client.delete_all_events() - log.info(" [OK] Device memory cleared") - # Reset state so the next session starts fresh. 
- state[unit_key] = { - "downloaded_keys": [], - "max_downloaded_key": "00000000", - "last_seen": datetime.datetime.now().isoformat(), - "serial": serial, - "peer": self.peer, - } - _save_state(self.state_path, state) - except Exception as exc: - log.error( - " [WARN] Event deletion failed: %s -- events NOT cleared", - exc, - ) - - log.info("Session complete (no new events) -> %s", session_dir) - return - else: - new_key_set = None # unknown; proceed with full download + # Note: no early-exit "all already downloaded" short-circuit + # here. Without per-event timestamps we cannot tell whether + # device_keys ⊆ seen_events.keys() actually means we have + # those physical events. get_events() will read 0C on its + # skip path and decide per event. # Apply max_events cap # stop_idx: when we know the count from list_event_keys, use it as @@ -391,24 +393,35 @@ class AchSession: ) try: + # Pass `seen_events` (key → ISO timestamp) so the client can + # read 0C on its skip path and only skip 5A when the per-event + # timestamp matches what we already have on disk. When force_- + # redownload is set, seen_events was already cleared above. + # + # Filter out empty-string timestamps (legacy v1 entries) — the + # client's 0C-on-skip-path only trusts entries with a + # populated timestamp; otherwise it falls through to a full + # 5A download. + skip_dict = {k: ts for k, ts in seen_events.items() if ts} + all_events = client.get_events( full_waveform=True, stop_after_index=stop_idx, - skip_waveform_for_keys=seen_keys if seen_keys else None, + skip_waveform_for_events=skip_dict if skip_dict else None, ) - # Filter to events whose keys we haven't saved before. + # New events are those that came back with _a5_frames populated + # (= 5A actually ran on this session). Skipped events have + # _a5_frames = None because the client matched (key, timestamp) + # against skip_dict and bypassed 5A. 
new_events = [ e for e in all_events - if e._waveform_key is None - or e._waveform_key.hex() not in seen_keys + if getattr(e, "_a5_frames", None) ] skipped = len(all_events) - len(new_events) - log.info(" [OK] Downloaded %d event(s): %d new, %d skipped (already seen)", + log.info(" [OK] Walked %d event(s): %d downloaded, %d skipped (matched (key, ts) in state)", len(all_events), len(new_events), skipped) - if skipped: - log.info(" (skipped %d already-downloaded event(s))", skipped) # ── Persist event file + A5 sidecar to the waveform store ── # Saves ride alongside the existing JSON dump so the on-disk @@ -537,35 +550,64 @@ class AchSession: ) # ── Update persistent state ─────────────────────────────────── - # Include both triggered-event keys and monitor-log keys in the - # downloaded set so they are not re-processed on the next call-home. - current_event_keys = [ - e._waveform_key.hex() - for e in all_events - if e._waveform_key is not None - ] - current_monitor_keys = [e.key for e in new_monitor_entries] - current_keys = current_event_keys + current_monitor_keys + # Build a fresh (key → ISO timestamp) map from THIS session's + # results. For each event currently on the device, prefer the + # timestamp we just observed (from 0C); fall back to whatever + # was already in seen_events for that key (so we don't lose an + # entry just because get_events skipped it on the (key, ts) + # match path). 
+ def _ts_iso(ev) -> str: + ts = getattr(ev, "timestamp", None) + if ts is None: + return "" + try: + return datetime.datetime( + ts.year, ts.month, ts.day, + ts.hour or 0, ts.minute or 0, ts.second or 0, + ).isoformat() + except Exception: + return str(ts) + + current_events_map: dict[str, str] = {} + for ev in all_events: + if ev._waveform_key is None: + continue + key_hex = ev._waveform_key.hex() + ts_iso = _ts_iso(ev) or seen_events.get(key_hex, "") + current_events_map[key_hex] = ts_iso + + # Monitor-log entries don't have a 0C-style timestamp, but + # they DO have a start_time; use that so the monitor-log keys + # are properly entered into the (key, ts) map. + for ml in new_monitor_entries: + key_hex = ml.key + ts = ml.start_time + ts_iso = ts.isoformat() if ts else seen_events.get(key_hex, "") + # If a triggered event already populated this key, keep + # whichever has a non-empty timestamp. + if key_hex not in current_events_map or not current_events_map[key_hex]: + current_events_map[key_hex] = ts_iso if erased_successfully: - # Device memory is clear. Reset downloaded_keys and the - # high-water mark so the next call-home starts fresh and - # doesn't mis-identify the recycled key 01110000 as "seen". - updated_keys = [] + updated_events: dict[str, str] = {} new_max_key = "00000000" log.info( " State reset after erase -- next session will download " "from key 0 (device counter resets after erase)" ) else: - # Normal (no erase): union of previously-seen + all keys on - # device now. Includes already-seen survivors so we never - # re-download them if the device somehow keeps old records. - updated_keys = sorted(set(seen_keys) | set(current_keys)) - new_max_key = updated_keys[-1] if updated_keys else max_seen_key + # Merge: keep prior (key, ts) entries we still have evidence + # of (for survivors of any partial failure), plus this + # session's authoritative (key, ts) pairs. 
+ updated_events = dict(seen_events) + updated_events.update(current_events_map) + new_max_key = ( + max(updated_events.keys()) + if updated_events else max_seen_key + ) state[unit_key] = { - "downloaded_keys": updated_keys, + "downloaded_events": updated_events, "max_downloaded_key": new_max_key, "last_seen": datetime.datetime.now().isoformat(), "serial": serial, @@ -704,6 +746,7 @@ def serve(args: argparse.Namespace) -> None: print(f" Max events per session: {max_ev if max_ev else 'unlimited'}") print(f" Clear device after download: {'YES' if args.clear_after_download else 'no'}") print(f" Restart monitoring after download: {'YES' if args.restart_monitoring else 'no'}") + print(f" Force re-download all (ignore state): {'YES' if args.force_redownload_all else 'no'}") print(f"{'='*60}") print(f"\n Point your test unit's ACEmanager call-home settings to:") print(f" Remote Host: ") @@ -744,6 +787,7 @@ def serve(args: argparse.Namespace) -> None: store=store, clear_after_download=args.clear_after_download, restart_monitoring=args.restart_monitoring, + force_redownload=args.force_redownload_all, ) t = threading.Thread(target=session.run, daemon=True, name=f"ach-{peer}") t.start() @@ -828,6 +872,17 @@ def parse_args() -> argparse.Namespace: "This mirrors the standard Blastware ACH workflow." ), ) + p.add_argument( + "--force-redownload-all", + action="store_true", + default=False, + help=( + "Manual override: ignore ach_state.json's downloaded_events map " + "for this session and re-download every event currently on the " + "device, regardless of (key, timestamp) match. Useful when state " + "has become inconsistent with the on-disk waveform store / DB." 
+ ), + ) p.add_argument( "--verbose", "-v", action="store_true", diff --git a/minimateplus/client.py b/minimateplus/client.py index 7b1f9eb..cddabbe 100644 --- a/minimateplus/client.py +++ b/minimateplus/client.py @@ -449,7 +449,7 @@ class MiniMateClient: proto.confirm_erase_all() log.info("delete_all_events: erase confirmed — device memory cleared") - def get_events(self, full_waveform: bool = False, debug: bool = False, stop_after_index: Optional[int] = None, skip_waveform_for_keys: Optional[set] = None, extra_chunks_after_metadata: int = 1) -> list[Event]: + def get_events(self, full_waveform: bool = False, debug: bool = False, stop_after_index: Optional[int] = None, skip_waveform_for_keys: Optional[set] = None, skip_waveform_for_events: Optional[dict] = None, extra_chunks_after_metadata: int = 1) -> list[Event]: """ Download all stored events from the device using the confirmed 1E → 0A → 0C → 5A → 1F event-iterator protocol. @@ -497,37 +497,24 @@ class MiniMateClient: events: list[Event] = [] idx = 0 + # Legacy bare-key skip set is deprecated: the device's key counter + # resets to 0x01110000 after every memory erase, so a key in this set + # cannot be trusted to identify the same physical event across erases. + # If a caller still passes it, log a warning and ignore — full + # downloads will run for every event so the bug never silently bites. + if skip_waveform_for_keys: + log.warning( + "get_events: skip_waveform_for_keys is deprecated and unsafe " + "(post-erase key reuse); ignoring %d entries. Use " + "skip_waveform_for_events={key: timestamp_iso} instead.", + len(skip_waveform_for_keys), + ) + skip_evts: dict[str, str] = dict(skip_waveform_for_events or {}) + while data8[4:8] != b"\x00\x00\x00\x00": cur_key = key4 # key for this event's 0A/1E-arm/0C/5A calls log.info("get_events: record %d key=%s", idx, cur_key.hex()) - # Fast-advance path: if this key is already downloaded, skip - # 1E-arm/0C/POLL/5A entirely. 
Only 0A + 1F(browse) are needed - # to advance the device's internal pointer to the next event. - # This is identical to the browse-mode walk in count_events(). - if skip_waveform_for_keys and cur_key.hex() in skip_waveform_for_keys: - log.debug("get_events: key=%s already seen -- fast-advance only", cur_key.hex()) - try: - proto.read_waveform_header(cur_key) - except ProtocolError as exc: - log.warning( - "get_events: 0A failed for key=%s (skip path): %s -- stopping", - cur_key.hex(), exc, - ) - break - try: - key4, data8 = proto.advance_event(browse=True) - except ProtocolError as exc: - log.warning( - "get_events: 1F failed for key=%s (skip path): %s -- stopping", - cur_key.hex(), exc, - ) - break - idx += 1 - if stop_after_index is not None and idx > stop_after_index: - break - continue - ev = Event(index=idx) ev._waveform_key = cur_key @@ -574,72 +561,96 @@ class MiniMateClient: "get_events: 0C failed for key=%s: %s", cur_key.hex(), exc ) - # SUB 1F (download-arm) — send token=0xFE BEFORE POLL+5A to arm the - # device's bulk stream state machine. Cache the returned key as a - # fallback for loop iteration when 5A fails (see iteration block below). - # Confirmed from 4-2-26 capture frames 66-67 (1F before frames 68-73 POLL). - arm_key4: Optional[bytes] = None - try: - arm_key4, _ = proto.advance_event(browse=False) # arm 5A - log.info("get_events: 1F(download) — 5A armed, arm_key=%s", arm_key4.hex()) - except ProtocolError as exc: - log.warning("get_events: 1F(download) arm failed: %s", exc) + # ── Skip-5A decision based on (key, timestamp) match ────── + # If skip_waveform_for_events maps cur_key.hex() to a non-empty + # ISO timestamp matching what we just read from 0C, this is + # the same physical event we already have on disk — bypass + # the 1F(arm)+POLL+5A bulk download. Otherwise (no entry, or + # timestamp mismatch indicating post-erase reuse) fall through + # to the full download. 
+ expected_ts = skip_evts.get(cur_key.hex(), "") + actual_ts = _event_timestamp_iso(ev) + skip_5a = bool(expected_ts and actual_ts and expected_ts == actual_ts) + if skip_5a: + log.info( + "get_events: key=%s (key, ts=%s) match — skipping 5A bulk download", + cur_key.hex(), actual_ts, + ) - # POLL × 3 — BW sends 3 full POLL cycles between 1F and 5A. - # Confirmed from 4-2-26 BW TX capture (frames 68-73 before 5A at 74). - log.info("get_events: POLL × 3 before 5A") - for _p in range(3): + arm_key4: Optional[bytes] = None + a5_ok = False + + if not skip_5a: + # SUB 1F (download-arm) — send token=0xFE BEFORE POLL+5A to arm the + # device's bulk stream state machine. Cache the returned key as a + # fallback for loop iteration when 5A fails (see iteration block below). + # Confirmed from 4-2-26 capture frames 66-67 (1F before frames 68-73 POLL). try: - proto.poll() + arm_key4, _ = proto.advance_event(browse=False) # arm 5A + log.info("get_events: 1F(download) — 5A armed, arm_key=%s", arm_key4.hex()) except ProtocolError as exc: - log.warning("get_events: POLL %d failed: %s", _p, exc) + log.warning("get_events: 1F(download) arm failed: %s", exc) + + # POLL × 3 — BW sends 3 full POLL cycles between 1F and 5A. + # Confirmed from 4-2-26 BW TX capture (frames 68-73 before 5A at 74). + log.info("get_events: POLL × 3 before 5A") + for _p in range(3): + try: + proto.poll() + except ProtocolError as exc: + log.warning("get_events: POLL %d failed: %s", _p, exc) # SUB 5A — bulk waveform stream (uses cur_key, the event set up by 0A+1E+0C). # By default (full_waveform=False): stop after frame 7 for metadata only. # When full_waveform=True: fetch all chunks and decode raw ADC samples. 
- a5_ok = False - try: - if full_waveform: - log.info( - "get_events: 5A full waveform download for key=%s", cur_key.hex() - ) - a5_frames = proto.read_bulk_waveform_stream( - cur_key, stop_after_metadata=False, max_chunks=128, - include_terminator=True, - ) - if a5_frames: - a5_ok = True - ev._a5_frames = a5_frames # store for write_blastware_file - _decode_a5_metadata_into(a5_frames, ev) - _decode_a5_waveform(a5_frames, ev) + # + # Bypassed when skip_5a is True — the event is left with + # _a5_frames=None, which signals to the caller (e.g. + # ach_server.py) that this event was matched by (key, ts) and + # already has a stored .file in the persistent waveform store. + if not skip_5a: + try: + if full_waveform: log.info( - "get_events: 5A decoded %d sample-sets", - len((ev.raw_samples or {}).get("Tran", [])), + "get_events: 5A full waveform download for key=%s", cur_key.hex() ) - else: - log.info( - "get_events: 5A metadata-only download for key=%s", cur_key.hex() - ) - a5_frames = proto.read_bulk_waveform_stream( - cur_key, stop_after_metadata=True, - include_terminator=True, - extra_chunks_after_metadata=extra_chunks_after_metadata, - max_chunks=128, - ) - if a5_frames: - a5_ok = True - ev._a5_frames = a5_frames # store for write_blastware_file - _decode_a5_metadata_into(a5_frames, ev) - log.debug( - "get_events: 5A metadata client=%r operator=%r", - ev.project_info.client if ev.project_info else None, - ev.project_info.operator if ev.project_info else None, + a5_frames = proto.read_bulk_waveform_stream( + cur_key, stop_after_metadata=False, max_chunks=128, + include_terminator=True, ) - except ProtocolError as exc: - log.warning( - "get_events: 5A failed for key=%s: %s — metadata unavailable", - cur_key.hex(), exc, - ) + if a5_frames: + a5_ok = True + ev._a5_frames = a5_frames # store for write_blastware_file + _decode_a5_metadata_into(a5_frames, ev) + _decode_a5_waveform(a5_frames, ev) + log.info( + "get_events: 5A decoded %d sample-sets", + 
def _event_timestamp_iso(event: "Event") -> str:
    """
    Return a stable ISO-8601 string for the event's timestamp, or "" if the
    event has no timestamp populated.

    The string is built from the timestamp's year/month/day/hour/minute/
    second fields (missing or zero hour/minute/second become 0), so it has
    second resolution and no timezone suffix.  `bridges/ach_server.py`
    builds the strings it stores in `ach_state.json:downloaded_events` with
    the same expression, which is what lets the (key, ts) compare in
    get_events()'s skip path be a plain string equality.

    Args:
        event: Object exposing an optional `timestamp` attribute.

    Returns:
        "YYYY-MM-DDTHH:MM:SS" when the fields form a valid date/time,
        `str(timestamp)` when they do not (missing fields, wrong types,
        out-of-range values), or "" when there is no timestamp at all.
    """
    # Function-scope import: the result must not depend on how (or whether)
    # the enclosing module imports `datetime`.  A silently different string
    # here would make every (key, ts) compare fail and force re-downloads.
    import datetime

    ts = getattr(event, "timestamp", None)
    if ts is None:
        return ""
    try:
        return datetime.datetime(
            ts.year, ts.month, ts.day,
            ts.hour or 0, ts.minute or 0, ts.second or 0,
        ).isoformat()
    except (AttributeError, TypeError, ValueError, OverflowError):
        # Only data-shaped failures fall back to the raw representation;
        # programming errors (e.g. NameError) are no longer masked.
        return str(ts)
""" __tablename__ = "cached_events" - conn_key = sa.Column(sa.String, primary_key=True) - index = sa.Column(sa.Integer, primary_key=True) - event_json = sa.Column(sa.Text, nullable=False) # serialised Event dict - cached_at = sa.Column(sa.Float, nullable=False) # Unix timestamp + conn_key = sa.Column(sa.String, primary_key=True) + index = sa.Column(sa.Integer, primary_key=True) + event_json = sa.Column(sa.Text, nullable=False) # serialised Event dict + cached_at = sa.Column(sa.Float, nullable=False) # Unix timestamp + waveform_key = sa.Column(sa.String, nullable=True) # 8-hex device key + event_timestamp = sa.Column(sa.String, nullable=True) # ISO-8601 from 0C class CachedWaveform(Base): @@ -97,14 +108,18 @@ class CachedWaveform(Base): Full raw ADC waveform for a single event (SUB 5A full download). These are large (up to several MB) and expensive to fetch over cellular. - Once downloaded they are immutable and cached permanently. + Once downloaded they are immutable and cached permanently — but the + cache row is invalidated when the device is erased and a new event lands + at the same index (see CachedEvent docstring). 
""" __tablename__ = "cached_waveforms" - conn_key = sa.Column(sa.String, primary_key=True) - index = sa.Column(sa.Integer, primary_key=True) - waveform_json = sa.Column(sa.Text, nullable=False) # full /device/event/{idx}/waveform response JSON - cached_at = sa.Column(sa.Float, nullable=False) + conn_key = sa.Column(sa.String, primary_key=True) + index = sa.Column(sa.Integer, primary_key=True) + waveform_json = sa.Column(sa.Text, nullable=False) # full /device/event/{idx}/waveform response JSON + cached_at = sa.Column(sa.Float, nullable=False) + waveform_key = sa.Column(sa.String, nullable=True) # 8-hex device key + event_timestamp = sa.Column(sa.String, nullable=True) # ISO-8601 from 0C class CachedMonitorStatus(Base): @@ -149,6 +164,23 @@ class SFMCache: engine = sa.create_engine(url, connect_args={"check_same_thread": False}) Base.metadata.create_all(engine) self._Session = orm.sessionmaker(bind=engine) + # In-place schema migration: add the (waveform_key, event_timestamp) + # integrity-stamp columns to legacy cache DBs that predate the + # post-erase eviction logic. ALTER TABLE ADD COLUMN is idempotent + # via the column-presence check below. 
+ with engine.begin() as conn: + for table in ("cached_events", "cached_waveforms"): + cols = { + r[1] + for r in conn.exec_driver_sql(f"PRAGMA table_info({table})").fetchall() + } + for new_col, ddl in ( + ("waveform_key", "TEXT"), + ("event_timestamp", "TEXT"), + ): + if new_col not in cols: + log.info("cache schema: %s ADD COLUMN %s %s", table, new_col, ddl) + conn.exec_driver_sql(f"ALTER TABLE {table} ADD COLUMN {new_col} {ddl}") log.info("SFM cache opened: %s", db_path) # ── Connection key ──────────────────────────────────────────────────────── @@ -242,15 +274,91 @@ class SFMCache: row = s.get(CachedEvent, (conn_key, index)) return json.loads(row.event_json) if row else None + @staticmethod + def _event_signature(ev: dict) -> tuple[Optional[str], Optional[str]]: + """ + Extract the (waveform_key_hex, timestamp_iso) integrity stamp from + a serialised event dict. Either field may be None if the source + Event was missing it; the comparison logic in set_events/set_waveform + treats "both sides have a value AND they differ" as the only + eviction trigger, so partial data never spuriously flushes cache. + """ + key = ev.get("waveform_key") or ev.get("_waveform_key") + if isinstance(key, (bytes, bytearray)): + key = bytes(key).hex() + ts = ev.get("timestamp") + if isinstance(ts, dict): + # _serialise_timestamp returns a dict like {"iso": "...", ...} + ts = ts.get("iso") or ts.get("string") or None + return (key if isinstance(key, str) else None, + ts if isinstance(ts, str) else None) + + def _maybe_flush_on_mismatch( + self, + s, + conn_key: str, + index: int, + new_key: Optional[str], + new_ts: Optional[str], + ) -> bool: + """ + Check whether the cached entry at (conn_key, index) has a different + (waveform_key, timestamp) than the incoming one. If so, treat it as + a post-erase key-reuse signal and flush ALL cached events/waveforms + for this device, then return True. + Returns False when no flush was needed. 
+ """ + if not new_key and not new_ts: + return False # nothing to compare against + existing = s.get(CachedEvent, (conn_key, index)) + if existing is None: + existing = s.get(CachedWaveform, (conn_key, index)) + if existing is None: + return False + old_key = existing.waveform_key + old_ts = existing.event_timestamp + # Only flush when both sides have populated values and they differ. + differs = ( + (new_key and old_key and new_key != old_key) + or (new_ts and old_ts and new_ts != old_ts) + ) + if not differs: + return False + log.warning( + "cache: device %s — index %d (key=%s, ts=%s) replaces (key=%s, ts=%s); " + "flushing all cached events/waveforms for this device " + "(post-erase key reuse detected)", + conn_key, index, new_key, new_ts, old_key, old_ts, + ) + s.query(CachedEvent).filter_by(conn_key=conn_key).delete() + s.query(CachedWaveform).filter_by(conn_key=conn_key).delete() + return True + def set_events(self, conn_key: str, events: list[dict]) -> None: """ Upsert a list of event dicts. Existing rows are updated; new rows are inserted. This is used to add newly-discovered events to the cache. + + Eviction: if any incoming event has a different (waveform_key, + timestamp) than the row currently cached at the same index, we flush + the entire device's cache before inserting the new entries. Catches + post-erase key reuse where index 0 silently switches identity. """ now = time.time() with self._Session() as s: + # Eviction check: scan incoming events for any (index, key, ts) + # that conflicts with a cached row. A single conflict triggers + # a full device-wide flush so we don't end up with a mixed-era + # cache. 
+ for ev in events: + key, ts = self._event_signature(ev) + if self._maybe_flush_on_mismatch(s, conn_key, ev["index"], key, ts): + s.commit() + break # cache is now empty for this device; carry on + for ev in events: idx = ev["index"] + key, ts = self._event_signature(ev) row = s.get(CachedEvent, (conn_key, idx)) if row is None: row = CachedEvent( @@ -258,12 +366,18 @@ class SFMCache: index=idx, event_json=json.dumps(ev), cached_at=now, + waveform_key=key, + event_timestamp=ts, ) s.add(row) log.debug("cached new event %d for %s", idx, conn_key) else: # Refresh in case project_info was backfilled after initial store row.event_json = json.dumps(ev) + if key: + row.waveform_key = key + if ts: + row.event_timestamp = ts s.commit() # ── Waveforms ───────────────────────────────────────────────────────────── @@ -278,8 +392,16 @@ class SFMCache: return json.loads(row.waveform_json) def set_waveform(self, conn_key: str, index: int, waveform: dict) -> None: - """Store a full waveform response dict permanently.""" + """ + Store a full waveform response dict permanently. + + Like set_events, this checks the (waveform_key, timestamp) signature + of the incoming entry against what's currently cached at the same + index. A mismatch flushes the entire device's cache before insert. 
+ """ + key, ts = self._event_signature(waveform) with self._Session() as s: + self._maybe_flush_on_mismatch(s, conn_key, index, key, ts) row = s.get(CachedWaveform, (conn_key, index)) if row is None: row = CachedWaveform( @@ -287,13 +409,20 @@ class SFMCache: index=index, waveform_json=json.dumps(waveform), cached_at=time.time(), + waveform_key=key, + event_timestamp=ts, ) s.add(row) else: row.waveform_json = json.dumps(waveform) row.cached_at = time.time() + if key: + row.waveform_key = key + if ts: + row.event_timestamp = ts s.commit() - log.debug("cached waveform for %s event %d", conn_key, index) + log.debug("cached waveform for %s event %d (key=%s, ts=%s)", + conn_key, index, key, ts) # ── Monitor status ──────────────────────────────────────────────────────── diff --git a/sfm/live_cache.py b/sfm/live_cache.py new file mode 100644 index 0000000..9c7cf10 --- /dev/null +++ b/sfm/live_cache.py @@ -0,0 +1,189 @@ +""" +sfm/live_cache.py — Thread-safe in-memory cache for live SFM device data. + +Extracted from sfm/server.py so the cache logic is importable and testable +without pulling in fastapi/uvicorn. + +Caching strategy +---------------- +Keyed by `conn_key` ("tcp:host:port" or "serial:port:baud"). Does NOT +persist across server restarts. + + device_info cached until POST /device/config marks it dirty + events cached by (conn_key, device_event_count); re-fetched when + a quick count_events() probe shows new events on the device + monitor_status 30-second TTL (changes frequently during monitoring) + waveforms permanent within a process — but auto-evicted at the device + level when a (waveform_key, timestamp) mismatch is detected + at the same index (post-erase key reuse — the device's + event-key counter resets to 0x01110000 after every erase, + so the same `(conn_key, index)` slot can refer to a + brand-new physical event). + +All endpoints accept ?force=true to bypass the cache and re-read. 
+""" + +from __future__ import annotations + +import threading +import time +from typing import Optional + +_MONITOR_STATUS_TTL = 30.0 # seconds + + +class LiveCache: + """ + Thread-safe in-memory cache for live SFM device data. + One singleton per server process. + """ + + def __init__(self) -> None: + self._lock = threading.Lock() + self._device_info: dict[str, dict] = {} + self._events: dict[str, tuple[int, list]] = {} + self._monitor_status: dict[str, tuple[float, dict]] = {} + self._config_dirty: dict[str, bool] = {} + self._waveforms: dict[tuple, dict] = {} + + # ── Connection key ──────────────────────────────────────────────────────── + + @staticmethod + def make_conn_key( + host: Optional[str], + tcp_port: int, + port: Optional[str], + baud: int, + ) -> str: + if host: + return f"tcp:{host}:{tcp_port}" + return f"serial:{port}:{baud}" + + # ── Eviction signature ──────────────────────────────────────────────────── + + @staticmethod + def _event_signature(ev: dict) -> tuple[Optional[str], Optional[str]]: + """Return (waveform_key_hex, timestamp_iso) from a serialised event.""" + key = ev.get("waveform_key") or ev.get("_waveform_key") + if isinstance(key, (bytes, bytearray)): + key = bytes(key).hex() + ts = ev.get("timestamp") + if isinstance(ts, dict): + ts = ts.get("iso") or ts.get("string") or None + return (key if isinstance(key, str) else None, + ts if isinstance(ts, str) else None) + + def _flush_device(self, conn_key: str) -> None: + """Drop all cached events + waveforms for one device. 
Caller holds lock.""" + self._events.pop(conn_key, None) + stale_wf_keys = [k for k in self._waveforms if k[0] == conn_key] + for k in stale_wf_keys: + self._waveforms.pop(k, None) + + # ── Device info ─────────────────────────────────────────────────────────── + + def get_device_info(self, conn_key: str) -> Optional[dict]: + with self._lock: + if self._config_dirty.get(conn_key): + return None + return self._device_info.get(conn_key) + + def set_device_info(self, conn_key: str, info: dict) -> None: + with self._lock: + self._device_info[conn_key] = info + self._config_dirty[conn_key] = False + + # ── Events ──────────────────────────────────────────────────────────────── + + def get_events(self, conn_key: str, device_count: int) -> Optional[list]: + with self._lock: + if self._config_dirty.get(conn_key): + return None + entry = self._events.get(conn_key) + if entry is None: + return None + cached_count, events = entry + return events if cached_count == device_count else None + + def set_events(self, conn_key: str, device_count: int, events: list) -> None: + """ + Replace the cached events list for `conn_key`. If any incoming event + has a different (waveform_key, timestamp) than the cached entry at + the same index, flush the entire conn_key's event + waveform cache + first. Catches post-erase key reuse. 
+ """ + with self._lock: + cached_entry = self._events.get(conn_key) + cached_events = cached_entry[1] if cached_entry else [] + cached_by_index = {e.get("index"): e for e in cached_events} + + evict = False + for ev in events: + idx = ev.get("index") + if idx is None: + continue + cached = cached_by_index.get(idx) + if cached is None: + continue + new_key, new_ts = self._event_signature(ev) + old_key, old_ts = self._event_signature(cached) + if (new_key and old_key and new_key != old_key) or \ + (new_ts and old_ts and new_ts != old_ts): + evict = True + break + + if evict: + self._flush_device(conn_key) + + self._events[conn_key] = (device_count, events) + + # ── Monitor status ──────────────────────────────────────────────────────── + + def get_monitor_status(self, conn_key: str) -> Optional[dict]: + with self._lock: + entry = self._monitor_status.get(conn_key) + if entry is None: + return None + fetched_at, status = entry + if time.time() - fetched_at > _MONITOR_STATUS_TTL: + return None + return status + + def set_monitor_status(self, conn_key: str, status: dict) -> None: + with self._lock: + self._monitor_status[conn_key] = (time.time(), status) + + def invalidate_monitor_status(self, conn_key: str) -> None: + with self._lock: + self._monitor_status.pop(conn_key, None) + + # ── Config dirty flag ───────────────────────────────────────────────────── + + def mark_config_dirty(self, conn_key: str) -> None: + with self._lock: + self._config_dirty[conn_key] = True + self._events.pop(conn_key, None) + + # ── Waveforms (permanent cache, evicted on (key,ts) mismatch) ───────────── + + def get_waveform(self, conn_key: str, index: int) -> Optional[dict]: + with self._lock: + return self._waveforms.get((conn_key, index)) + + def set_waveform(self, conn_key: str, index: int, waveform: dict) -> None: + """ + Cache a waveform. Evicts the device's whole cache when the existing + entry at the same index has a different (waveform_key, timestamp). 
+ """ + with self._lock: + existing = self._waveforms.get((conn_key, index)) + if existing is not None: + new_key, new_ts = self._event_signature(waveform) + old_key, old_ts = self._event_signature(existing) + differs = ( + (new_key and old_key and new_key != old_key) + or (new_ts and old_ts and new_ts != old_ts) + ) + if differs: + self._flush_device(conn_key) + self._waveforms[(conn_key, index)] = waveform diff --git a/sfm/server.py b/sfm/server.py index 6bb9e37..f6043d9 100644 --- a/sfm/server.py +++ b/sfm/server.py @@ -66,6 +66,7 @@ from minimateplus.blastware_file import write_blastware_file, blastware_filename from minimateplus.client import _decode_a5_metadata_into, _decode_a5_waveform from sfm.cache import SFMCache, get_cache from sfm.database import SeismoDb +from sfm.live_cache import LiveCache as _LiveCache from sfm.waveform_store import WaveformStore logging.basicConfig( @@ -142,116 +143,6 @@ def _get_store() -> WaveformStore: # # All endpoints accept ?force=true to bypass the cache and re-read from device. -_MONITOR_STATUS_TTL = 30.0 # seconds - - -class _LiveCache: - """ - Thread-safe in-memory cache for live SFM device data. - One singleton per server process. 
- """ - - def __init__(self) -> None: - self._lock = threading.Lock() - # conn_key → serialised device info dict - self._device_info: dict[str, dict] = {} - # conn_key → (device_event_count_when_cached, [event dicts]) - self._events: dict[str, tuple[int, list]] = {} - # conn_key → (fetched_at_unix, status_dict) - self._monitor_status: dict[str, tuple[float, dict]] = {} - # conn_key → bool (True = re-read device on next /device/info) - self._config_dirty: dict[str, bool] = {} - # (conn_key, event_index) → waveform dict (permanent) - self._waveforms: dict[tuple, dict] = {} - - # ── Connection key ──────────────────────────────────────────────────────── - - @staticmethod - def make_conn_key( - host: Optional[str], - tcp_port: int, - port: Optional[str], - baud: int, - ) -> str: - if host: - return f"tcp:{host}:{tcp_port}" - return f"serial:{port}:{baud}" - - # ── Device info ─────────────────────────────────────────────────────────── - - def get_device_info(self, conn_key: str) -> Optional[dict]: - with self._lock: - if self._config_dirty.get(conn_key): - return None - return self._device_info.get(conn_key) - - def set_device_info(self, conn_key: str, info: dict) -> None: - with self._lock: - self._device_info[conn_key] = info - self._config_dirty[conn_key] = False - - # ── Events ──────────────────────────────────────────────────────────────── - - def get_events(self, conn_key: str, device_count: int) -> Optional[list]: - """ - Return cached events if the device's current event count matches what - we had when we last fetched. Returns None (cache miss) otherwise. 
- """ - with self._lock: - if self._config_dirty.get(conn_key): - return None - entry = self._events.get(conn_key) - if entry is None: - return None - cached_count, events = entry - return events if cached_count == device_count else None - - def set_events(self, conn_key: str, device_count: int, events: list) -> None: - with self._lock: - self._events[conn_key] = (device_count, events) - - # ── Monitor status ──────────────────────────────────────────────────────── - - def get_monitor_status(self, conn_key: str) -> Optional[dict]: - with self._lock: - entry = self._monitor_status.get(conn_key) - if entry is None: - return None - fetched_at, status = entry - if time.time() - fetched_at > _MONITOR_STATUS_TTL: - return None - return status - - def set_monitor_status(self, conn_key: str, status: dict) -> None: - with self._lock: - self._monitor_status[conn_key] = (time.time(), status) - - def invalidate_monitor_status(self, conn_key: str) -> None: - with self._lock: - self._monitor_status.pop(conn_key, None) - - # ── Config dirty flag ───────────────────────────────────────────────────── - - def mark_config_dirty(self, conn_key: str) -> None: - """ - Called after a successful POST /device/config write. - Forces next /device/info and /device/events to re-read from the device. 
- """ - with self._lock: - self._config_dirty[conn_key] = True - self._events.pop(conn_key, None) - - # ── Waveforms (permanent cache) ─────────────────────────────────────────── - - def get_waveform(self, conn_key: str, index: int) -> Optional[dict]: - with self._lock: - return self._waveforms.get((conn_key, index)) - - def set_waveform(self, conn_key: str, index: int, waveform: dict) -> None: - with self._lock: - self._waveforms[(conn_key, index)] = waveform - - _live_cache = _LiveCache() @@ -872,6 +763,7 @@ def device_event_blastware_file( baud: int = Query(38400, description="Serial baud rate"), host: Optional[str] = Query(None, description="TCP host — modem IP or ACH relay"), tcp_port: int = Query(DEFAULT_TCP_PORT, description=f"TCP port (default {DEFAULT_TCP_PORT})"), + force: bool = Query(False, description="Bypass any cached/dedup'd state and re-download from device"), ) -> FileResponse: """ Download the waveform for a single event (0-based index) and return it @@ -893,9 +785,13 @@ def device_event_blastware_file( stop_after_index=index) → write_blastware_file() → FileResponse. """ log.info( - "GET /device/event/%d/blastware_file port=%s host=%s", - index, port, host, + "GET /device/event/%d/blastware_file port=%s host=%s force=%s", + index, port, host, force, ) + # `force` always re-downloads from the device. This endpoint already + # never short-circuits via cache, so `force` is reserved for parity with + # the other live endpoints and to suppress the post-download persist + # (see end of handler) when the caller wants a fetch-only escape hatch. 
try: def _do(): diff --git a/sfm/sfm_webapp.html b/sfm/sfm_webapp.html index a763b39..63f15b8 100644 --- a/sfm/sfm_webapp.html +++ b/sfm/sfm_webapp.html @@ -609,6 +609,36 @@ .section-btn:hover { color: var(--text); } .section-btn.active { background: var(--blue); color: #fff; } + /* ── Force-refresh toggle ── */ + .force-toggle { + display: flex; + align-items: center; + gap: 6px; + padding: 4px 10px; + border: 1px solid var(--border); + border-radius: 6px; + background: var(--bg); + cursor: pointer; + font-size: 11px; + font-weight: 600; + color: var(--text-dim); + user-select: none; + white-space: nowrap; + transition: background 0.12s, color 0.12s, border-color 0.12s; + } + .force-toggle input { margin: 0; cursor: pointer; } + .force-toggle:hover { color: var(--text); } + .force-toggle.active { + background: rgba(248, 81, 73, 0.18); + border-color: #f85149; + color: #ff7b72; + } + .force-toggle .ft-dot { + width: 6px; height: 6px; border-radius: 50%; + background: var(--text-mute); + } + .force-toggle.active .ft-dot { background: #f85149; box-shadow: 0 0 6px #f85149; } + /* ── Section containers ── */ #section-live, #section-db { display: flex; @@ -654,6 +684,13 @@ +
+