#!/usr/bin/env python3 """ ach_server.py — Minimal inbound ACH (Auto Call Home) server for MiniMate Plus. This IS your test server. Run it on any machine on the same network, point a unit's ACEmanager call-home destination at it, and it will speak the full BW protocol to the device: handshake, pull device info, download all events, save everything as JSON. The key thing this script tells you that no amount of packet sniffing can: - Does the device speak first (push) or wait for us to send POLL (pull)? If startup() completes normally → it's pull protocol, same as Blastware. If startup() times out → the device sent something first; check raw_rx.bin. Usage ----- python bridges/ach_server.py [--port 12345] [--output bridges/captures/] Setup ----- 1. Run this script on a machine on your local network. 2. In ACEmanager → Application → ALEOS Application Framework (or equivalent) find the Call Home / ACH settings. Set: Remote Host: Remote Port: 12345 3. Trigger the unit (wait for a vibration event, or use the manual call-home button if your firmware version has one). 4. The unit connects. This script handshakes, downloads all events, and saves a timestamped session directory. Output per session ------------------ bridges/captures/ach_inbound_/ device_info.json — serial number, firmware version, calibration date, etc. events.json — all events: timestamp, PPV per channel, peaks, metadata raw_rx_.bin — raw bytes from the device (S3 side) for Analyzer session_.log — detailed protocol log What to look for ---------------- Push vs pull: Check session_.log. If the first line after "Connected" shows bytes arriving BEFORE the POLL probe was sent, it's push. If POLL gets a clean response, it's pull. Frequency: Look at raw_rx.bin in the Analyzer. SUB 5A (0xA5 responses) carry bulk waveform data — if frequency is sent pre-computed there will be float32 values before the ADC sample blocks. ACH-specific framing: Does the unit send anything extra before the DLE+STX framing starts? raw_rx.bin will show raw bytes including any preamble. """ from __future__ import annotations import argparse import datetime import json import logging import socket import sys import threading from pathlib import Path from typing import Optional sys.path.insert(0, str(Path(__file__).parent.parent)) from minimateplus.transport import SocketTransport from minimateplus.client import MiniMateClient from minimateplus.models import DeviceInfo, Event log = logging.getLogger("ach_server") # ── Per-session handler ──────────────────────────────────────────────────────── class AchSession: """ Handles one inbound unit connection in its own thread. Wraps the socket in a SocketTransport → MiniMateClient, then runs the standard connect → get_device_info → get_events sequence. """ def __init__( self, sock: socket.socket, peer: str, output_dir: Path, timeout: float, events_only: bool, ) -> None: self.sock = sock self.peer = peer self.output_dir = output_dir self.timeout = timeout self.events_only = events_only def run(self) -> None: ts = datetime.datetime.now().strftime("%Y%m%d_%H%M%S") session_dir = self.output_dir / f"ach_inbound_{ts}" session_dir.mkdir(parents=True, exist_ok=True) log_path = session_dir / f"session_{ts}.log" raw_path = session_dir / f"raw_rx_{ts}.bin" # Wire up a file handler so every protocol log line goes to the session log fh = logging.FileHandler(log_path) fh.setFormatter(logging.Formatter("%(asctime)s %(levelname)-7s %(name)s %(message)s")) root_logger = logging.getLogger() root_logger.addHandler(fh) try: self._run_inner(session_dir, raw_path, ts) except Exception as exc: log.error("Session failed: %s", exc, exc_info=True) finally: root_logger.removeHandler(fh) fh.close() try: self.sock.close() except Exception: pass def _run_inner(self, session_dir: Path, raw_path: Path, ts: str) -> None: log.info("="*60) log.info("Inbound connection from %s", self.peer) log.info("Session dir: %s", session_dir) # Wrap the accepted socket in a SocketTransport. # SocketTransport.connect() is a no-op — the socket is already live. transport = SocketTransport(self.sock, peer=self.peer) # Tap the transport so we save every raw byte received from the device. # We monkey-patch read() to write to a file before returning. raw_fh = open(raw_path, "wb") original_read = transport.read def tapped_read(n: int) -> bytes: data = original_read(n) if data: raw_fh.write(data) raw_fh.flush() return data transport.read = tapped_read # type: ignore[method-assign] try: client = MiniMateClient(transport=transport, timeout=self.timeout) client.open() # calls transport.connect() — no-op for SocketTransport # ── Step 1: startup handshake ───────────────────────────────────── log.info("Step 1/3: startup handshake (POLL / SUB 5B)") try: from minimateplus.protocol import MiniMateProtocol proto = MiniMateProtocol(transport, recv_timeout=self.timeout) proto.startup() log.info(" ✓ Startup OK — device responded to POLL (pull protocol confirmed)") log.info(" NOTE: If you see this, the device waited for us to send POLL first.") log.info(" That means ACH is pull protocol (same as direct BW connection).") except Exception as exc: log.error(" ✗ Startup failed: %s", exc) log.warning(" If startup timed out with bytes in raw_rx.bin → push protocol.") log.warning(" If raw_rx.bin is empty → unit didn't respond at all.") return # ── Step 2: device info ─────────────────────────────────────────── if not self.events_only: log.info("Step 2/3: reading device info") try: device_info = client.connect() # SUB FE + 1A _save_json(session_dir / "device_info.json", _device_info_to_dict(device_info)) log.info( " ✓ Device: serial=%s firmware=%s calibration=%s", device_info.serial_number, device_info.firmware_version, device_info.calibration_date, ) except Exception as exc: log.error(" ✗ Device info failed: %s", exc) # Not fatal — continue to events else: log.info("Step 2/3: skipping device info (--events-only)") # ── Step 3: download events ──────────────────────────────────────── log.info("Step 3/3: downloading events") try: events = client.get_events(full_waveform=True) log.info(" ✓ Downloaded %d event(s)", len(events)) _save_json(session_dir / "events.json", [_event_to_dict(e) for e in events]) for i, ev in enumerate(events): log.info( " Event %d: %s Tran=%.4f Vert=%.4f Long=%.4f VS=%.4f", i, ev.timestamp.isoformat() if ev.timestamp else "?", ev.peaks.transverse if ev.peaks else 0, ev.peaks.vertical if ev.peaks else 0, ev.peaks.longitudinal if ev.peaks else 0, ev.peaks.vector_sum if ev.peaks else 0, ) except Exception as exc: log.error(" ✗ Event download failed: %s", exc) finally: raw_fh.close() client.close() log.info("Session complete → %s", session_dir) log.info("="*60) # ── JSON helpers ─────────────────────────────────────────────────────────────── def _save_json(path: Path, obj: object) -> None: with open(path, "w") as f: json.dump(obj, f, indent=2, default=str) log.debug("Saved %s", path) def _device_info_to_dict(d: DeviceInfo) -> dict: return { "serial_number": d.serial_number, "firmware_version": d.firmware_version, "calibration_date": str(d.calibration_date) if d.calibration_date else None, "aux_trigger": d.aux_trigger, "setup_name": d.setup_name, "sample_rate": d.sample_rate, "record_time": d.record_time, "trigger_level_geo": d.trigger_level_geo, "alarm_level_geo": d.alarm_level_geo, "max_range_geo": d.max_range_geo, "project": d.project, "client": d.client, "operator": d.operator, "sensor_location": d.sensor_location, } def _event_to_dict(e: Event) -> dict: peaks = {} if e.peaks: peaks = { "transverse": e.peaks.transverse, "vertical": e.peaks.vertical, "longitudinal": e.peaks.longitudinal, "vector_sum": e.peaks.vector_sum, "mic": e.peaks.mic, } samples = {} if e.raw_samples: samples = { ch: vals[:20] # first 20 sample-sets to keep the file sane for ch, vals in e.raw_samples.items() } samples["__note__"] = "first 20 sample-sets only; see raw_rx.bin for full waveform" return { "timestamp": e.timestamp.isoformat() if e.timestamp else None, "project": e.project, "client": e.client, "operator": e.operator, "sensor_location": e.sensor_location, "peaks": peaks, "raw_samples_preview": samples, } # ── Main server loop ─────────────────────────────────────────────────────────── def serve(args: argparse.Namespace) -> None: output_dir = Path(args.output) output_dir.mkdir(parents=True, exist_ok=True) server_sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM) server_sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1) server_sock.bind(("0.0.0.0", args.port)) server_sock.listen(5) print(f"\n{'='*60}") print(f" ACH inbound server listening on 0.0.0.0:{args.port}") print(f" Output: {output_dir.resolve()}/ach_inbound_/") print(f"{'='*60}") print(f"\n Point your test unit's ACEmanager call-home settings to:") print(f" Remote Host: ") print(f" Remote Port: {args.port}") print(f"\n Waiting for inbound connections... (Ctrl-C to stop)\n") try: while True: try: client_sock, addr = server_sock.accept() peer = f"{addr[0]}:{addr[1]}" log.info("Accepted connection from %s", peer) session = AchSession( sock=client_sock, peer=peer, output_dir=output_dir, timeout=args.timeout, events_only=args.events_only, ) t = threading.Thread(target=session.run, daemon=True, name=f"ach-{peer}") t.start() except KeyboardInterrupt: raise except Exception as exc: log.error("Accept error: %s", exc) finally: server_sock.close() print("\nServer stopped.") def parse_args() -> argparse.Namespace: p = argparse.ArgumentParser( description="Minimal inbound ACH server — speak BW protocol to calling MiniMate Plus units.", formatter_class=argparse.RawDescriptionHelpFormatter, epilog=__doc__, ) p.add_argument( "--port", "-p", type=int, default=12345, help="Port to listen on (default: 12345).", ) p.add_argument( "--output", "-o", default=str(Path(__file__).parent / "captures"), metavar="DIR", help="Directory to write session captures (default: bridges/captures/).", ) p.add_argument( "--timeout", "-t", type=float, default=30.0, help="Protocol receive timeout in seconds (default: 30.0).", ) p.add_argument( "--events-only", action="store_true", help="Skip the device-info step and go straight to event download.", ) p.add_argument( "--verbose", "-v", action="store_true", help="Enable debug logging.", ) return p.parse_args() if __name__ == "__main__": args = parse_args() logging.basicConfig( level=logging.DEBUG if args.verbose else logging.INFO, format="%(asctime)s %(levelname)-7s %(name)s %(message)s", ) try: serve(args) except KeyboardInterrupt: print("\nStopped.")