#!/usr/bin/env python3 """ ach_server.py — Minimal inbound ACH (Auto Call Home) server for MiniMate Plus. This IS your test server. Run it on any machine on the same network, point a unit's ACEmanager call-home destination at it, and it will speak the full BW protocol to the device: handshake, pull device info, download all events, save everything as JSON. The key thing this script tells you that no amount of packet sniffing can: - Does the device speak first (push) or wait for us to send POLL (pull)? If startup() completes normally → it's pull protocol, same as Blastware. If startup() times out → the device sent something first; check raw_rx.bin. Usage ----- python bridges/ach_server.py [--port 12345] [--output bridges/captures/] Setup ----- 1. Run this script on a machine on your local network. 2. In ACEmanager → Application → ALEOS Application Framework (or equivalent) find the Call Home / ACH settings. Set: Remote Host: Remote Port: 12345 3. Trigger the unit (wait for a vibration event, or use the manual call-home button if your firmware version has one). 4. The unit connects. This script handshakes, downloads all events, and saves a timestamped session directory. Output per session ------------------ bridges/captures/ach_inbound_/ device_info.json — serial number, firmware version, calibration date, etc. events.json — all events: timestamp, PPV per channel, peaks, metadata raw_rx_.bin — raw bytes from the device (S3 side) for Analyzer session_.log — detailed protocol log What to look for ---------------- Push vs pull: Check session_.log. If the first line after "Connected" shows bytes arriving BEFORE the POLL probe was sent, it's push. If POLL gets a clean response, it's pull. Frequency: Look at raw_rx.bin in the Analyzer. SUB 5A (0xA5 responses) carry bulk waveform data — if frequency is sent pre-computed there will be float32 values before the ADC sample blocks. ACH-specific framing: Does the unit send anything extra before the DLE+STX framing starts? raw_rx.bin will show raw bytes including any preamble. """ from __future__ import annotations import argparse import datetime import json import logging import socket import sys import threading from pathlib import Path from typing import Optional sys.path.insert(0, str(Path(__file__).parent.parent)) from minimateplus.transport import SocketTransport from minimateplus.client import MiniMateClient from minimateplus.models import DeviceInfo, Event log = logging.getLogger("ach_server") # ── Per-unit state (high-water mark) ────────────────────────────────────────── # Persisted as /ach_state.json # Format: { "BE11529": { "event_count": 5, "last_seen": "2026-04-09T..." }, ... } _state_lock = threading.Lock() def _load_state(state_path: Path) -> dict: if state_path.exists(): try: with open(state_path) as f: return json.load(f) except Exception: pass return {} def _save_state(state_path: Path, state: dict) -> None: with _state_lock: with open(state_path, "w") as f: json.dump(state, f, indent=2) # ── Per-session handler ──────────────────────────────────────────────────────── class AchSession: """ Handles one inbound unit connection in its own thread. Wraps the socket in a SocketTransport → MiniMateClient, then runs the standard connect → get_device_info → get_events sequence. State tracking (ach_state.json in output_dir): On each successful download we record how many events the unit had. On the next call-home we compare: if count hasn't grown, there's nothing new and we close cleanly without downloading. If it has grown, we download all events up to the new count and save only the new ones. """ def __init__( self, sock: socket.socket, peer: str, output_dir: Path, timeout: float, events_only: bool, max_events: Optional[int], state_path: Path, ) -> None: self.sock = sock self.peer = peer self.output_dir = output_dir self.timeout = timeout self.events_only = events_only self.max_events = max_events self.state_path = state_path def run(self) -> None: ts = datetime.datetime.now().strftime("%Y%m%d_%H%M%S") session_dir = self.output_dir / f"ach_inbound_{ts}" session_dir.mkdir(parents=True, exist_ok=True) log_path = session_dir / f"session_{ts}.log" raw_path = session_dir / f"raw_rx_{ts}.bin" # Wire up a file handler so every protocol log line goes to the session log fh = logging.FileHandler(log_path) fh.setFormatter(logging.Formatter("%(asctime)s %(levelname)-7s %(name)s %(message)s")) root_logger = logging.getLogger() root_logger.addHandler(fh) try: self._run_inner(session_dir, raw_path, ts) except Exception as exc: log.error("Session failed: %s", exc, exc_info=True) finally: root_logger.removeHandler(fh) fh.close() try: self.sock.close() except Exception: pass def _run_inner(self, session_dir: Path, raw_path: Path, ts: str) -> None: log.info("="*60) log.info("Inbound connection from %s", self.peer) log.info("Session dir: %s", session_dir) transport = SocketTransport(self.sock, peer=self.peer) # Tap the transport: save every raw byte received from the device. raw_fh = open(raw_path, "wb") _orig_read = transport.read def tapped_read(n: int) -> bytes: data = _orig_read(n) if data: raw_fh.write(data) raw_fh.flush() return data transport.read = tapped_read # type: ignore[method-assign] serial: Optional[str] = None try: client = MiniMateClient(transport=transport, timeout=self.timeout) client.open() # ── Step 1: startup handshake ───────────────────────────────────── log.info("Step 1/3: startup handshake (POLL / SUB 5B)") try: from minimateplus.protocol import MiniMateProtocol proto = MiniMateProtocol(transport, recv_timeout=self.timeout) proto.startup() log.info(" [OK] Startup OK -- pull protocol confirmed") except Exception as exc: log.error(" [FAIL] Startup failed: %s", exc) return # ── Step 2: device info ─────────────────────────────────────────── device_info = None if not self.events_only: log.info("Step 2/3: reading device info") try: device_info = client.connect() serial = device_info.serial _save_json(session_dir / "device_info.json", _device_info_to_dict(device_info)) log.info( " [OK] Device: serial=%s firmware=%s calibration=%s", serial, device_info.firmware_version, device_info.calibration_date, ) except Exception as exc: log.error(" [FAIL] Device info failed: %s", exc) else: log.info("Step 2/3: skipping device info (--events-only)") # ── Step 3: check for new events via high-water mark ─────────────── log.info("Step 3/3: checking for new events") state = _load_state(self.state_path) unit_key = serial or self.peer # fall back to IP if no serial last_count = state.get(unit_key, {}).get("event_count", 0) try: current_count = client.count_events() log.info(" Unit has %d stored event(s); last downloaded count: %d", current_count, last_count) except Exception as exc: log.error(" [FAIL] count_events failed: %s", exc) return if current_count <= last_count: log.info(" [OK] No new events since last call-home -- nothing to download") log.info("Session complete (no new events) -> %s", session_dir) return new_event_count = current_count - last_count log.info(" %d new event(s) to download", new_event_count) # Download all events up to current_count, apply max_events cap. # We re-download old events too (get_events always starts from 0), # but we only SAVE the new ones (the last new_event_count of the list). stop_idx = current_count - 1 if self.max_events is not None: stop_idx = min(stop_idx, self.max_events - 1) if self.max_events < current_count: log.warning( " max_events=%d cap: will download events 0–%d only " "(unit has %d total)", self.max_events, stop_idx, current_count, ) try: all_events = client.get_events( full_waveform=True, stop_after_index=stop_idx, ) # Only the events beyond last_count are genuinely new new_events = all_events[last_count:] log.info(" [OK] Downloaded %d total event(s), %d new", len(all_events), len(new_events)) _save_json(session_dir / "events.json", [_event_to_dict(e) for e in new_events]) if last_count > 0 and len(all_events) > len(new_events): log.info(" (skipped %d already-seen event(s))", last_count) for i, ev in enumerate(new_events): log.info( " NEW Event %d: %s Tran=%.4f Vert=%.4f Long=%.4f VS=%.4f", last_count + i, ev.timestamp.isoformat() if ev.timestamp else "?", ev.peaks.transverse if ev.peaks else 0, ev.peaks.vertical if ev.peaks else 0, ev.peaks.longitudinal if ev.peaks else 0, ev.peaks.vector_sum if ev.peaks else 0, ) # Update high-water mark state[unit_key] = { "event_count": current_count, "last_seen": datetime.datetime.now().isoformat(), "serial": serial, "peer": self.peer, } _save_state(self.state_path, state) except Exception as exc: log.error(" [FAIL] Event download failed: %s", exc, exc_info=True) finally: raw_fh.close() client.close() # closes transport / socket cleanly log.info("Session complete -> %s", session_dir) log.info("="*60) # ── JSON helpers ─────────────────────────────────────────────────────────────── def _save_json(path: Path, obj: object) -> None: with open(path, "w") as f: json.dump(obj, f, indent=2, default=str) log.debug("Saved %s", path) def _device_info_to_dict(d: DeviceInfo) -> dict: return { "serial": d.serial, "firmware_version": d.firmware_version, "calibration_date": str(d.calibration_date) if d.calibration_date else None, "aux_trigger": d.aux_trigger, "setup_name": d.setup_name, "sample_rate": d.sample_rate, "record_time": d.record_time, "trigger_level_geo": d.trigger_level_geo, "alarm_level_geo": d.alarm_level_geo, "max_range_geo": d.max_range_geo, "project": d.project, "client": d.client, "operator": d.operator, "sensor_location": d.sensor_location, } def _event_to_dict(e: Event) -> dict: peaks = {} if e.peaks: peaks = { "transverse": e.peaks.transverse, "vertical": e.peaks.vertical, "longitudinal": e.peaks.longitudinal, "vector_sum": e.peaks.vector_sum, "mic": e.peaks.mic, } samples = {} if e.raw_samples: samples = { ch: vals[:20] # first 20 sample-sets to keep the file sane for ch, vals in e.raw_samples.items() } samples["__note__"] = "first 20 sample-sets only; see raw_rx.bin for full waveform" return { "timestamp": e.timestamp.isoformat() if e.timestamp else None, "project": e.project, "client": e.client, "operator": e.operator, "sensor_location": e.sensor_location, "peaks": peaks, "raw_samples_preview": samples, } # ── Main server loop ─────────────────────────────────────────────────────────── def serve(args: argparse.Namespace) -> None: output_dir = Path(args.output) output_dir.mkdir(parents=True, exist_ok=True) state_path = output_dir / "ach_state.json" server_sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM) server_sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1) server_sock.bind(("0.0.0.0", args.port)) server_sock.listen(5) max_ev = args.max_events print(f"\n{'='*60}") print(f" ACH inbound server listening on 0.0.0.0:{args.port}") print(f" Output: {output_dir.resolve()}/ach_inbound_/") print(f" State file: {state_path}") print(f" Max events per session: {max_ev if max_ev else 'unlimited'}") print(f"{'='*60}") print(f"\n Point your test unit's ACEmanager call-home settings to:") print(f" Remote Host: ") print(f" Remote Port: {args.port}") print(f"\n Waiting for inbound connections... (Ctrl-C to stop)\n") try: while True: try: client_sock, addr = server_sock.accept() peer = f"{addr[0]}:{addr[1]}" log.info("Accepted connection from %s", peer) session = AchSession( sock=client_sock, peer=peer, output_dir=output_dir, timeout=args.timeout, events_only=args.events_only, max_events=max_ev, state_path=state_path, ) t = threading.Thread(target=session.run, daemon=True, name=f"ach-{peer}") t.start() except KeyboardInterrupt: raise except Exception as exc: log.error("Accept error: %s", exc) finally: server_sock.close() print("\nServer stopped.") def parse_args() -> argparse.Namespace: p = argparse.ArgumentParser( description="Minimal inbound ACH server — speak BW protocol to calling MiniMate Plus units.", formatter_class=argparse.RawDescriptionHelpFormatter, epilog=__doc__, ) p.add_argument( "--port", "-p", type=int, default=12345, help="Port to listen on (default: 12345).", ) p.add_argument( "--output", "-o", default=str(Path(__file__).parent / "captures"), metavar="DIR", help="Directory to write session captures (default: bridges/captures/).", ) p.add_argument( "--timeout", "-t", type=float, default=30.0, help="Protocol receive timeout in seconds (default: 30.0).", ) p.add_argument( "--events-only", action="store_true", help="Skip the device-info step and go straight to event download.", ) p.add_argument( "--max-events", type=int, default=None, metavar="N", help=( "Safety cap: download at most N events per session (default: unlimited). " "Useful if a unit has many old events stored — prevents a very long first run." ), ) p.add_argument( "--verbose", "-v", action="store_true", help="Enable debug logging.", ) return p.parse_args() if __name__ == "__main__": args = parse_args() logging.basicConfig( level=logging.DEBUG if args.verbose else logging.INFO, format="%(asctime)s %(levelname)-7s %(name)s %(message)s", ) try: serve(args) except KeyboardInterrupt: print("\nStopped.")