#!/usr/bin/env python3 """ ach_server.py — Minimal inbound ACH (Auto Call Home) server for MiniMate Plus. This IS your test server. Run it on any machine on the same network, point a unit's ACEmanager call-home destination at it, and it will speak the full BW protocol to the device: handshake, pull device info, download all events, save everything as JSON. The key thing this script tells you that no amount of packet sniffing can: - Does the device speak first (push) or wait for us to send POLL (pull)? If startup() completes normally → it's pull protocol, same as Blastware. If startup() times out → the device sent something first; check raw_rx.bin. Usage ----- python bridges/ach_server.py [--port 12345] [--output bridges/captures/] Setup ----- 1. Run this script on a machine on your local network. 2. In ACEmanager → Application → ALEOS Application Framework (or equivalent) find the Call Home / ACH settings. Set: Remote Host: Remote Port: 12345 3. Trigger the unit (wait for a vibration event, or use the manual call-home button if your firmware version has one). 4. The unit connects. This script handshakes, downloads all events, and saves a timestamped session directory. Output per session ------------------ bridges/captures/ach_inbound_/ device_info.json — serial number, firmware version, calibration date, etc. events.json — all events: timestamp, PPV per channel, peaks, metadata raw_rx_.bin — raw bytes from the device (S3 side) for Analyzer session_.log — detailed protocol log What to look for ---------------- Push vs pull: Check session_.log. If the first line after "Connected" shows bytes arriving BEFORE the POLL probe was sent, it's push. If POLL gets a clean response, it's pull. Frequency: Look at raw_rx.bin in the Analyzer. SUB 5A (0xA5 responses) carry bulk waveform data — if frequency is sent pre-computed there will be float32 values before the ADC sample blocks. ACH-specific framing: Does the unit send anything extra before the DLE+STX framing starts? raw_rx.bin will show raw bytes including any preamble. """ from __future__ import annotations import argparse import datetime import json import logging import socket import sys import threading from pathlib import Path from typing import Optional sys.path.insert(0, str(Path(__file__).parent.parent)) from minimateplus.transport import SocketTransport from minimateplus.client import MiniMateClient from minimateplus.models import DeviceInfo, Event, MonitorLogEntry from sfm.database import SeismoDb log = logging.getLogger("ach_server") # ── Per-unit state (downloaded-key set) ─────────────────────────────────────── # Persisted as /ach_state.json # Format: # { # "BE11529": { # "downloaded_keys": ["01110000", "0111245a"], # hex keys already on disk # "max_downloaded_key": "0111245a", # highest key ever seen # "last_seen": "2026-04-11T01:04:36" # } # } # # Key-based deduplication works well within a single "key generation" (between # erases). After the device memory is erased the event counter resets to # 0x01110000, so the first new event has the SAME key as the very first event # we ever downloaded. We detect this situation with max_downloaded_key: # # if max(current_device_keys) < max_downloaded_key # → device was wiped and keys have restarted → treat all device keys as new # # After our own erase (--clear-after-download) we also explicitly clear # downloaded_keys and max_downloaded_key so the next session starts fresh. _state_lock = threading.Lock() def _load_state(state_path: Path) -> dict: if state_path.exists(): try: with open(state_path) as f: return json.load(f) except Exception: pass return {} def _save_state(state_path: Path, state: dict) -> None: with _state_lock: with open(state_path, "w") as f: json.dump(state, f, indent=2) # ── Per-session handler ──────────────────────────────────────────────────────── class AchSession: """ Handles one inbound unit connection in its own thread. Wraps the socket in a SocketTransport → MiniMateClient, then runs the standard connect → get_device_info → get_events sequence. State tracking (ach_state.json in output_dir): On each successful download we record the SET of event keys downloaded. On the next call-home we compare: if all device keys are already in the set, there's nothing new. If any key is new (including after the device was wiped and re-recorded), we download and save only those events. """ def __init__( self, sock: socket.socket, peer: str, output_dir: Path, timeout: float, events_only: bool, max_events: Optional[int], state_path: Path, db: "SeismoDb", clear_after_download: bool = False, restart_monitoring: bool = False, ) -> None: self.sock = sock self.peer = peer self.output_dir = output_dir self.timeout = timeout self.events_only = events_only self.max_events = max_events self.state_path = state_path self.db = db self.clear_after_download = clear_after_download self.restart_monitoring = restart_monitoring def run(self) -> None: ts = datetime.datetime.now().strftime("%Y%m%d_%H%M%S") # Session dir and file handler are created lazily — only after startup # succeeds. This prevents internet scanners and dropped connections from # littering the output directory with empty session folders. try: self._run_inner(ts) except Exception as exc: log.error("Session failed (%s): %s", self.peer, exc, exc_info=True) finally: try: self.sock.close() except Exception: pass def _run_inner(self, ts: str) -> None: transport = SocketTransport(self.sock, peer=self.peer) # Collect raw bytes in memory until startup succeeds, then flush to disk. raw_buf: list[bytes] = [] _orig_read = transport.read def tapped_read(n: int) -> bytes: data = _orig_read(n) if data: raw_buf.append(data) return data transport.read = tapped_read # type: ignore[method-assign] serial: Optional[str] = None # ── Step 1: startup handshake ───────────────────────────────────────── # Do this BEFORE creating the session directory so that scanner probes # and dropped connections leave no trace on disk. try: from minimateplus.protocol import MiniMateProtocol client = MiniMateClient(transport=transport, timeout=self.timeout) client.open() proto = MiniMateProtocol(transport, recv_timeout=self.timeout) proto.startup() except Exception as exc: log.warning("Startup failed from %s: %s -- ignoring", self.peer, exc) return # no session dir created # Startup succeeded — this is a real unit. Create session dir now. session_dir = self.output_dir / f"ach_inbound_{ts}" session_dir.mkdir(parents=True, exist_ok=True) log_path = session_dir / f"session_{ts}.log" raw_path = session_dir / f"raw_rx_{ts}.bin" # Flush buffered raw bytes to file and switch to direct file writes. raw_fh = open(raw_path, "wb") for chunk in raw_buf: raw_fh.write(chunk) raw_buf.clear() def tapped_read_file(n: int) -> bytes: data = _orig_read(n) if data: raw_fh.write(data) raw_fh.flush() return data transport.read = tapped_read_file # type: ignore[method-assign] # Wire up file handler now that the session dir exists. fh = logging.FileHandler(log_path, encoding="utf-8") fh.setFormatter(logging.Formatter("%(asctime)s %(levelname)-7s %(name)s %(message)s")) root_logger = logging.getLogger() root_logger.addHandler(fh) try: # ── Step 2: device info ─────────────────────────────────────────── device_info = None if not self.events_only: log.info("Step 2/3: reading device info") try: device_info = client.connect() serial = device_info.serial _save_json(session_dir / "device_info.json", _device_info_to_dict(device_info)) log.info( " [OK] Device: serial=%s firmware=%s model=%s events=%d", serial, device_info.firmware_version, device_info.model, device_info.event_count or 0, ) except Exception as exc: log.error(" [FAIL] Device info failed: %s", exc) else: log.info("Step 2/3: skipping device info (--events-only)") # ── Step 3: check for new events by comparing key sets ──────────── log.info("Step 3/3: checking for new events") state = _load_state(self.state_path) unit_key = serial or self.peer # fall back to IP if no serial unit_state = state.get(unit_key, {}) seen_keys: set[str] = set(unit_state.get("downloaded_keys", [])) # Highest event key ever downloaded from this unit (hex string, 8 chars). # Used to detect post-erase key reuse — see comment block above. max_seen_key: str = unit_state.get("max_downloaded_key", "00000000") # Walk the event index (browse-mode, no 5A) to get the actual current # key list. The SUB 08 event_count field is a lifetime "total events # ever recorded" counter that does NOT decrement on erase — confirmed # 2026-04-13. list_event_keys() via the 1E/1F chain is the only # reliable way to know what is actually stored on the device right now. log.info(" Checking device key list (browse walk, no waveform download)...") try: device_keys = client.list_event_keys() except Exception as exc: log.warning(" list_event_keys failed: %s -- falling back to full download", exc) device_keys = None # Use the walk result as our authoritative current count. current_count = len(device_keys) if device_keys is not None else 0 log.info(" Unit has %d stored event(s); %d key(s) previously downloaded", current_count, len(seen_keys)) if device_keys is not None and current_count == 0: log.info(" [OK] No events on device -- nothing to download") log.info("Session complete (no events) -> %s", session_dir) return if device_keys is not None: # ── Post-erase detection ────────────────────────────────────── # After the device memory is erased, new events start from key # 01110000 again — the same keys we already downloaded. Detect # this by comparing the device's current highest key against the # historical maximum. If the device has rolled back below our # high-water mark, its counter was reset and we must treat all # its keys as new, regardless of what seen_keys contains. if device_keys and max_seen_key != "00000000": max_device_key = max(device_keys) # lexicographic; safe because # keys share the same 4-char prefix if max_device_key < max_seen_key: log.info( " Post-erase reset detected: " "device max key %s < historical max %s " "-- treating all device keys as new", max_device_key, max_seen_key, ) seen_keys = set() # discard stale dedup info for this session new_key_set = set(device_keys) - seen_keys log.info(" Device has %d key(s): %d new, %d already seen", len(device_keys), len(new_key_set), len(device_keys) - len(new_key_set)) if not new_key_set: log.info(" [OK] All events already downloaded -- nothing to do") # Refresh state timestamp; preserve max_seen_key unchanged. state[unit_key] = { "downloaded_keys": sorted(seen_keys | set(device_keys)), "max_downloaded_key": max_seen_key, "last_seen": datetime.datetime.now().isoformat(), "serial": serial, "peer": self.peer, } _save_state(self.state_path, state) # ── Erase even when no new events (if requested) ────────── # Blastware ACH always erases after every session — even when # nothing new was downloaded. Without the erase the device # still sees stored events in its memory and immediately # retries the call-home, causing the looping we observed. # Only erase when device actually has events stored; skip # the erase if device_keys is empty (nothing to erase). if self.clear_after_download and device_keys: log.info( " Clearing device memory (--clear-after-download, " "no new events but device has %d stored)...", len(device_keys), ) try: client.delete_all_events() log.info(" [OK] Device memory cleared") # Reset state so the next session starts fresh. state[unit_key] = { "downloaded_keys": [], "max_downloaded_key": "00000000", "last_seen": datetime.datetime.now().isoformat(), "serial": serial, "peer": self.peer, } _save_state(self.state_path, state) except Exception as exc: log.error( " [WARN] Event deletion failed: %s -- events NOT cleared", exc, ) log.info("Session complete (no new events) -> %s", session_dir) return else: new_key_set = None # unknown; proceed with full download # Apply max_events cap # stop_idx: when we know the count from list_event_keys, use it as # an upper bound. When list_event_keys failed (device_keys is None), # pass None — get_events will run until the null sentinel naturally. stop_idx: Optional[int] = (current_count - 1) if device_keys is not None else None if self.max_events is not None: cap = self.max_events - 1 stop_idx = cap if stop_idx is None else min(stop_idx, cap) if device_keys is not None and self.max_events < current_count: log.warning( " max_events=%d cap: will download events 0-%d only " "(unit has %d total)", self.max_events, stop_idx, current_count, ) try: all_events = client.get_events( full_waveform=True, stop_after_index=stop_idx, skip_waveform_for_keys=seen_keys if seen_keys else None, ) # Filter to events whose keys we haven't saved before. new_events = [ e for e in all_events if e._waveform_key is None or e._waveform_key.hex() not in seen_keys ] skipped = len(all_events) - len(new_events) log.info(" [OK] Downloaded %d event(s): %d new, %d skipped (already seen)", len(all_events), len(new_events), skipped) if skipped: log.info(" (skipped %d already-downloaded event(s))", skipped) if new_events: _save_json(session_dir / "events.json", [_event_to_dict(e) for e in new_events]) for ev in new_events: pv = ev.peak_values pi = ev.project_info key_hex = ev._waveform_key.hex() if ev._waveform_key else "????????" log.info( " NEW [%s] %s Tran=%.4f Vert=%.4f Long=%.4f VS=%.4f project=%r", key_hex, str(ev.timestamp) if ev.timestamp else "?", pv.tran if pv else 0, pv.vert if pv else 0, pv.long if pv else 0, pv.peak_vector_sum if pv else 0, pi.project if pi else "", ) else: log.info(" [OK] No new events since last call-home -- nothing to save") # ── Monitor log entries (partial records / continuous monitoring) ── # Browse walk (0A + 1F only) to collect monitor log entries for # recording intervals where no threshold was crossed. This is a # second 1E-based pass over the device's record list, separate from # the get_events() download loop above. log.info(" Collecting monitor log entries (browse walk)...") new_monitor_entries: list[MonitorLogEntry] = [] try: new_monitor_entries = client.get_monitor_log_entries( skip_keys=seen_keys if seen_keys else None, ) if new_monitor_entries: _save_json( session_dir / "monitor_log.json", [_monitor_log_entry_to_dict(e) for e in new_monitor_entries], ) log.info( " [OK] %d new monitor log entry(s) saved", len(new_monitor_entries), ) for ml in new_monitor_entries: log.info( " MONLOG [%s] %s → %s (%s)", ml.key, ml.start_time.isoformat() if ml.start_time else "?", ml.stop_time.isoformat() if ml.stop_time else "?", f"{ml.duration_seconds:.0f}s" if ml.duration_seconds is not None else "?s", ) else: log.info(" [OK] No new monitor log entries") except Exception as exc: log.warning( " [WARN] Monitor log collection failed: %s -- continuing", exc, ) # ── Persist to SQLite DB ───────────────────────────────────── _session_start = datetime.datetime.now() try: _ev_ins, _ev_skip = self.db.insert_events( new_events, serial=serial or self.peer, session_id=None ) _ml_ins, _ml_skip = self.db.insert_monitor_log( new_monitor_entries, session_id=None ) _session_id = self.db.insert_ach_session( serial=serial or self.peer, peer=self.peer, events_downloaded=_ev_ins, monitor_entries=_ml_ins, duration_seconds=(datetime.datetime.now() - _session_start).total_seconds(), session_time=_session_start, ) log.info( " [DB] session=%s events +%d (skip %d) monitor +%d (skip %d)", _session_id[:8], _ev_ins, _ev_skip, _ml_ins, _ml_skip, ) except Exception as exc: log.warning(" [WARN] DB write failed: %s -- continuing", exc) # ── Optional: erase device memory after successful download ──── erased_successfully = False if self.clear_after_download and new_events: log.info(" Clearing device memory (--clear-after-download)...") try: client.delete_all_events() log.info(" [OK] Device memory cleared") erased_successfully = True except Exception as exc: log.error( " [WARN] Event deletion failed: %s -- events NOT cleared", exc, ) # ── Update persistent state ─────────────────────────────────── # Include both triggered-event keys and monitor-log keys in the # downloaded set so they are not re-processed on the next call-home. current_event_keys = [ e._waveform_key.hex() for e in all_events if e._waveform_key is not None ] current_monitor_keys = [e.key for e in new_monitor_entries] current_keys = current_event_keys + current_monitor_keys if erased_successfully: # Device memory is clear. Reset downloaded_keys and the # high-water mark so the next call-home starts fresh and # doesn't mis-identify the recycled key 01110000 as "seen". updated_keys = [] new_max_key = "00000000" log.info( " State reset after erase -- next session will download " "from key 0 (device counter resets after erase)" ) else: # Normal (no erase): union of previously-seen + all keys on # device now. Includes already-seen survivors so we never # re-download them if the device somehow keeps old records. updated_keys = sorted(set(seen_keys) | set(current_keys)) new_max_key = updated_keys[-1] if updated_keys else max_seen_key state[unit_key] = { "downloaded_keys": updated_keys, "max_downloaded_key": new_max_key, "last_seen": datetime.datetime.now().isoformat(), "serial": serial, "peer": self.peer, } _save_state(self.state_path, state) except Exception as exc: log.error(" [FAIL] Event download failed: %s", exc, exc_info=True) # ── Optional: restart monitoring after successful download ───────── if self.restart_monitoring: log.info(" Restarting monitoring on device (--restart-monitoring)...") try: client.start_monitoring() log.info(" [OK] Monitoring restarted") except Exception as exc: log.warning(" [WARN] Failed to restart monitoring: %s", exc) finally: raw_fh.close() client.close() # closes transport / socket cleanly root_logger.removeHandler(fh) fh.close() log.info("Session complete -> %s", session_dir) log.info("="*60) # ── JSON helpers ─────────────────────────────────────────────────────────────── def _save_json(path: Path, obj: object) -> None: with open(path, "w") as f: json.dump(obj, f, indent=2, default=str) log.debug("Saved %s", path) def _device_info_to_dict(d: DeviceInfo) -> dict: cc = d.compliance_config return { "serial": d.serial, "firmware_version": d.firmware_version, "dsp_version": d.dsp_version, "model": d.model, "event_count": d.event_count, # compliance config fields (None if 1A read failed) "setup_name": cc.setup_name if cc else None, "sample_rate": cc.sample_rate if cc else None, "record_time": cc.record_time if cc else None, "trigger_level_geo": cc.trigger_level_geo if cc else None, "alarm_level_geo": cc.alarm_level_geo if cc else None, "max_range_geo": cc.max_range_geo if cc else None, # hw constant 6.206053 "max_range_geo_enum": cc.max_range_geo_enum if cc else None, # 0x01=10in/s, 0x00=1.25in/s (unconfirmed) "project": cc.project if cc else None, "client": cc.client if cc else None, "operator": cc.operator if cc else None, "sensor_location": cc.sensor_location if cc else None, } def _event_to_dict(e: Event) -> dict: pv = e.peak_values pi = e.project_info peaks = {} if pv: peaks = { "transverse": pv.tran, "vertical": pv.vert, "longitudinal": pv.long, "vector_sum": pv.peak_vector_sum, "mic": pv.micl, } samples = {} if e.raw_samples: samples = { ch: vals[:20] # first 20 sample-sets to keep the file sane for ch, vals in e.raw_samples.items() } samples["__note__"] = "first 20 sample-sets only; see raw_rx.bin for full waveform" return { "timestamp": str(e.timestamp) if e.timestamp else None, "project": pi.project if pi else None, "client": pi.client if pi else None, "operator": pi.operator if pi else None, "sensor_location": pi.sensor_location if pi else None, "peaks": peaks, "raw_samples_preview": samples, } def _monitor_log_entry_to_dict(e: MonitorLogEntry) -> dict: return { "key": e.key, "start_time": e.start_time.isoformat() if e.start_time else None, "stop_time": e.stop_time.isoformat() if e.stop_time else None, "duration_seconds": e.duration_seconds, "serial": e.serial, "geo_threshold_ips": e.geo_threshold_ips, } # ── Main server loop ─────────────────────────────────────────────────────────── def serve(args: argparse.Namespace) -> None: output_dir = Path(args.output) output_dir.mkdir(parents=True, exist_ok=True) state_path = output_dir / "ach_state.json" db = SeismoDb(output_dir / "seismo_relay.db") server_sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM) server_sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1) server_sock.bind(("0.0.0.0", args.port)) server_sock.listen(5) # Wake up every second so Ctrl-C is handled promptly on Windows. # Without this, accept() blocks indefinitely and ignores KeyboardInterrupt. server_sock.settimeout(1.0) max_ev = args.max_events print(f"\n{'='*60}") print(f" ACH inbound server listening on 0.0.0.0:{args.port}") print(f" Output: {output_dir.resolve()}/ach_inbound_/") print(f" State file: {state_path}") print(f" Max events per session: {max_ev if max_ev else 'unlimited'}") print(f" Clear device after download: {'YES' if args.clear_after_download else 'no'}") print(f" Restart monitoring after download: {'YES' if args.restart_monitoring else 'no'}") print(f"{'='*60}") print(f"\n Point your test unit's ACEmanager call-home settings to:") print(f" Remote Host: ") print(f" Remote Port: {args.port}") print(f"\n Waiting for inbound connections... (Ctrl-C to stop)\n") allow_ips = set(args.allow_ips) if allow_ips: print(f" Allowlist: {', '.join(sorted(allow_ips))}") else: print(" Allowlist: NONE -- accepting all IPs (add --allow-ip to restrict)") try: while True: try: client_sock, addr = server_sock.accept() except socket.timeout: continue # no connection this second; loop back and check for Ctrl-C try: peer_ip = addr[0] peer = f"{addr[0]}:{addr[1]}" if allow_ips and peer_ip not in allow_ips: log.info("Rejected connection from %s (not in allowlist)", peer) client_sock.close() continue log.info("Accepted connection from %s", peer) session = AchSession( sock=client_sock, peer=peer, output_dir=output_dir, timeout=args.timeout, events_only=args.events_only, max_events=max_ev, state_path=state_path, db=db, clear_after_download=args.clear_after_download, restart_monitoring=args.restart_monitoring, ) t = threading.Thread(target=session.run, daemon=True, name=f"ach-{peer}") t.start() except KeyboardInterrupt: raise except Exception as exc: log.error("Accept error: %s", exc) finally: server_sock.close() print("\nServer stopped.") def parse_args() -> argparse.Namespace: p = argparse.ArgumentParser( description="Minimal inbound ACH server — speak BW protocol to calling MiniMate Plus units.", formatter_class=argparse.RawDescriptionHelpFormatter, epilog=__doc__, ) p.add_argument( "--port", "-p", type=int, default=12345, help="Port to listen on (default: 12345).", ) p.add_argument( "--output", "-o", default=str(Path(__file__).parent / "captures"), metavar="DIR", help="Directory to write session captures (default: bridges/captures/).", ) p.add_argument( "--timeout", "-t", type=float, default=30.0, help="Protocol receive timeout in seconds (default: 30.0).", ) p.add_argument( "--events-only", action="store_true", help="Skip the device-info step and go straight to event download.", ) p.add_argument( "--max-events", type=int, default=None, metavar="N", help=( "Safety cap: download at most N events per session (default: unlimited). " "Useful if a unit has many old events stored — prevents a very long first run." ), ) p.add_argument( "--allow-ip", metavar="IP", action="append", dest="allow_ips", default=[], help=( "Only accept connections from this IP address (repeat for multiple). " "Example: --allow-ip 63.43.212.232 " "If not specified, all IPs are accepted (not recommended for public servers)." ), ) p.add_argument( "--restart-monitoring", action="store_true", default=False, help=( "After downloading events, send SUB 0x96 (start monitoring) before " "disconnecting. Required for RV55 units whose firmware does not assert " "DCD on disconnect — without this the unit stays idle after a call-home." ), ) p.add_argument( "--clear-after-download", action="store_true", default=False, help=( "After successfully downloading new events, erase all events from the " "device memory (SUB 0xA3 → 0x1C → 0x06 → 0xA2 sequence, confirmed from " "4-11-26 MITM capture). Only fires when at least one new event was saved. " "This mirrors the standard Blastware ACH workflow." ), ) p.add_argument( "--verbose", "-v", action="store_true", help="Enable debug logging.", ) return p.parse_args() if __name__ == "__main__": args = parse_args() logging.basicConfig( level=logging.DEBUG if args.verbose else logging.INFO, format="%(asctime)s %(levelname)-7s %(name)s %(message)s", ) try: serve(args) except KeyboardInterrupt: print("\nStopped.")