feat: overhaul ACH server with key-based state, erase support, and reset detection
State format (ach_state.json): - Replace event_count with downloaded_keys (set of hex strings) + max_downloaded_key - Key-based tracking correctly handles delete-then-re-record: after device erase the count drops to 0, but new events have new (or recycled) keys Browse pre-check: - list_event_keys() walk before get_events() to bail early when nothing is new - get_events() called with skip_waveform_for_keys= for already-seen keys, so repeat call-homes only download waveforms for genuinely new events --clear-after-download flag: - After saving new events, calls client.delete_all_events() (0xA3→0x1C→0x06→0xA2) - On success: resets downloaded_keys=[] and max_downloaded_key="00000000" so the next session starts fresh (device counter resets to 0x01110000 after erase) Post-erase key-reuse detection: - Device counter resets to 0x01110000 after any erase; new events reuse old keys - If max(device_keys) < max_downloaded_key, the device was wiped externally (Blastware, manual) — seen_keys is discarded and all device keys treated as new Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
This commit is contained in:
+181
-50
@@ -71,9 +71,27 @@ from minimateplus.models import DeviceInfo, Event
|
||||
|
||||
log = logging.getLogger("ach_server")
|
||||
|
||||
# ── Per-unit state (high-water mark) ──────────────────────────────────────────
|
||||
# ── Per-unit state (downloaded-key set) ───────────────────────────────────────
|
||||
# Persisted as <output_dir>/ach_state.json
|
||||
# Format: { "BE11529": { "event_count": 5, "last_seen": "2026-04-09T..." }, ... }
|
||||
# Format:
|
||||
# {
|
||||
# "BE11529": {
|
||||
# "downloaded_keys": ["01110000", "0111245a"], # hex keys already on disk
|
||||
# "max_downloaded_key": "0111245a", # highest key ever seen
|
||||
# "last_seen": "2026-04-11T01:04:36"
|
||||
# }
|
||||
# }
|
||||
#
|
||||
# Key-based deduplication works well within a single "key generation" (between
|
||||
# erases). After the device memory is erased the event counter resets to
|
||||
# 0x01110000, so the first new event has the SAME key as the very first event
|
||||
# we ever downloaded. We detect this situation with max_downloaded_key:
|
||||
#
|
||||
# if max(current_device_keys) < max_downloaded_key
|
||||
# → device was wiped and keys have restarted → treat all device keys as new
|
||||
#
|
||||
# After our own erase (--clear-after-download) we also explicitly clear
|
||||
# downloaded_keys and max_downloaded_key so the next session starts fresh.
|
||||
|
||||
_state_lock = threading.Lock()
|
||||
|
||||
@@ -103,10 +121,10 @@ class AchSession:
|
||||
standard connect → get_device_info → get_events sequence.
|
||||
|
||||
State tracking (ach_state.json in output_dir):
|
||||
On each successful download we record how many events the unit had.
|
||||
On the next call-home we compare: if count hasn't grown, there's nothing
|
||||
new and we close cleanly without downloading. If it has grown, we
|
||||
download all events up to the new count and save only the new ones.
|
||||
On each successful download we record the SET of event keys downloaded.
|
||||
On the next call-home we compare: if all device keys are already in the
|
||||
set, there's nothing new. If any key is new (including after the device
|
||||
was wiped and re-recorded), we download and save only those events.
|
||||
"""
|
||||
|
||||
def __init__(
|
||||
@@ -118,14 +136,16 @@ class AchSession:
|
||||
events_only: bool,
|
||||
max_events: Optional[int],
|
||||
state_path: Path,
|
||||
clear_after_download: bool = False,
|
||||
) -> None:
|
||||
self.sock = sock
|
||||
self.peer = peer
|
||||
self.output_dir = output_dir
|
||||
self.timeout = timeout
|
||||
self.events_only = events_only
|
||||
self.max_events = max_events
|
||||
self.state_path = state_path
|
||||
self.sock = sock
|
||||
self.peer = peer
|
||||
self.output_dir = output_dir
|
||||
self.timeout = timeout
|
||||
self.events_only = events_only
|
||||
self.max_events = max_events
|
||||
self.state_path = state_path
|
||||
self.clear_after_download = clear_after_download
|
||||
|
||||
def run(self) -> None:
|
||||
ts = datetime.datetime.now().strftime("%Y%m%d_%H%M%S")
|
||||
@@ -221,18 +241,22 @@ class AchSession:
|
||||
else:
|
||||
log.info("Step 2/3: skipping device info (--events-only)")
|
||||
|
||||
# ── Step 3: check for new events via high-water mark ───────────────
|
||||
# ── Step 3: check for new events by comparing key sets ────────────
|
||||
log.info("Step 3/3: checking for new events")
|
||||
|
||||
state = _load_state(self.state_path)
|
||||
unit_key = serial or self.peer # fall back to IP if no serial
|
||||
last_count = state.get(unit_key, {}).get("event_count", 0)
|
||||
unit_state = state.get(unit_key, {})
|
||||
seen_keys: set[str] = set(unit_state.get("downloaded_keys", []))
|
||||
# Highest event key ever downloaded from this unit (hex string, 8 chars).
|
||||
# Used to detect post-erase key reuse — see comment block above.
|
||||
max_seen_key: str = unit_state.get("max_downloaded_key", "00000000")
|
||||
|
||||
# Use the event count already read from the event index during connect().
|
||||
# This is fast (no extra round-trips) and confirmed accurate (matches LCD).
|
||||
# Falls back to count_events() only if connect() wasn't called.
|
||||
if device_info is not None:
|
||||
current_count = device_info.event_count
|
||||
current_count = device_info.event_count or 0
|
||||
else:
|
||||
try:
|
||||
current_count = client.count_events()
|
||||
@@ -240,26 +264,70 @@ class AchSession:
|
||||
log.error(" [FAIL] count_events failed: %s", exc)
|
||||
return
|
||||
|
||||
log.info(" Unit has %d stored event(s); last downloaded count: %d",
|
||||
current_count, last_count)
|
||||
log.info(" Unit has %d stored event(s); %d key(s) previously downloaded",
|
||||
current_count, len(seen_keys))
|
||||
|
||||
if current_count <= last_count:
|
||||
log.info(" [OK] No new events since last call-home -- nothing to download")
|
||||
log.info("Session complete (no new events) -> %s", session_dir)
|
||||
if current_count == 0:
|
||||
log.info(" [OK] No events on device -- nothing to download")
|
||||
log.info("Session complete (no events) -> %s", session_dir)
|
||||
return
|
||||
|
||||
new_event_count = current_count - last_count
|
||||
log.info(" %d new event(s) to download", new_event_count)
|
||||
# Fast pre-check: walk the event index (browse-mode, no 5A) to get
|
||||
# the current key list, then bail early if everything is already seen.
|
||||
# This avoids calling get_events() at all when there's nothing new.
|
||||
log.info(" Checking device key list (browse walk, no waveform download)...")
|
||||
try:
|
||||
device_keys = client.list_event_keys()
|
||||
except Exception as exc:
|
||||
log.warning(" list_event_keys failed: %s -- falling back to full download", exc)
|
||||
device_keys = None
|
||||
|
||||
# Download all events up to current_count, apply max_events cap.
|
||||
# We re-download old events too (get_events always starts from 0),
|
||||
# but we only SAVE the new ones (the last new_event_count of the list).
|
||||
if device_keys is not None:
|
||||
# ── Post-erase detection ──────────────────────────────────────
|
||||
# After the device memory is erased, new events start from key
|
||||
# 01110000 again — the same keys we already downloaded. Detect
|
||||
# this by comparing the device's current highest key against the
|
||||
# historical maximum. If the device has rolled back below our
|
||||
# high-water mark, its counter was reset and we must treat all
|
||||
# its keys as new, regardless of what seen_keys contains.
|
||||
if device_keys and max_seen_key != "00000000":
|
||||
max_device_key = max(device_keys) # lexicographic; safe because
|
||||
# keys share the same 4-char prefix
|
||||
if max_device_key < max_seen_key:
|
||||
log.info(
|
||||
" Post-erase reset detected: "
|
||||
"device max key %s < historical max %s "
|
||||
"-- treating all device keys as new",
|
||||
max_device_key, max_seen_key,
|
||||
)
|
||||
seen_keys = set() # discard stale dedup info for this session
|
||||
|
||||
new_key_set = set(device_keys) - seen_keys
|
||||
log.info(" Device has %d key(s): %d new, %d already seen",
|
||||
len(device_keys), len(new_key_set), len(device_keys) - len(new_key_set))
|
||||
if not new_key_set:
|
||||
log.info(" [OK] All events already downloaded -- nothing to do")
|
||||
# Refresh state timestamp; preserve max_seen_key unchanged.
|
||||
state[unit_key] = {
|
||||
"downloaded_keys": sorted(seen_keys | set(device_keys)),
|
||||
"max_downloaded_key": max_seen_key,
|
||||
"last_seen": datetime.datetime.now().isoformat(),
|
||||
"serial": serial,
|
||||
"peer": self.peer,
|
||||
}
|
||||
_save_state(self.state_path, state)
|
||||
log.info("Session complete (no new events) -> %s", session_dir)
|
||||
return
|
||||
else:
|
||||
new_key_set = None # unknown; proceed with full download
|
||||
|
||||
# Apply max_events cap
|
||||
stop_idx = current_count - 1
|
||||
if self.max_events is not None:
|
||||
stop_idx = min(stop_idx, self.max_events - 1)
|
||||
if self.max_events < current_count:
|
||||
log.warning(
|
||||
" max_events=%d cap: will download events 0–%d only "
|
||||
" max_events=%d cap: will download events 0-%d only "
|
||||
"(unit has %d total)",
|
||||
self.max_events, stop_idx, current_count,
|
||||
)
|
||||
@@ -268,36 +336,86 @@ class AchSession:
|
||||
all_events = client.get_events(
|
||||
full_waveform=True,
|
||||
stop_after_index=stop_idx,
|
||||
skip_waveform_for_keys=seen_keys if seen_keys else None,
|
||||
)
|
||||
# Only the events beyond last_count are genuinely new
|
||||
new_events = all_events[last_count:]
|
||||
log.info(" [OK] Downloaded %d total event(s), %d new",
|
||||
len(all_events), len(new_events))
|
||||
|
||||
_save_json(session_dir / "events.json", [_event_to_dict(e) for e in new_events])
|
||||
if last_count > 0 and len(all_events) > len(new_events):
|
||||
log.info(" (skipped %d already-seen event(s))", last_count)
|
||||
# Filter to events whose keys we haven't saved before.
|
||||
new_events = [
|
||||
e for e in all_events
|
||||
if e._waveform_key is None
|
||||
or e._waveform_key.hex() not in seen_keys
|
||||
]
|
||||
skipped = len(all_events) - len(new_events)
|
||||
|
||||
for i, ev in enumerate(new_events):
|
||||
pv = ev.peak_values
|
||||
pi = ev.project_info
|
||||
log.info(" [OK] Downloaded %d event(s): %d new, %d skipped (already seen)",
|
||||
len(all_events), len(new_events), skipped)
|
||||
if skipped:
|
||||
log.info(" (skipped %d already-downloaded event(s))", skipped)
|
||||
|
||||
if new_events:
|
||||
_save_json(session_dir / "events.json", [_event_to_dict(e) for e in new_events])
|
||||
|
||||
for ev in new_events:
|
||||
pv = ev.peak_values
|
||||
pi = ev.project_info
|
||||
key_hex = ev._waveform_key.hex() if ev._waveform_key else "????????"
|
||||
log.info(
|
||||
" NEW [%s] %s Tran=%.4f Vert=%.4f Long=%.4f VS=%.4f project=%r",
|
||||
key_hex,
|
||||
str(ev.timestamp) if ev.timestamp else "?",
|
||||
pv.tran if pv else 0,
|
||||
pv.vert if pv else 0,
|
||||
pv.long if pv else 0,
|
||||
pv.peak_vector_sum if pv else 0,
|
||||
pi.project if pi else "",
|
||||
)
|
||||
else:
|
||||
log.info(" [OK] No new events since last call-home -- nothing to save")
|
||||
|
||||
# ── Optional: erase device memory after successful download ────
|
||||
erased_successfully = False
|
||||
if self.clear_after_download and new_events:
|
||||
log.info(" Clearing device memory (--clear-after-download)...")
|
||||
try:
|
||||
client.delete_all_events()
|
||||
log.info(" [OK] Device memory cleared")
|
||||
erased_successfully = True
|
||||
except Exception as exc:
|
||||
log.error(
|
||||
" [WARN] Event deletion failed: %s -- events NOT cleared",
|
||||
exc,
|
||||
)
|
||||
|
||||
# ── Update persistent state ───────────────────────────────────
|
||||
current_keys = [
|
||||
e._waveform_key.hex()
|
||||
for e in all_events
|
||||
if e._waveform_key is not None
|
||||
]
|
||||
|
||||
if erased_successfully:
|
||||
# Device memory is clear. Reset downloaded_keys and the
|
||||
# high-water mark so the next call-home starts fresh and
|
||||
# doesn't mis-identify the recycled key 01110000 as "seen".
|
||||
updated_keys = []
|
||||
new_max_key = "00000000"
|
||||
log.info(
|
||||
" NEW Event %d: %s Tran=%.4f Vert=%.4f Long=%.4f VS=%.4f project=%r",
|
||||
last_count + i,
|
||||
str(ev.timestamp) if ev.timestamp else "?",
|
||||
pv.tran if pv else 0,
|
||||
pv.vert if pv else 0,
|
||||
pv.long if pv else 0,
|
||||
pv.peak_vector_sum if pv else 0,
|
||||
pi.project if pi else "",
|
||||
" State reset after erase -- next session will download "
|
||||
"from key 0 (device counter resets after erase)"
|
||||
)
|
||||
else:
|
||||
# Normal (no erase): union of previously-seen + all keys on
|
||||
# device now. Includes already-seen survivors so we never
|
||||
# re-download them if the device somehow keeps old records.
|
||||
updated_keys = sorted(set(seen_keys) | set(current_keys))
|
||||
new_max_key = updated_keys[-1] if updated_keys else max_seen_key
|
||||
|
||||
# Update high-water mark
|
||||
state[unit_key] = {
|
||||
"event_count": current_count,
|
||||
"last_seen": datetime.datetime.now().isoformat(),
|
||||
"serial": serial,
|
||||
"peer": self.peer,
|
||||
"downloaded_keys": updated_keys,
|
||||
"max_downloaded_key": new_max_key,
|
||||
"last_seen": datetime.datetime.now().isoformat(),
|
||||
"serial": serial,
|
||||
"peer": self.peer,
|
||||
}
|
||||
_save_state(self.state_path, state)
|
||||
|
||||
@@ -395,6 +513,7 @@ def serve(args: argparse.Namespace) -> None:
|
||||
print(f" Output: {output_dir.resolve()}/ach_inbound_<timestamp>/")
|
||||
print(f" State file: {state_path}")
|
||||
print(f" Max events per session: {max_ev if max_ev else 'unlimited'}")
|
||||
print(f" Clear device after download: {'YES' if args.clear_after_download else 'no'}")
|
||||
print(f"{'='*60}")
|
||||
print(f"\n Point your test unit's ACEmanager call-home settings to:")
|
||||
print(f" Remote Host: <this machine's LAN IP>")
|
||||
@@ -431,6 +550,7 @@ def serve(args: argparse.Namespace) -> None:
|
||||
events_only=args.events_only,
|
||||
max_events=max_ev,
|
||||
state_path=state_path,
|
||||
clear_after_download=args.clear_after_download,
|
||||
)
|
||||
t = threading.Thread(target=session.run, daemon=True, name=f"ach-{peer}")
|
||||
t.start()
|
||||
@@ -494,6 +614,17 @@ def parse_args() -> argparse.Namespace:
|
||||
"If not specified, all IPs are accepted (not recommended for public servers)."
|
||||
),
|
||||
)
|
||||
p.add_argument(
|
||||
"--clear-after-download",
|
||||
action="store_true",
|
||||
default=False,
|
||||
help=(
|
||||
"After successfully downloading new events, erase all events from the "
|
||||
"device memory (SUB 0xA3 → 0x1C → 0x06 → 0xA2 sequence, confirmed from "
|
||||
"4-11-26 MITM capture). Only fires when at least one new event was saved. "
|
||||
"This mirrors the standard Blastware ACH workflow."
|
||||
),
|
||||
)
|
||||
p.add_argument(
|
||||
"--verbose", "-v",
|
||||
action="store_true",
|
||||
|
||||
Reference in New Issue
Block a user