From c7e7d177e6f422620b6a599b732a46270dcfcc96 Mon Sep 17 00:00:00 2001 From: Brian Harrison Date: Sat, 11 Apr 2026 01:14:50 -0400 Subject: [PATCH] feat: overhaul ACH server with key-based state, erase support, and reset detection MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit State format (ach_state.json): - Replace event_count with downloaded_keys (set of hex strings) + max_downloaded_key - Key-based tracking correctly handles delete-then-re-record: after device erase the count drops to 0, but new events have new (or recycled) keys Browse pre-check: - list_event_keys() walk before get_events() to bail early when nothing is new - get_events() called with skip_waveform_for_keys= for already-seen keys, so repeat call-homes only download waveforms for genuinely new events --clear-after-download flag: - After saving new events, calls client.delete_all_events() (0xA3→0x1C→0x06→0xA2) - On success: resets downloaded_keys=[] and max_downloaded_key="00000000" so the next session starts fresh (device counter resets to 0x01110000 after erase) Post-erase key-reuse detection: - Device counter resets to 0x01110000 after any erase; new events reuse old keys - If max(device_keys) < max_downloaded_key, the device was wiped externally (Blastware, manual) — seen_keys is discarded and all device keys treated as new Co-Authored-By: Claude Sonnet 4.6 --- bridges/ach_server.py | 231 +++++++++++++++++++++++++++++++++--------- 1 file changed, 181 insertions(+), 50 deletions(-) diff --git a/bridges/ach_server.py b/bridges/ach_server.py index ddbb053..c9e0302 100644 --- a/bridges/ach_server.py +++ b/bridges/ach_server.py @@ -71,9 +71,27 @@ from minimateplus.models import DeviceInfo, Event log = logging.getLogger("ach_server") -# ── Per-unit state (high-water mark) ────────────────────────────────────────── +# ── Per-unit state (downloaded-key set) ─────────────────────────────────────── # Persisted as /ach_state.json -# Format: { "BE11529": { "event_count": 5, "last_seen": "2026-04-09T..." }, ... } +# Format: +# { +# "BE11529": { +# "downloaded_keys": ["01110000", "0111245a"], # hex keys already on disk +# "max_downloaded_key": "0111245a", # highest key ever seen +# "last_seen": "2026-04-11T01:04:36" +# } +# } +# +# Key-based deduplication works well within a single "key generation" (between +# erases). After the device memory is erased the event counter resets to +# 0x01110000, so the first new event has the SAME key as the very first event +# we ever downloaded. We detect this situation with max_downloaded_key: +# +# if max(current_device_keys) < max_downloaded_key +# → device was wiped and keys have restarted → treat all device keys as new +# +# After our own erase (--clear-after-download) we also explicitly clear +# downloaded_keys and max_downloaded_key so the next session starts fresh. _state_lock = threading.Lock() @@ -103,10 +121,10 @@ class AchSession: standard connect → get_device_info → get_events sequence. State tracking (ach_state.json in output_dir): - On each successful download we record how many events the unit had. - On the next call-home we compare: if count hasn't grown, there's nothing - new and we close cleanly without downloading. If it has grown, we - download all events up to the new count and save only the new ones. + On each successful download we record the SET of event keys downloaded. + On the next call-home we compare: if all device keys are already in the + set, there's nothing new. If any key is new (including after the device + was wiped and re-recorded), we download and save only those events. """ def __init__( @@ -118,14 +136,16 @@ class AchSession: events_only: bool, max_events: Optional[int], state_path: Path, + clear_after_download: bool = False, ) -> None: - self.sock = sock - self.peer = peer - self.output_dir = output_dir - self.timeout = timeout - self.events_only = events_only - self.max_events = max_events - self.state_path = state_path + self.sock = sock + self.peer = peer + self.output_dir = output_dir + self.timeout = timeout + self.events_only = events_only + self.max_events = max_events + self.state_path = state_path + self.clear_after_download = clear_after_download def run(self) -> None: ts = datetime.datetime.now().strftime("%Y%m%d_%H%M%S") @@ -221,18 +241,22 @@ class AchSession: else: log.info("Step 2/3: skipping device info (--events-only)") - # ── Step 3: check for new events via high-water mark ─────────────── + # ── Step 3: check for new events by comparing key sets ──────────── log.info("Step 3/3: checking for new events") state = _load_state(self.state_path) unit_key = serial or self.peer # fall back to IP if no serial - last_count = state.get(unit_key, {}).get("event_count", 0) + unit_state = state.get(unit_key, {}) + seen_keys: set[str] = set(unit_state.get("downloaded_keys", [])) + # Highest event key ever downloaded from this unit (hex string, 8 chars). + # Used to detect post-erase key reuse — see comment block above. + max_seen_key: str = unit_state.get("max_downloaded_key", "00000000") # Use the event count already read from the event index during connect(). # This is fast (no extra round-trips) and confirmed accurate (matches LCD). # Falls back to count_events() only if connect() wasn't called. if device_info is not None: - current_count = device_info.event_count + current_count = device_info.event_count or 0 else: try: current_count = client.count_events() @@ -240,26 +264,70 @@ class AchSession: log.error(" [FAIL] count_events failed: %s", exc) return - log.info(" Unit has %d stored event(s); last downloaded count: %d", - current_count, last_count) + log.info(" Unit has %d stored event(s); %d key(s) previously downloaded", + current_count, len(seen_keys)) - if current_count <= last_count: - log.info(" [OK] No new events since last call-home -- nothing to download") - log.info("Session complete (no new events) -> %s", session_dir) + if current_count == 0: + log.info(" [OK] No events on device -- nothing to download") + log.info("Session complete (no events) -> %s", session_dir) return - new_event_count = current_count - last_count - log.info(" %d new event(s) to download", new_event_count) + # Fast pre-check: walk the event index (browse-mode, no 5A) to get + # the current key list, then bail early if everything is already seen. + # This avoids calling get_events() at all when there's nothing new. + log.info(" Checking device key list (browse walk, no waveform download)...") + try: + device_keys = client.list_event_keys() + except Exception as exc: + log.warning(" list_event_keys failed: %s -- falling back to full download", exc) + device_keys = None - # Download all events up to current_count, apply max_events cap. - # We re-download old events too (get_events always starts from 0), - # but we only SAVE the new ones (the last new_event_count of the list). + if device_keys is not None: + # ── Post-erase detection ────────────────────────────────────── + # After the device memory is erased, new events start from key + # 01110000 again — the same keys we already downloaded. Detect + # this by comparing the device's current highest key against the + # historical maximum. If the device has rolled back below our + # high-water mark, its counter was reset and we must treat all + # its keys as new, regardless of what seen_keys contains. + if device_keys and max_seen_key != "00000000": + max_device_key = max(device_keys) # lexicographic; safe because + # keys share the same 4-char prefix + if max_device_key < max_seen_key: + log.info( + " Post-erase reset detected: " + "device max key %s < historical max %s " + "-- treating all device keys as new", + max_device_key, max_seen_key, + ) + seen_keys = set() # discard stale dedup info for this session + + new_key_set = set(device_keys) - seen_keys + log.info(" Device has %d key(s): %d new, %d already seen", + len(device_keys), len(new_key_set), len(device_keys) - len(new_key_set)) + if not new_key_set: + log.info(" [OK] All events already downloaded -- nothing to do") + # Refresh state timestamp; preserve max_seen_key unchanged. + state[unit_key] = { + "downloaded_keys": sorted(seen_keys | set(device_keys)), + "max_downloaded_key": max_seen_key, + "last_seen": datetime.datetime.now().isoformat(), + "serial": serial, + "peer": self.peer, + } + _save_state(self.state_path, state) + log.info("Session complete (no new events) -> %s", session_dir) + return + else: + new_key_set = None # unknown; proceed with full download + + # Apply max_events cap stop_idx = current_count - 1 if self.max_events is not None: stop_idx = min(stop_idx, self.max_events - 1) if self.max_events < current_count: log.warning( - " max_events=%d cap: will download events 0–%d only " + " max_events=%d cap: will download events 0-%d only " "(unit has %d total)", self.max_events, stop_idx, current_count, ) @@ -268,36 +336,86 @@ class AchSession: all_events = client.get_events( full_waveform=True, stop_after_index=stop_idx, + skip_waveform_for_keys=seen_keys if seen_keys else None, ) - # Only the events beyond last_count are genuinely new - new_events = all_events[last_count:] - log.info(" [OK] Downloaded %d total event(s), %d new", - len(all_events), len(new_events)) - _save_json(session_dir / "events.json", [_event_to_dict(e) for e in new_events]) - if last_count > 0 and len(all_events) > len(new_events): - log.info(" (skipped %d already-seen event(s))", last_count) + # Filter to events whose keys we haven't saved before. + new_events = [ + e for e in all_events + if e._waveform_key is None + or e._waveform_key.hex() not in seen_keys + ] + skipped = len(all_events) - len(new_events) - for i, ev in enumerate(new_events): - pv = ev.peak_values - pi = ev.project_info + log.info(" [OK] Downloaded %d event(s): %d new, %d skipped (already seen)", + len(all_events), len(new_events), skipped) + if skipped: + log.info(" (skipped %d already-downloaded event(s))", skipped) + + if new_events: + _save_json(session_dir / "events.json", [_event_to_dict(e) for e in new_events]) + + for ev in new_events: + pv = ev.peak_values + pi = ev.project_info + key_hex = ev._waveform_key.hex() if ev._waveform_key else "????????" + log.info( + " NEW [%s] %s Tran=%.4f Vert=%.4f Long=%.4f VS=%.4f project=%r", + key_hex, + str(ev.timestamp) if ev.timestamp else "?", + pv.tran if pv else 0, + pv.vert if pv else 0, + pv.long if pv else 0, + pv.peak_vector_sum if pv else 0, + pi.project if pi else "", + ) + else: + log.info(" [OK] No new events since last call-home -- nothing to save") + + # ── Optional: erase device memory after successful download ──── + erased_successfully = False + if self.clear_after_download and new_events: + log.info(" Clearing device memory (--clear-after-download)...") + try: + client.delete_all_events() + log.info(" [OK] Device memory cleared") + erased_successfully = True + except Exception as exc: + log.error( + " [WARN] Event deletion failed: %s -- events NOT cleared", + exc, + ) + + # ── Update persistent state ─────────────────────────────────── + current_keys = [ + e._waveform_key.hex() + for e in all_events + if e._waveform_key is not None + ] + + if erased_successfully: + # Device memory is clear. Reset downloaded_keys and the + # high-water mark so the next call-home starts fresh and + # doesn't mis-identify the recycled key 01110000 as "seen". + updated_keys = [] + new_max_key = "00000000" log.info( - " NEW Event %d: %s Tran=%.4f Vert=%.4f Long=%.4f VS=%.4f project=%r", - last_count + i, - str(ev.timestamp) if ev.timestamp else "?", - pv.tran if pv else 0, - pv.vert if pv else 0, - pv.long if pv else 0, - pv.peak_vector_sum if pv else 0, - pi.project if pi else "", + " State reset after erase -- next session will download " + "from key 0 (device counter resets after erase)" ) + else: + # Normal (no erase): union of previously-seen + all keys on + # device now. Includes already-seen survivors so we never + # re-download them if the device somehow keeps old records. + updated_keys = sorted(set(seen_keys) | set(current_keys)) + new_max_key = updated_keys[-1] if updated_keys else max_seen_key - # Update high-water mark state[unit_key] = { - "event_count": current_count, - "last_seen": datetime.datetime.now().isoformat(), - "serial": serial, - "peer": self.peer, + "downloaded_keys": updated_keys, + "max_downloaded_key": new_max_key, + "last_seen": datetime.datetime.now().isoformat(), + "serial": serial, + "peer": self.peer, } _save_state(self.state_path, state) @@ -395,6 +513,7 @@ def serve(args: argparse.Namespace) -> None: print(f" Output: {output_dir.resolve()}/ach_inbound_/") print(f" State file: {state_path}") print(f" Max events per session: {max_ev if max_ev else 'unlimited'}") + print(f" Clear device after download: {'YES' if args.clear_after_download else 'no'}") print(f"{'='*60}") print(f"\n Point your test unit's ACEmanager call-home settings to:") print(f" Remote Host: ") @@ -431,6 +550,7 @@ def serve(args: argparse.Namespace) -> None: events_only=args.events_only, max_events=max_ev, state_path=state_path, + clear_after_download=args.clear_after_download, ) t = threading.Thread(target=session.run, daemon=True, name=f"ach-{peer}") t.start() @@ -494,6 +614,17 @@ def parse_args() -> argparse.Namespace: "If not specified, all IPs are accepted (not recommended for public servers)." ), ) + p.add_argument( + "--clear-after-download", + action="store_true", + default=False, + help=( + "After successfully downloading new events, erase all events from the " + "device memory (SUB 0xA3 → 0x1C → 0x06 → 0xA2 sequence, confirmed from " + "4-11-26 MITM capture). Only fires when at least one new event was saved. " + "This mirrors the standard Blastware ACH workflow." + ), + ) p.add_argument( "--verbose", "-v", action="store_true",