#!/usr/bin/env python3 """ ach_server.py — Minimal inbound ACH (Auto Call Home) server for MiniMate Plus. This IS your test server. Run it on any machine on the same network, point a unit's ACEmanager call-home destination at it, and it will speak the full BW protocol to the device: handshake, pull device info, download all events, save everything as JSON. The key thing this script tells you that no amount of packet sniffing can: - Does the device speak first (push) or wait for us to send POLL (pull)? If startup() completes normally → it's pull protocol, same as Blastware. If startup() times out → the device sent something first; check raw_rx.bin. Usage ----- python bridges/ach_server.py [--port 12345] [--output bridges/captures/] Setup ----- 1. Run this script on a machine on your local network. 2. In ACEmanager → Application → ALEOS Application Framework (or equivalent) find the Call Home / ACH settings. Set: Remote Host: Remote Port: 12345 3. Trigger the unit (wait for a vibration event, or use the manual call-home button if your firmware version has one). 4. The unit connects. This script handshakes, downloads all events, and saves a timestamped session directory. Output per session ------------------ bridges/captures/ach_inbound_/ device_info.json — serial number, firmware version, calibration date, etc. events.json — all events: timestamp, PPV per channel, peaks, metadata raw_rx_.bin — raw bytes from the device (S3 side) for Analyzer session_.log — detailed protocol log What to look for ---------------- Push vs pull: Check session_.log. If the first line after "Connected" shows bytes arriving BEFORE the POLL probe was sent, it's push. If POLL gets a clean response, it's pull. Frequency: Look at raw_rx.bin in the Analyzer. SUB 5A (0xA5 responses) carry bulk waveform data — if frequency is sent pre-computed there will be float32 values before the ADC sample blocks. ACH-specific framing: Does the unit send anything extra before the DLE+STX framing starts? raw_rx.bin will show raw bytes including any preamble. """ from __future__ import annotations import argparse import datetime import json import logging import socket import sys import threading from pathlib import Path from typing import Optional sys.path.insert(0, str(Path(__file__).parent.parent)) from minimateplus.transport import SocketTransport from minimateplus.client import MiniMateClient from minimateplus.models import DeviceInfo, Event log = logging.getLogger("ach_server") # ── Per-unit state (high-water mark) ────────────────────────────────────────── # Persisted as /ach_state.json # Format: { "BE11529": { "event_count": 5, "last_seen": "2026-04-09T..." }, ... } _state_lock = threading.Lock() def _load_state(state_path: Path) -> dict: if state_path.exists(): try: with open(state_path) as f: return json.load(f) except Exception: pass return {} def _save_state(state_path: Path, state: dict) -> None: with _state_lock: with open(state_path, "w") as f: json.dump(state, f, indent=2) # ── Per-session handler ──────────────────────────────────────────────────────── class AchSession: """ Handles one inbound unit connection in its own thread. Wraps the socket in a SocketTransport → MiniMateClient, then runs the standard connect → get_device_info → get_events sequence. State tracking (ach_state.json in output_dir): On each successful download we record how many events the unit had. On the next call-home we compare: if count hasn't grown, there's nothing new and we close cleanly without downloading. If it has grown, we download all events up to the new count and save only the new ones. """ def __init__( self, sock: socket.socket, peer: str, output_dir: Path, timeout: float, events_only: bool, max_events: Optional[int], state_path: Path, ) -> None: self.sock = sock self.peer = peer self.output_dir = output_dir self.timeout = timeout self.events_only = events_only self.max_events = max_events self.state_path = state_path def run(self) -> None: ts = datetime.datetime.now().strftime("%Y%m%d_%H%M%S") # Session dir and file handler are created lazily — only after startup # succeeds. This prevents internet scanners and dropped connections from # littering the output directory with empty session folders. try: self._run_inner(ts) except Exception as exc: log.error("Session failed (%s): %s", self.peer, exc, exc_info=True) finally: try: self.sock.close() except Exception: pass def _run_inner(self, ts: str) -> None: transport = SocketTransport(self.sock, peer=self.peer) # Collect raw bytes in memory until startup succeeds, then flush to disk. raw_buf: list[bytes] = [] _orig_read = transport.read def tapped_read(n: int) -> bytes: data = _orig_read(n) if data: raw_buf.append(data) return data transport.read = tapped_read # type: ignore[method-assign] serial: Optional[str] = None # ── Step 1: startup handshake ───────────────────────────────────────── # Do this BEFORE creating the session directory so that scanner probes # and dropped connections leave no trace on disk. try: from minimateplus.protocol import MiniMateProtocol client = MiniMateClient(transport=transport, timeout=self.timeout) client.open() proto = MiniMateProtocol(transport, recv_timeout=self.timeout) proto.startup() except Exception as exc: log.warning("Startup failed from %s: %s -- ignoring", self.peer, exc) return # no session dir created # Startup succeeded — this is a real unit. Create session dir now. session_dir = self.output_dir / f"ach_inbound_{ts}" session_dir.mkdir(parents=True, exist_ok=True) log_path = session_dir / f"session_{ts}.log" raw_path = session_dir / f"raw_rx_{ts}.bin" # Flush buffered raw bytes to file and switch to direct file writes. raw_fh = open(raw_path, "wb") for chunk in raw_buf: raw_fh.write(chunk) raw_buf.clear() def tapped_read_file(n: int) -> bytes: data = _orig_read(n) if data: raw_fh.write(data) raw_fh.flush() return data transport.read = tapped_read_file # type: ignore[method-assign] # Wire up file handler now that the session dir exists. fh = logging.FileHandler(log_path, encoding="utf-8") fh.setFormatter(logging.Formatter("%(asctime)s %(levelname)-7s %(name)s %(message)s")) root_logger = logging.getLogger() root_logger.addHandler(fh) try: # ── Step 2: device info ─────────────────────────────────────────── device_info = None if not self.events_only: log.info("Step 2/3: reading device info") try: device_info = client.connect() serial = device_info.serial _save_json(session_dir / "device_info.json", _device_info_to_dict(device_info)) log.info( " [OK] Device: serial=%s firmware=%s model=%s events=%d", serial, device_info.firmware_version, device_info.model, device_info.event_count or 0, ) except Exception as exc: log.error(" [FAIL] Device info failed: %s", exc) else: log.info("Step 2/3: skipping device info (--events-only)") # ── Step 3: check for new events via high-water mark ─────────────── log.info("Step 3/3: checking for new events") state = _load_state(self.state_path) unit_key = serial or self.peer # fall back to IP if no serial last_count = state.get(unit_key, {}).get("event_count", 0) # Use the event count already read from the event index during connect(). # This is fast (no extra round-trips) and confirmed accurate (matches LCD). # Falls back to count_events() only if connect() wasn't called. if device_info is not None: current_count = device_info.event_count else: try: current_count = client.count_events() except Exception as exc: log.error(" [FAIL] count_events failed: %s", exc) return log.info(" Unit has %d stored event(s); last downloaded count: %d", current_count, last_count) if current_count <= last_count: log.info(" [OK] No new events since last call-home -- nothing to download") log.info("Session complete (no new events) -> %s", session_dir) return new_event_count = current_count - last_count log.info(" %d new event(s) to download", new_event_count) # Download all events up to current_count, apply max_events cap. # We re-download old events too (get_events always starts from 0), # but we only SAVE the new ones (the last new_event_count of the list). stop_idx = current_count - 1 if self.max_events is not None: stop_idx = min(stop_idx, self.max_events - 1) if self.max_events < current_count: log.warning( " max_events=%d cap: will download events 0–%d only " "(unit has %d total)", self.max_events, stop_idx, current_count, ) try: all_events = client.get_events( full_waveform=True, stop_after_index=stop_idx, ) # Only the events beyond last_count are genuinely new new_events = all_events[last_count:] log.info(" [OK] Downloaded %d total event(s), %d new", len(all_events), len(new_events)) _save_json(session_dir / "events.json", [_event_to_dict(e) for e in new_events]) if last_count > 0 and len(all_events) > len(new_events): log.info(" (skipped %d already-seen event(s))", last_count) for i, ev in enumerate(new_events): pv = ev.peak_values pi = ev.project_info log.info( " NEW Event %d: %s Tran=%.4f Vert=%.4f Long=%.4f VS=%.4f project=%r", last_count + i, str(ev.timestamp) if ev.timestamp else "?", pv.tran if pv else 0, pv.vert if pv else 0, pv.long if pv else 0, pv.peak_vector_sum if pv else 0, pi.project if pi else "", ) # Update high-water mark state[unit_key] = { "event_count": current_count, "last_seen": datetime.datetime.now().isoformat(), "serial": serial, "peer": self.peer, } _save_state(self.state_path, state) except Exception as exc: log.error(" [FAIL] Event download failed: %s", exc, exc_info=True) finally: raw_fh.close() client.close() # closes transport / socket cleanly root_logger.removeHandler(fh) fh.close() log.info("Session complete -> %s", session_dir) log.info("="*60) # ── JSON helpers ─────────────────────────────────────────────────────────────── def _save_json(path: Path, obj: object) -> None: with open(path, "w") as f: json.dump(obj, f, indent=2, default=str) log.debug("Saved %s", path) def _device_info_to_dict(d: DeviceInfo) -> dict: cc = d.compliance_config return { "serial": d.serial, "firmware_version": d.firmware_version, "dsp_version": d.dsp_version, "model": d.model, "event_count": d.event_count, # compliance config fields (None if 1A read failed) "setup_name": cc.setup_name if cc else None, "sample_rate": cc.sample_rate if cc else None, "record_time": cc.record_time if cc else None, "trigger_level_geo": cc.trigger_level_geo if cc else None, "alarm_level_geo": cc.alarm_level_geo if cc else None, "max_range_geo": cc.max_range_geo if cc else None, "project": cc.project if cc else None, "client": cc.client if cc else None, "operator": cc.operator if cc else None, "sensor_location": cc.sensor_location if cc else None, } def _event_to_dict(e: Event) -> dict: pv = e.peak_values pi = e.project_info peaks = {} if pv: peaks = { "transverse": pv.tran, "vertical": pv.vert, "longitudinal": pv.long, "vector_sum": pv.peak_vector_sum, "mic": pv.micl, } samples = {} if e.raw_samples: samples = { ch: vals[:20] # first 20 sample-sets to keep the file sane for ch, vals in e.raw_samples.items() } samples["__note__"] = "first 20 sample-sets only; see raw_rx.bin for full waveform" return { "timestamp": str(e.timestamp) if e.timestamp else None, "project": pi.project if pi else None, "client": pi.client if pi else None, "operator": pi.operator if pi else None, "sensor_location": pi.sensor_location if pi else None, "peaks": peaks, "raw_samples_preview": samples, } # ── Main server loop ─────────────────────────────────────────────────────────── def serve(args: argparse.Namespace) -> None: output_dir = Path(args.output) output_dir.mkdir(parents=True, exist_ok=True) state_path = output_dir / "ach_state.json" server_sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM) server_sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1) server_sock.bind(("0.0.0.0", args.port)) server_sock.listen(5) # Wake up every second so Ctrl-C is handled promptly on Windows. # Without this, accept() blocks indefinitely and ignores KeyboardInterrupt. server_sock.settimeout(1.0) max_ev = args.max_events print(f"\n{'='*60}") print(f" ACH inbound server listening on 0.0.0.0:{args.port}") print(f" Output: {output_dir.resolve()}/ach_inbound_/") print(f" State file: {state_path}") print(f" Max events per session: {max_ev if max_ev else 'unlimited'}") print(f"{'='*60}") print(f"\n Point your test unit's ACEmanager call-home settings to:") print(f" Remote Host: ") print(f" Remote Port: {args.port}") print(f"\n Waiting for inbound connections... (Ctrl-C to stop)\n") allow_ips = set(args.allow_ips) if allow_ips: print(f" Allowlist: {', '.join(sorted(allow_ips))}") else: print(" Allowlist: NONE -- accepting all IPs (add --allow-ip to restrict)") try: while True: try: client_sock, addr = server_sock.accept() except socket.timeout: continue # no connection this second; loop back and check for Ctrl-C try: peer_ip = addr[0] peer = f"{addr[0]}:{addr[1]}" if allow_ips and peer_ip not in allow_ips: log.info("Rejected connection from %s (not in allowlist)", peer) client_sock.close() continue log.info("Accepted connection from %s", peer) session = AchSession( sock=client_sock, peer=peer, output_dir=output_dir, timeout=args.timeout, events_only=args.events_only, max_events=max_ev, state_path=state_path, ) t = threading.Thread(target=session.run, daemon=True, name=f"ach-{peer}") t.start() except KeyboardInterrupt: raise except Exception as exc: log.error("Accept error: %s", exc) finally: server_sock.close() print("\nServer stopped.") def parse_args() -> argparse.Namespace: p = argparse.ArgumentParser( description="Minimal inbound ACH server — speak BW protocol to calling MiniMate Plus units.", formatter_class=argparse.RawDescriptionHelpFormatter, epilog=__doc__, ) p.add_argument( "--port", "-p", type=int, default=12345, help="Port to listen on (default: 12345).", ) p.add_argument( "--output", "-o", default=str(Path(__file__).parent / "captures"), metavar="DIR", help="Directory to write session captures (default: bridges/captures/).", ) p.add_argument( "--timeout", "-t", type=float, default=30.0, help="Protocol receive timeout in seconds (default: 30.0).", ) p.add_argument( "--events-only", action="store_true", help="Skip the device-info step and go straight to event download.", ) p.add_argument( "--max-events", type=int, default=None, metavar="N", help=( "Safety cap: download at most N events per session (default: unlimited). " "Useful if a unit has many old events stored — prevents a very long first run." ), ) p.add_argument( "--allow-ip", metavar="IP", action="append", dest="allow_ips", default=[], help=( "Only accept connections from this IP address (repeat for multiple). " "Example: --allow-ip 63.43.212.232 " "If not specified, all IPs are accepted (not recommended for public servers)." ), ) p.add_argument( "--verbose", "-v", action="store_true", help="Enable debug logging.", ) return p.parse_args() if __name__ == "__main__": args = parse_args() logging.basicConfig( level=logging.DEBUG if args.verbose else logging.INFO, format="%(asctime)s %(levelname)-7s %(name)s %(message)s", ) try: serve(args) except KeyboardInterrupt: print("\nStopped.")