From 05421764a59dd3e93e4e4fb923fe34f9b1b63c52 Mon Sep 17 00:00:00 2001 From: Brian Harrison Date: Thu, 9 Apr 2026 12:34:27 -0400 Subject: [PATCH] feat: add SocketTransport and ach_server.py inbound ACH server MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit minimateplus/transport.py: - Add SocketTransport(TcpTransport) — wraps an already-accepted inbound socket; connect() is a no-op; everything else inherited from TcpTransport. Enables the ACH server to reuse all existing protocol/client code without any changes. bridges/ach_server.py: - Minimal inbound ACH server — listens on port 12345, accepts call-home connections from MiniMate Plus units, runs the full BW protocol: startup handshake → get_device_info → get_events(full_waveform=True) - Saves device_info.json + events.json + raw_rx_.bin + session log per connection to bridges/captures/ach_inbound_/ - raw_rx.bin is byte-compatible with existing Analyzer tooling - Taps transport.read() to capture raw S3 bytes alongside parsed output - Each connection runs in its own daemon thread - Clearly distinguishes push vs pull protocol in the startup log Co-Authored-By: Claude Sonnet 4.6 --- bridges/ach_server.py | 356 ++++++++++++++++++++++++++++++++++++++ minimateplus/transport.py | 36 ++++ 2 files changed, 392 insertions(+) create mode 100644 bridges/ach_server.py diff --git a/bridges/ach_server.py b/bridges/ach_server.py new file mode 100644 index 0000000..6088431 --- /dev/null +++ b/bridges/ach_server.py @@ -0,0 +1,356 @@ +#!/usr/bin/env python3 +""" +ach_server.py — Minimal inbound ACH (Auto Call Home) server for MiniMate Plus. + +This IS your test server. Run it on any machine on the same network, point a +unit's ACEmanager call-home destination at it, and it will speak the full BW +protocol to the device: handshake, pull device info, download all events, save +everything as JSON. + +The key thing this script tells you that no amount of packet sniffing can: + - Does the device speak first (push) or wait for us to send POLL (pull)? + +If startup() completes normally → it's pull protocol, same as Blastware. +If startup() times out → the device sent something first; check raw_rx.bin. + +Usage +----- + python bridges/ach_server.py [--port 12345] [--output bridges/captures/] + +Setup +----- + 1. Run this script on a machine on your local network. + 2. In ACEmanager → Application → ALEOS Application Framework (or equivalent) + find the Call Home / ACH settings. Set: + Remote Host: + Remote Port: 12345 + 3. Trigger the unit (wait for a vibration event, or use the manual call-home + button if your firmware version has one). + 4. The unit connects. This script handshakes, downloads all events, + and saves a timestamped session directory. + +Output per session +------------------ + bridges/captures/ach_inbound_/ + device_info.json — serial number, firmware version, calibration date, etc. + events.json — all events: timestamp, PPV per channel, peaks, metadata + raw_rx_.bin — raw bytes from the device (S3 side) for Analyzer + session_.log — detailed protocol log + +What to look for +---------------- + Push vs pull: Check session_.log. If the first line after "Connected" + shows bytes arriving BEFORE the POLL probe was sent, it's push. If POLL + gets a clean response, it's pull. + + Frequency: Look at raw_rx.bin in the Analyzer. SUB 5A (0xA5 responses) carry + bulk waveform data — if frequency is sent pre-computed there will be float32 + values before the ADC sample blocks. + + ACH-specific framing: Does the unit send anything extra before the DLE+STX + framing starts? raw_rx.bin will show raw bytes including any preamble. +""" + +from __future__ import annotations + +import argparse +import datetime +import json +import logging +import socket +import sys +import threading +from pathlib import Path +from typing import Optional + +sys.path.insert(0, str(Path(__file__).parent.parent)) + +from minimateplus.transport import SocketTransport +from minimateplus.client import MiniMateClient +from minimateplus.models import DeviceInfo, Event + +log = logging.getLogger("ach_server") + + +# ── Per-session handler ──────────────────────────────────────────────────────── + +class AchSession: + """ + Handles one inbound unit connection in its own thread. + Wraps the socket in a SocketTransport → MiniMateClient, then runs the + standard connect → get_device_info → get_events sequence. + """ + + def __init__( + self, + sock: socket.socket, + peer: str, + output_dir: Path, + timeout: float, + events_only: bool, + ) -> None: + self.sock = sock + self.peer = peer + self.output_dir = output_dir + self.timeout = timeout + self.events_only = events_only + + def run(self) -> None: + ts = datetime.datetime.now().strftime("%Y%m%d_%H%M%S") + session_dir = self.output_dir / f"ach_inbound_{ts}" + session_dir.mkdir(parents=True, exist_ok=True) + + log_path = session_dir / f"session_{ts}.log" + raw_path = session_dir / f"raw_rx_{ts}.bin" + + # Wire up a file handler so every protocol log line goes to the session log + fh = logging.FileHandler(log_path) + fh.setFormatter(logging.Formatter("%(asctime)s %(levelname)-7s %(name)s %(message)s")) + root_logger = logging.getLogger() + root_logger.addHandler(fh) + + try: + self._run_inner(session_dir, raw_path, ts) + except Exception as exc: + log.error("Session failed: %s", exc, exc_info=True) + finally: + root_logger.removeHandler(fh) + fh.close() + try: + self.sock.close() + except Exception: + pass + + def _run_inner(self, session_dir: Path, raw_path: Path, ts: str) -> None: + log.info("="*60) + log.info("Inbound connection from %s", self.peer) + log.info("Session dir: %s", session_dir) + + # Wrap the accepted socket in a SocketTransport. + # SocketTransport.connect() is a no-op — the socket is already live. + transport = SocketTransport(self.sock, peer=self.peer) + + # Tap the transport so we save every raw byte received from the device. + # We monkey-patch read() to write to a file before returning. + raw_fh = open(raw_path, "wb") + original_read = transport.read + + def tapped_read(n: int) -> bytes: + data = original_read(n) + if data: + raw_fh.write(data) + raw_fh.flush() + return data + + transport.read = tapped_read # type: ignore[method-assign] + + try: + client = MiniMateClient(transport=transport, timeout=self.timeout) + client.open() # calls transport.connect() — no-op for SocketTransport + + # ── Step 1: startup handshake ───────────────────────────────────── + log.info("Step 1/3: startup handshake (POLL / SUB 5B)") + try: + from minimateplus.protocol import MiniMateProtocol + proto = MiniMateProtocol(transport, recv_timeout=self.timeout) + proto.startup() + log.info(" ✓ Startup OK — device responded to POLL (pull protocol confirmed)") + log.info(" NOTE: If you see this, the device waited for us to send POLL first.") + log.info(" That means ACH is pull protocol (same as direct BW connection).") + except Exception as exc: + log.error(" ✗ Startup failed: %s", exc) + log.warning(" If startup timed out with bytes in raw_rx.bin → push protocol.") + log.warning(" If raw_rx.bin is empty → unit didn't respond at all.") + return + + # ── Step 2: device info ─────────────────────────────────────────── + if not self.events_only: + log.info("Step 2/3: reading device info") + try: + device_info = client.connect() # SUB FE + 1A + _save_json(session_dir / "device_info.json", _device_info_to_dict(device_info)) + log.info( + " ✓ Device: serial=%s firmware=%s calibration=%s", + device_info.serial_number, + device_info.firmware_version, + device_info.calibration_date, + ) + except Exception as exc: + log.error(" ✗ Device info failed: %s", exc) + # Not fatal — continue to events + else: + log.info("Step 2/3: skipping device info (--events-only)") + + # ── Step 3: download events ──────────────────────────────────────── + log.info("Step 3/3: downloading events") + try: + events = client.get_events(full_waveform=True) + log.info(" ✓ Downloaded %d event(s)", len(events)) + _save_json(session_dir / "events.json", [_event_to_dict(e) for e in events]) + for i, ev in enumerate(events): + log.info( + " Event %d: %s Tran=%.4f Vert=%.4f Long=%.4f VS=%.4f", + i, + ev.timestamp.isoformat() if ev.timestamp else "?", + ev.peaks.transverse if ev.peaks else 0, + ev.peaks.vertical if ev.peaks else 0, + ev.peaks.longitudinal if ev.peaks else 0, + ev.peaks.vector_sum if ev.peaks else 0, + ) + except Exception as exc: + log.error(" ✗ Event download failed: %s", exc) + + finally: + raw_fh.close() + client.close() + + log.info("Session complete → %s", session_dir) + log.info("="*60) + + +# ── JSON helpers ─────────────────────────────────────────────────────────────── + +def _save_json(path: Path, obj: object) -> None: + with open(path, "w") as f: + json.dump(obj, f, indent=2, default=str) + log.debug("Saved %s", path) + + +def _device_info_to_dict(d: DeviceInfo) -> dict: + return { + "serial_number": d.serial_number, + "firmware_version": d.firmware_version, + "calibration_date": str(d.calibration_date) if d.calibration_date else None, + "aux_trigger": d.aux_trigger, + "setup_name": d.setup_name, + "sample_rate": d.sample_rate, + "record_time": d.record_time, + "trigger_level_geo": d.trigger_level_geo, + "alarm_level_geo": d.alarm_level_geo, + "max_range_geo": d.max_range_geo, + "project": d.project, + "client": d.client, + "operator": d.operator, + "sensor_location": d.sensor_location, + } + + +def _event_to_dict(e: Event) -> dict: + peaks = {} + if e.peaks: + peaks = { + "transverse": e.peaks.transverse, + "vertical": e.peaks.vertical, + "longitudinal": e.peaks.longitudinal, + "vector_sum": e.peaks.vector_sum, + "mic": e.peaks.mic, + } + samples = {} + if e.raw_samples: + samples = { + ch: vals[:20] # first 20 sample-sets to keep the file sane + for ch, vals in e.raw_samples.items() + } + samples["__note__"] = "first 20 sample-sets only; see raw_rx.bin for full waveform" + return { + "timestamp": e.timestamp.isoformat() if e.timestamp else None, + "project": e.project, + "client": e.client, + "operator": e.operator, + "sensor_location": e.sensor_location, + "peaks": peaks, + "raw_samples_preview": samples, + } + + +# ── Main server loop ─────────────────────────────────────────────────────────── + +def serve(args: argparse.Namespace) -> None: + output_dir = Path(args.output) + output_dir.mkdir(parents=True, exist_ok=True) + + server_sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM) + server_sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1) + server_sock.bind(("0.0.0.0", args.port)) + server_sock.listen(5) + + print(f"\n{'='*60}") + print(f" ACH inbound server listening on 0.0.0.0:{args.port}") + print(f" Output: {output_dir.resolve()}/ach_inbound_/") + print(f"{'='*60}") + print(f"\n Point your test unit's ACEmanager call-home settings to:") + print(f" Remote Host: ") + print(f" Remote Port: {args.port}") + print(f"\n Waiting for inbound connections... (Ctrl-C to stop)\n") + + try: + while True: + try: + client_sock, addr = server_sock.accept() + peer = f"{addr[0]}:{addr[1]}" + log.info("Accepted connection from %s", peer) + session = AchSession( + sock=client_sock, + peer=peer, + output_dir=output_dir, + timeout=args.timeout, + events_only=args.events_only, + ) + t = threading.Thread(target=session.run, daemon=True, name=f"ach-{peer}") + t.start() + except KeyboardInterrupt: + raise + except Exception as exc: + log.error("Accept error: %s", exc) + finally: + server_sock.close() + print("\nServer stopped.") + + +def parse_args() -> argparse.Namespace: + p = argparse.ArgumentParser( + description="Minimal inbound ACH server — speak BW protocol to calling MiniMate Plus units.", + formatter_class=argparse.RawDescriptionHelpFormatter, + epilog=__doc__, + ) + p.add_argument( + "--port", "-p", + type=int, + default=12345, + help="Port to listen on (default: 12345).", + ) + p.add_argument( + "--output", "-o", + default=str(Path(__file__).parent / "captures"), + metavar="DIR", + help="Directory to write session captures (default: bridges/captures/).", + ) + p.add_argument( + "--timeout", "-t", + type=float, + default=30.0, + help="Protocol receive timeout in seconds (default: 30.0).", + ) + p.add_argument( + "--events-only", + action="store_true", + help="Skip the device-info step and go straight to event download.", + ) + p.add_argument( + "--verbose", "-v", + action="store_true", + help="Enable debug logging.", + ) + return p.parse_args() + + +if __name__ == "__main__": + args = parse_args() + logging.basicConfig( + level=logging.DEBUG if args.verbose else logging.INFO, + format="%(asctime)s %(levelname)-7s %(name)s %(message)s", + ) + try: + serve(args) + except KeyboardInterrupt: + print("\nStopped.") diff --git a/minimateplus/transport.py b/minimateplus/transport.py index d0c37e6..65249d8 100644 --- a/minimateplus/transport.py +++ b/minimateplus/transport.py @@ -418,3 +418,39 @@ class TcpTransport(BaseTransport): def __repr__(self) -> str: state = "connected" if self.is_connected else "disconnected" return f"TcpTransport({self.host!r}, port={self.port}, {state})" + + +# ── Inbound / accepted-socket transport ─────────────────────────────────────── + +class SocketTransport(TcpTransport): + """ + Like TcpTransport but wraps an already-accepted inbound socket. + + Used by the ACH inbound server (bridges/ach_server.py) — the device dials + IN to us, so by the time we create this transport the socket is already live. + connect() is a no-op; everything else (read, write, read_until_idle, …) is + inherited unchanged from TcpTransport. + + Args: + sock: An already-connected socket.socket returned by server_socket.accept(). + peer: Human-readable peer label for repr / logging (e.g. "203.0.113.5:54321"). + """ + + def __init__(self, sock: socket.socket, peer: str = "inbound") -> None: + # Bypass TcpTransport.__init__ — we already have a live socket. + self.host = peer + self.port = 0 + self.connect_timeout = 0.0 + self._sock = sock + sock.settimeout(self._RECV_TIMEOUT) + + def connect(self) -> None: + """No-op — socket was already accepted inbound.""" + pass # Already have a live socket; nothing to open. + + @property + def is_connected(self) -> bool: + return self._sock is not None + + def __repr__(self) -> str: + return f"SocketTransport(peer={self.host!r})"