feat: overhaul ACH server with key-based state, erase support, and reset detection

State format (ach_state.json):
- Replace event_count with downloaded_keys (set of hex strings) + max_downloaded_key
- Key-based tracking correctly handles delete-then-re-record: after device erase the
  count drops to 0, but new events have new (or recycled) keys

Browse pre-check:
- list_event_keys() walk before get_events() to bail early when nothing is new
- get_events() called with skip_waveform_for_keys= for already-seen keys, so repeat
  call-homes only download waveforms for genuinely new events

--clear-after-download flag:
- After saving new events, calls client.delete_all_events() (0xA3→0x1C→0x06→0xA2)
- On success: resets downloaded_keys=[] and max_downloaded_key="00000000" so the
  next session starts fresh (device counter resets to 0x01110000 after erase)

Post-erase key-reuse detection:
- Device counter resets to 0x01110000 after any erase; new events reuse old keys
- If max(device_keys) < max_downloaded_key, the device was wiped externally
  (Blastware, manual) — seen_keys is discarded and all device keys treated as new

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
This commit is contained in:
2026-04-11 01:14:50 -04:00
parent e712d68505
commit 09788b931a
+162 -31
View File
@@ -71,9 +71,27 @@ from minimateplus.models import DeviceInfo, Event
log = logging.getLogger("ach_server")
# ── Per-unit state (high-water mark) ──────────────────────────────────────────
# ── Per-unit state (downloaded-key set) ───────────────────────────────────────
# Persisted as <output_dir>/ach_state.json
# Format: { "BE11529": { "event_count": 5, "last_seen": "2026-04-09T..." }, ... }
# Format:
# {
# "BE11529": {
# "downloaded_keys": ["01110000", "0111245a"], # hex keys already on disk
# "max_downloaded_key": "0111245a", # highest key ever seen
# "last_seen": "2026-04-11T01:04:36"
# }
# }
#
# Key-based deduplication works well within a single "key generation" (between
# erases). After the device memory is erased the event counter resets to
# 0x01110000, so the first new event has the SAME key as the very first event
# we ever downloaded. We detect this situation with max_downloaded_key:
#
# if max(current_device_keys) < max_downloaded_key
# → device was wiped and keys have restarted → treat all device keys as new
#
# After our own erase (--clear-after-download) we also explicitly clear
# downloaded_keys and max_downloaded_key so the next session starts fresh.
_state_lock = threading.Lock()
@@ -103,10 +121,10 @@ class AchSession:
standard connect → get_device_info → get_events sequence.
State tracking (ach_state.json in output_dir):
On each successful download we record how many events the unit had.
On the next call-home we compare: if count hasn't grown, there's nothing
new and we close cleanly without downloading. If it has grown, we
download all events up to the new count and save only the new ones.
On each successful download we record the SET of event keys downloaded.
On the next call-home we compare: if all device keys are already in the
set, there's nothing new. If any key is new (including after the device
was wiped and re-recorded), we download and save only those events.
"""
def __init__(
@@ -118,6 +136,7 @@ class AchSession:
events_only: bool,
max_events: Optional[int],
state_path: Path,
clear_after_download: bool = False,
) -> None:
self.sock = sock
self.peer = peer
@@ -126,6 +145,7 @@ class AchSession:
self.events_only = events_only
self.max_events = max_events
self.state_path = state_path
self.clear_after_download = clear_after_download
def run(self) -> None:
ts = datetime.datetime.now().strftime("%Y%m%d_%H%M%S")
@@ -221,18 +241,22 @@ class AchSession:
else:
log.info("Step 2/3: skipping device info (--events-only)")
# ── Step 3: check for new events via high-water mark ───────────────
# ── Step 3: check for new events by comparing key sets ────────────
log.info("Step 3/3: checking for new events")
state = _load_state(self.state_path)
unit_key = serial or self.peer # fall back to IP if no serial
last_count = state.get(unit_key, {}).get("event_count", 0)
unit_state = state.get(unit_key, {})
seen_keys: set[str] = set(unit_state.get("downloaded_keys", []))
# Highest event key ever downloaded from this unit (hex string, 8 chars).
# Used to detect post-erase key reuse — see comment block above.
max_seen_key: str = unit_state.get("max_downloaded_key", "00000000")
# Use the event count already read from the event index during connect().
# This is fast (no extra round-trips) and confirmed accurate (matches LCD).
# Falls back to count_events() only if connect() wasn't called.
if device_info is not None:
current_count = device_info.event_count
current_count = device_info.event_count or 0
else:
try:
current_count = client.count_events()
@@ -240,26 +264,70 @@ class AchSession:
log.error(" [FAIL] count_events failed: %s", exc)
return
log.info(" Unit has %d stored event(s); last downloaded count: %d",
current_count, last_count)
log.info(" Unit has %d stored event(s); %d key(s) previously downloaded",
current_count, len(seen_keys))
if current_count <= last_count:
log.info(" [OK] No new events since last call-home -- nothing to download")
log.info("Session complete (no new events) -> %s", session_dir)
if current_count == 0:
log.info(" [OK] No events on device -- nothing to download")
log.info("Session complete (no events) -> %s", session_dir)
return
new_event_count = current_count - last_count
log.info(" %d new event(s) to download", new_event_count)
# Fast pre-check: walk the event index (browse-mode, no 5A) to get
# the current key list, then bail early if everything is already seen.
# This avoids calling get_events() at all when there's nothing new.
log.info(" Checking device key list (browse walk, no waveform download)...")
try:
device_keys = client.list_event_keys()
except Exception as exc:
log.warning(" list_event_keys failed: %s -- falling back to full download", exc)
device_keys = None
# Download all events up to current_count, apply max_events cap.
# We re-download old events too (get_events always starts from 0),
# but we only SAVE the new ones (the last new_event_count of the list).
if device_keys is not None:
# ── Post-erase detection ──────────────────────────────────────
# After the device memory is erased, new events start from key
# 01110000 again — the same keys we already downloaded. Detect
# this by comparing the device's current highest key against the
# historical maximum. If the device has rolled back below our
# high-water mark, its counter was reset and we must treat all
# its keys as new, regardless of what seen_keys contains.
if device_keys and max_seen_key != "00000000":
max_device_key = max(device_keys) # lexicographic; safe because
# keys share the same 4-char prefix
if max_device_key < max_seen_key:
log.info(
" Post-erase reset detected: "
"device max key %s < historical max %s "
"-- treating all device keys as new",
max_device_key, max_seen_key,
)
seen_keys = set() # discard stale dedup info for this session
new_key_set = set(device_keys) - seen_keys
log.info(" Device has %d key(s): %d new, %d already seen",
len(device_keys), len(new_key_set), len(device_keys) - len(new_key_set))
if not new_key_set:
log.info(" [OK] All events already downloaded -- nothing to do")
# Refresh state timestamp; preserve max_seen_key unchanged.
state[unit_key] = {
"downloaded_keys": sorted(seen_keys | set(device_keys)),
"max_downloaded_key": max_seen_key,
"last_seen": datetime.datetime.now().isoformat(),
"serial": serial,
"peer": self.peer,
}
_save_state(self.state_path, state)
log.info("Session complete (no new events) -> %s", session_dir)
return
else:
new_key_set = None # unknown; proceed with full download
# Apply max_events cap
stop_idx = current_count - 1
if self.max_events is not None:
stop_idx = min(stop_idx, self.max_events - 1)
if self.max_events < current_count:
log.warning(
" max_events=%d cap: will download events 0%d only "
" max_events=%d cap: will download events 0-%d only "
"(unit has %d total)",
self.max_events, stop_idx, current_count,
)
@@ -268,22 +336,32 @@ class AchSession:
all_events = client.get_events(
full_waveform=True,
stop_after_index=stop_idx,
skip_waveform_for_keys=seen_keys if seen_keys else None,
)
# Only the events beyond last_count are genuinely new
new_events = all_events[last_count:]
log.info(" [OK] Downloaded %d total event(s), %d new",
len(all_events), len(new_events))
# Filter to events whose keys we haven't saved before.
new_events = [
e for e in all_events
if e._waveform_key is None
or e._waveform_key.hex() not in seen_keys
]
skipped = len(all_events) - len(new_events)
log.info(" [OK] Downloaded %d event(s): %d new, %d skipped (already seen)",
len(all_events), len(new_events), skipped)
if skipped:
log.info(" (skipped %d already-downloaded event(s))", skipped)
if new_events:
_save_json(session_dir / "events.json", [_event_to_dict(e) for e in new_events])
if last_count > 0 and len(all_events) > len(new_events):
log.info(" (skipped %d already-seen event(s))", last_count)
for i, ev in enumerate(new_events):
for ev in new_events:
pv = ev.peak_values
pi = ev.project_info
key_hex = ev._waveform_key.hex() if ev._waveform_key else "????????"
log.info(
" NEW Event %d: %s Tran=%.4f Vert=%.4f Long=%.4f VS=%.4f project=%r",
last_count + i,
" NEW [%s] %s Tran=%.4f Vert=%.4f Long=%.4f VS=%.4f project=%r",
key_hex,
str(ev.timestamp) if ev.timestamp else "?",
pv.tran if pv else 0,
pv.vert if pv else 0,
@@ -291,10 +369,50 @@ class AchSession:
pv.peak_vector_sum if pv else 0,
pi.project if pi else "",
)
else:
log.info(" [OK] No new events since last call-home -- nothing to save")
# ── Optional: erase device memory after successful download ────
erased_successfully = False
if self.clear_after_download and new_events:
log.info(" Clearing device memory (--clear-after-download)...")
try:
client.delete_all_events()
log.info(" [OK] Device memory cleared")
erased_successfully = True
except Exception as exc:
log.error(
" [WARN] Event deletion failed: %s -- events NOT cleared",
exc,
)
# ── Update persistent state ───────────────────────────────────
current_keys = [
e._waveform_key.hex()
for e in all_events
if e._waveform_key is not None
]
if erased_successfully:
# Device memory is clear. Reset downloaded_keys and the
# high-water mark so the next call-home starts fresh and
# doesn't mis-identify the recycled key 01110000 as "seen".
updated_keys = []
new_max_key = "00000000"
log.info(
" State reset after erase -- next session will download "
"from key 0 (device counter resets after erase)"
)
else:
# Normal (no erase): union of previously-seen + all keys on
# device now. Includes already-seen survivors so we never
# re-download them if the device somehow keeps old records.
updated_keys = sorted(set(seen_keys) | set(current_keys))
new_max_key = updated_keys[-1] if updated_keys else max_seen_key
# Update high-water mark
state[unit_key] = {
"event_count": current_count,
"downloaded_keys": updated_keys,
"max_downloaded_key": new_max_key,
"last_seen": datetime.datetime.now().isoformat(),
"serial": serial,
"peer": self.peer,
@@ -395,6 +513,7 @@ def serve(args: argparse.Namespace) -> None:
print(f" Output: {output_dir.resolve()}/ach_inbound_<timestamp>/")
print(f" State file: {state_path}")
print(f" Max events per session: {max_ev if max_ev else 'unlimited'}")
print(f" Clear device after download: {'YES' if args.clear_after_download else 'no'}")
print(f"{'='*60}")
print(f"\n Point your test unit's ACEmanager call-home settings to:")
print(f" Remote Host: <this machine's LAN IP>")
@@ -431,6 +550,7 @@ def serve(args: argparse.Namespace) -> None:
events_only=args.events_only,
max_events=max_ev,
state_path=state_path,
clear_after_download=args.clear_after_download,
)
t = threading.Thread(target=session.run, daemon=True, name=f"ach-{peer}")
t.start()
@@ -494,6 +614,17 @@ def parse_args() -> argparse.Namespace:
"If not specified, all IPs are accepted (not recommended for public servers)."
),
)
p.add_argument(
"--clear-after-download",
action="store_true",
default=False,
help=(
"After successfully downloading new events, erase all events from the "
"device memory (SUB 0xA3 → 0x1C → 0x06 → 0xA2 sequence, confirmed from "
"4-11-26 MITM capture). Only fires when at least one new event was saved. "
"This mirrors the standard Blastware ACH workflow."
),
)
p.add_argument(
"--verbose", "-v",
action="store_true",