feat: add SocketTransport and ach_server.py inbound ACH server

minimateplus/transport.py:
- Add SocketTransport(TcpTransport) — wraps an already-accepted inbound
  socket; connect() is a no-op; everything else inherited from TcpTransport.
  Enables the ACH server to reuse all existing protocol/client code without
  any changes.

bridges/ach_server.py:
- Minimal inbound ACH server — listens on port 12345, accepts call-home
  connections from MiniMate Plus units, runs the full BW protocol:
  startup handshake → get_device_info → get_events(full_waveform=True)
- Saves device_info.json + events.json + raw_rx_<ts>.bin + session log
  per connection to bridges/captures/ach_inbound_<ts>/
- raw_rx.bin is byte-compatible with existing Analyzer tooling
- Taps transport.read() to capture raw S3 bytes alongside parsed output
- Each connection runs in its own daemon thread
- Clearly distinguishes push vs pull protocol in the startup log

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
This commit is contained in:
2026-04-09 12:34:27 -04:00
committed by serversdown
parent 5e44cdc668
commit cf7d838bf4
2 changed files with 392 additions and 0 deletions
+356
View File
@@ -0,0 +1,356 @@
#!/usr/bin/env python3
"""
ach_server.py — Minimal inbound ACH (Auto Call Home) server for MiniMate Plus.
This IS your test server. Run it on any machine on the same network, point a
unit's ACEmanager call-home destination at it, and it will speak the full BW
protocol to the device: handshake, pull device info, download all events, save
everything as JSON.
The key thing this script tells you that no amount of packet sniffing can:
- Does the device speak first (push) or wait for us to send POLL (pull)?
If startup() completes normally → it's pull protocol, same as Blastware.
If startup() times out → the device sent something first; check raw_rx.bin.
Usage
-----
python bridges/ach_server.py [--port 12345] [--output bridges/captures/]
Setup
-----
1. Run this script on a machine on your local network.
2. In ACEmanager → Application → ALEOS Application Framework (or equivalent)
find the Call Home / ACH settings. Set:
Remote Host: <this machine's LAN IP>
Remote Port: 12345
3. Trigger the unit (wait for a vibration event, or use the manual call-home
button if your firmware version has one).
4. The unit connects. This script handshakes, downloads all events,
and saves a timestamped session directory.
Output per session
------------------
bridges/captures/ach_inbound_<ts>/
device_info.json — serial number, firmware version, calibration date, etc.
events.json — all events: timestamp, PPV per channel, peaks, metadata
raw_rx_<ts>.bin — raw bytes from the device (S3 side) for Analyzer
session_<ts>.log — detailed protocol log
What to look for
----------------
Push vs pull: Check session_<ts>.log. If the first line after "Connected"
shows bytes arriving BEFORE the POLL probe was sent, it's push. If POLL
gets a clean response, it's pull.
Frequency: Look at raw_rx.bin in the Analyzer. SUB 5A (0xA5 responses) carry
bulk waveform data — if frequency is sent pre-computed there will be float32
values before the ADC sample blocks.
ACH-specific framing: Does the unit send anything extra before the DLE+STX
framing starts? raw_rx.bin will show raw bytes including any preamble.
"""
from __future__ import annotations
import argparse
import datetime
import json
import logging
import socket
import sys
import threading
from pathlib import Path
from typing import Optional
sys.path.insert(0, str(Path(__file__).parent.parent))
from minimateplus.transport import SocketTransport
from minimateplus.client import MiniMateClient
from minimateplus.models import DeviceInfo, Event
log = logging.getLogger("ach_server")
# ── Per-session handler ────────────────────────────────────────────────────────
class AchSession:
"""
Handles one inbound unit connection in its own thread.
Wraps the socket in a SocketTransport → MiniMateClient, then runs the
standard connect → get_device_info → get_events sequence.
"""
def __init__(
self,
sock: socket.socket,
peer: str,
output_dir: Path,
timeout: float,
events_only: bool,
) -> None:
self.sock = sock
self.peer = peer
self.output_dir = output_dir
self.timeout = timeout
self.events_only = events_only
def run(self) -> None:
ts = datetime.datetime.now().strftime("%Y%m%d_%H%M%S")
session_dir = self.output_dir / f"ach_inbound_{ts}"
session_dir.mkdir(parents=True, exist_ok=True)
log_path = session_dir / f"session_{ts}.log"
raw_path = session_dir / f"raw_rx_{ts}.bin"
# Wire up a file handler so every protocol log line goes to the session log
fh = logging.FileHandler(log_path)
fh.setFormatter(logging.Formatter("%(asctime)s %(levelname)-7s %(name)s %(message)s"))
root_logger = logging.getLogger()
root_logger.addHandler(fh)
try:
self._run_inner(session_dir, raw_path, ts)
except Exception as exc:
log.error("Session failed: %s", exc, exc_info=True)
finally:
root_logger.removeHandler(fh)
fh.close()
try:
self.sock.close()
except Exception:
pass
def _run_inner(self, session_dir: Path, raw_path: Path, ts: str) -> None:
log.info("="*60)
log.info("Inbound connection from %s", self.peer)
log.info("Session dir: %s", session_dir)
# Wrap the accepted socket in a SocketTransport.
# SocketTransport.connect() is a no-op — the socket is already live.
transport = SocketTransport(self.sock, peer=self.peer)
# Tap the transport so we save every raw byte received from the device.
# We monkey-patch read() to write to a file before returning.
raw_fh = open(raw_path, "wb")
original_read = transport.read
def tapped_read(n: int) -> bytes:
data = original_read(n)
if data:
raw_fh.write(data)
raw_fh.flush()
return data
transport.read = tapped_read # type: ignore[method-assign]
try:
client = MiniMateClient(transport=transport, timeout=self.timeout)
client.open() # calls transport.connect() — no-op for SocketTransport
# ── Step 1: startup handshake ─────────────────────────────────────
log.info("Step 1/3: startup handshake (POLL / SUB 5B)")
try:
from minimateplus.protocol import MiniMateProtocol
proto = MiniMateProtocol(transport, recv_timeout=self.timeout)
proto.startup()
log.info(" ✓ Startup OK — device responded to POLL (pull protocol confirmed)")
log.info(" NOTE: If you see this, the device waited for us to send POLL first.")
log.info(" That means ACH is pull protocol (same as direct BW connection).")
except Exception as exc:
log.error(" ✗ Startup failed: %s", exc)
log.warning(" If startup timed out with bytes in raw_rx.bin → push protocol.")
log.warning(" If raw_rx.bin is empty → unit didn't respond at all.")
return
# ── Step 2: device info ───────────────────────────────────────────
if not self.events_only:
log.info("Step 2/3: reading device info")
try:
device_info = client.connect() # SUB FE + 1A
_save_json(session_dir / "device_info.json", _device_info_to_dict(device_info))
log.info(
" ✓ Device: serial=%s firmware=%s calibration=%s",
device_info.serial_number,
device_info.firmware_version,
device_info.calibration_date,
)
except Exception as exc:
log.error(" ✗ Device info failed: %s", exc)
# Not fatal — continue to events
else:
log.info("Step 2/3: skipping device info (--events-only)")
# ── Step 3: download events ────────────────────────────────────────
log.info("Step 3/3: downloading events")
try:
events = client.get_events(full_waveform=True)
log.info(" ✓ Downloaded %d event(s)", len(events))
_save_json(session_dir / "events.json", [_event_to_dict(e) for e in events])
for i, ev in enumerate(events):
log.info(
" Event %d: %s Tran=%.4f Vert=%.4f Long=%.4f VS=%.4f",
i,
ev.timestamp.isoformat() if ev.timestamp else "?",
ev.peaks.transverse if ev.peaks else 0,
ev.peaks.vertical if ev.peaks else 0,
ev.peaks.longitudinal if ev.peaks else 0,
ev.peaks.vector_sum if ev.peaks else 0,
)
except Exception as exc:
log.error(" ✗ Event download failed: %s", exc)
finally:
raw_fh.close()
client.close()
log.info("Session complete → %s", session_dir)
log.info("="*60)
# ── JSON helpers ───────────────────────────────────────────────────────────────
def _save_json(path: Path, obj: object) -> None:
with open(path, "w") as f:
json.dump(obj, f, indent=2, default=str)
log.debug("Saved %s", path)
def _device_info_to_dict(d: DeviceInfo) -> dict:
return {
"serial_number": d.serial_number,
"firmware_version": d.firmware_version,
"calibration_date": str(d.calibration_date) if d.calibration_date else None,
"aux_trigger": d.aux_trigger,
"setup_name": d.setup_name,
"sample_rate": d.sample_rate,
"record_time": d.record_time,
"trigger_level_geo": d.trigger_level_geo,
"alarm_level_geo": d.alarm_level_geo,
"max_range_geo": d.max_range_geo,
"project": d.project,
"client": d.client,
"operator": d.operator,
"sensor_location": d.sensor_location,
}
def _event_to_dict(e: Event) -> dict:
peaks = {}
if e.peaks:
peaks = {
"transverse": e.peaks.transverse,
"vertical": e.peaks.vertical,
"longitudinal": e.peaks.longitudinal,
"vector_sum": e.peaks.vector_sum,
"mic": e.peaks.mic,
}
samples = {}
if e.raw_samples:
samples = {
ch: vals[:20] # first 20 sample-sets to keep the file sane
for ch, vals in e.raw_samples.items()
}
samples["__note__"] = "first 20 sample-sets only; see raw_rx.bin for full waveform"
return {
"timestamp": e.timestamp.isoformat() if e.timestamp else None,
"project": e.project,
"client": e.client,
"operator": e.operator,
"sensor_location": e.sensor_location,
"peaks": peaks,
"raw_samples_preview": samples,
}
# ── Main server loop ───────────────────────────────────────────────────────────
def serve(args: argparse.Namespace) -> None:
output_dir = Path(args.output)
output_dir.mkdir(parents=True, exist_ok=True)
server_sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
server_sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
server_sock.bind(("0.0.0.0", args.port))
server_sock.listen(5)
print(f"\n{'='*60}")
print(f" ACH inbound server listening on 0.0.0.0:{args.port}")
print(f" Output: {output_dir.resolve()}/ach_inbound_<timestamp>/")
print(f"{'='*60}")
print(f"\n Point your test unit's ACEmanager call-home settings to:")
print(f" Remote Host: <this machine's LAN IP>")
print(f" Remote Port: {args.port}")
print(f"\n Waiting for inbound connections... (Ctrl-C to stop)\n")
try:
while True:
try:
client_sock, addr = server_sock.accept()
peer = f"{addr[0]}:{addr[1]}"
log.info("Accepted connection from %s", peer)
session = AchSession(
sock=client_sock,
peer=peer,
output_dir=output_dir,
timeout=args.timeout,
events_only=args.events_only,
)
t = threading.Thread(target=session.run, daemon=True, name=f"ach-{peer}")
t.start()
except KeyboardInterrupt:
raise
except Exception as exc:
log.error("Accept error: %s", exc)
finally:
server_sock.close()
print("\nServer stopped.")
def parse_args() -> argparse.Namespace:
p = argparse.ArgumentParser(
description="Minimal inbound ACH server — speak BW protocol to calling MiniMate Plus units.",
formatter_class=argparse.RawDescriptionHelpFormatter,
epilog=__doc__,
)
p.add_argument(
"--port", "-p",
type=int,
default=12345,
help="Port to listen on (default: 12345).",
)
p.add_argument(
"--output", "-o",
default=str(Path(__file__).parent / "captures"),
metavar="DIR",
help="Directory to write session captures (default: bridges/captures/).",
)
p.add_argument(
"--timeout", "-t",
type=float,
default=30.0,
help="Protocol receive timeout in seconds (default: 30.0).",
)
p.add_argument(
"--events-only",
action="store_true",
help="Skip the device-info step and go straight to event download.",
)
p.add_argument(
"--verbose", "-v",
action="store_true",
help="Enable debug logging.",
)
return p.parse_args()
if __name__ == "__main__":
args = parse_args()
logging.basicConfig(
level=logging.DEBUG if args.verbose else logging.INFO,
format="%(asctime)s %(levelname)-7s %(name)s %(message)s",
)
try:
serve(args)
except KeyboardInterrupt:
print("\nStopped.")
+36
View File
@@ -418,3 +418,39 @@ class TcpTransport(BaseTransport):
def __repr__(self) -> str:
state = "connected" if self.is_connected else "disconnected"
return f"TcpTransport({self.host!r}, port={self.port}, {state})"
# ── Inbound / accepted-socket transport ───────────────────────────────────────
class SocketTransport(TcpTransport):
"""
Like TcpTransport but wraps an already-accepted inbound socket.
Used by the ACH inbound server (bridges/ach_server.py) — the device dials
IN to us, so by the time we create this transport the socket is already live.
connect() is a no-op; everything else (read, write, read_until_idle, …) is
inherited unchanged from TcpTransport.
Args:
sock: An already-connected socket.socket returned by server_socket.accept().
peer: Human-readable peer label for repr / logging (e.g. "203.0.113.5:54321").
"""
def __init__(self, sock: socket.socket, peer: str = "inbound") -> None:
# Bypass TcpTransport.__init__ — we already have a live socket.
self.host = peer
self.port = 0
self.connect_timeout = 0.0
self._sock = sock
sock.settimeout(self._RECV_TIMEOUT)
def connect(self) -> None:
"""No-op — socket was already accepted inbound."""
pass # Already have a live socket; nothing to open.
@property
def is_connected(self) -> bool:
return self._sock is not None
def __repr__(self) -> str:
return f"SocketTransport(peer={self.host!r})"