#!/usr/bin/env python3 """ ach_bridge.py — Transparent TCP bridge / splitter for Instantel MiniMate Plus call-home (ACH) traffic. Modes ----- standalone Accept connection, capture frames, do NOT forward anywhere. Good for initial discovery with a test unit. bridge Forward to one upstream server while capturing. Use this for the initial discovery phase with your test server. splitter Forward to the PRIMARY upstream (production ACH server) AND mirror a copy to a SECONDARY server simultaneously. The device never knows — it talks to the primary the whole time. If the mirror fails, the primary connection is unaffected. Think of it like a headphone splitter: one input, two outputs. Primary → authoritative responses back to device. Mirror → gets all device bytes, its responses are discarded. Usage ----- # Standalone capture (test/discovery — no forwarding) python bridges/ach_bridge.py --standalone [--port 12345] # Bridge mode (forward to one server, e.g. your test server) python bridges/ach_bridge.py --upstream HOST:PORT [--port 12345] # Splitter mode (production: forward to prod + mirror to your server) python bridges/ach_bridge.py --upstream PROD_HOST:PORT --mirror MY_HOST:PORT [--port 12345] Setup for discovery (test server, don't touch prod) ---------------------------------------------------- 1. Stand up your test ACH server, note its IP and port (e.g. 192.168.1.50:12345). 2. Take ONE test unit. In ACEmanager → Call Home, point it at: : <--port> 3. Run: python bridges/ach_bridge.py --upstream TEST_SERVER:12345 --port 12345 4. Trigger the unit. Raw frames are saved to bridges/captures/ach_/. 5. Revert the unit's ACEmanager setting when done. Setup for production splitter (when you're ready) ------------------------------------------------- This does NOT touch the units. Instead you re-route traffic at the network layer so that call-home packets arrive at a machine running this script first. Typical approach: update the DNS entry / host record your prod ACH server is registered under to point at this machine. The units keep their existing ACEmanager settings. python bridges/ach_bridge.py \\ --upstream PROD_ACH_HOST:12345 \\ --mirror MY_NEW_SERVER:12345 \\ --port 12345 Output (each connection gets its own timestamped sub-directory) ------ bridges/captures/ach_/ raw_client_.bin — raw bytes from the device (S3 side) raw_server_.bin — raw bytes from the primary upstream (BW side) raw_mirror_.bin — raw bytes from the mirror upstream (splitter mode only) session_.log — human-readable frame parse log session_.jsonl — JSON-lines frame log raw_client / raw_server are byte-for-byte compatible with parse_capture.py. """ from __future__ import annotations import argparse import asyncio import datetime import json import logging import os import sys from pathlib import Path from typing import List, Optional # Add project root to path sys.path.insert(0, str(Path(__file__).parent.parent)) from minimateplus.framing import S3FrameParser, S3Frame log = logging.getLogger("ach_bridge") # ── Frame label helpers ────────────────────────────────────────────────────── _KNOWN_RSP_SUBS = { 0xA4: "POLL_RSP", 0xA5: "BULK_WAVEFORM_RSP", 0xE0: "ADVANCE_EVENT_RSP", 0xE1: "EVENT_INDEX_FIRST_RSP", 0xE3: "MONITOR_STATUS_RSP", 0xEA: "SERIAL_NUM_RSP", 0xF3: "WAVEFORM_RECORD_RSP", 0xF5: "WAVEFORM_HEADER_RSP", 0xF7: "EVENT_INDEX_RSP", 0xF9: "UNK_06_RSP", 0xFE: "DEVICE_INFO_RSP", # Write acks 0x97: "EVT_IDX_WRITE_ACK", 0x8C: "CONFIRM_B_ACK", 0x8E: "COMPLIANCE_WRITE_ACK", 0x8D: "CONFIRM_A_ACK", 0x7D: "TRIGGER_WRITE_ACK", 0x7C: "TRIGGER_CONFIRM_ACK", 0x96: "WAVEFORM_WRITE_ACK", 0x8B: "CONFIRM_C_ACK", 0x69: "START_MONITOR_ACK", 0x68: "STOP_MONITOR_ACK", } _KNOWN_REQ_SUBS = { 0x5B: "POLL", 0x5A: "BULK_WAVEFORM", 0x1F: "ADVANCE_EVENT", 0x1E: "EVENT_INDEX_FIRST", 0x1C: "MONITOR_STATUS", 0x15: "SERIAL_NUM", 0x0C: "WAVEFORM_RECORD", 0x0A: "WAVEFORM_HEADER", 0x08: "EVENT_INDEX", 0x06: "UNK_06", 0x01: "DEVICE_INFO", # Write commands 0x68: "EVT_IDX_WRITE", 0x73: "CONFIRM_B", 0x71: "COMPLIANCE_WRITE", 0x72: "CONFIRM_A", 0x82: "TRIGGER_WRITE", 0x83: "TRIGGER_CONFIRM", 0x69: "WAVEFORM_WRITE", 0x74: "CONFIRM_C", 0x96: "START_MONITOR", 0x97: "STOP_MONITOR", } def _label_s3_frame(frame: S3Frame) -> str: name = _KNOWN_RSP_SUBS.get(frame.sub, f"UNK_0x{frame.sub:02X}") chk = "✓" if frame.checksum_valid else "✗CHK" return ( f"S3→ SUB=0x{frame.sub:02X} ({name}) " f"page=0x{frame.page_key:04X} data={len(frame.data)}B {chk}" ) def _label_bw_frame(data: bytes, prefix: str = " →BW") -> str: """Best-effort label for a raw BW request frame (wire bytes).""" # Wire layout: 41 02 10 10 00 sub ... if len(data) < 6: return f"{prefix} (short {len(data)}B)" sub = data[5] name = _KNOWN_REQ_SUBS.get(sub, f"UNK_0x{sub:02X}") return f"{prefix} SUB=0x{sub:02X} ({name}) {len(data)}B" # ── Per-session capture writer ───────────────────────────────────────────────── class CaptureSession: """Writes raw bytes + parsed log for one TCP connection.""" def __init__(self, capture_dir: Path, peer: str, *, has_mirror: bool = False): ts = datetime.datetime.now().strftime("%Y%m%d_%H%M%S") self.dir = capture_dir / f"ach_{ts}" self.dir.mkdir(parents=True, exist_ok=True) self.peer = peer self._raw_client = open(self.dir / f"raw_client_{ts}.bin", "wb") self._raw_server = open(self.dir / f"raw_server_{ts}.bin", "wb") self._raw_mirror = ( open(self.dir / f"raw_mirror_{ts}.bin", "wb") if has_mirror else None ) self._log_fh = open(self.dir / f"session_{ts}.log", "w") self._jsonl_fh = open(self.dir / f"session_{ts}.jsonl", "w") self._s3_parser = S3FrameParser() self._frame_count = 0 self._byte_count_client = 0 self._byte_count_server = 0 self._byte_count_mirror = 0 self._log( f"# ACH capture — peer={peer} " f"mirror={'yes' if has_mirror else 'no'} " f"started={datetime.datetime.now().isoformat()}" ) self._log(f"# Output dir: {self.dir}") log.info("Capture session opened: %s (peer=%s)", self.dir, peer) # ── public API ──────────────────────────────────────────────────────────── def feed_client(self, data: bytes) -> None: """Bytes FROM the device (S3 response frames).""" self._raw_client.write(data) self._raw_client.flush() self._byte_count_client += len(data) for byte in data: frame = self._s3_parser.feed(bytes([byte])) if frame: frames = frame if isinstance(frame, list) else [frame] for f in frames: self._frame_count += 1 label = _label_s3_frame(f) self._log(f"[{self._frame_count:04d}] {label}") self._log( f" hex: {f.data[:64].hex()}" + (" ..." if len(f.data) > 64 else "") ) self._emit_json("s3", f) def feed_server(self, data: bytes) -> None: """Bytes FROM the primary upstream server (BW request frames).""" self._raw_server.write(data) self._raw_server.flush() self._byte_count_server += len(data) label = _label_bw_frame(data, prefix=" →BW[primary]") self._log(f" {label}") def feed_mirror(self, data: bytes) -> None: """Bytes FROM the mirror server (logged, not forwarded to device).""" if self._raw_mirror: self._raw_mirror.write(data) self._raw_mirror.flush() self._byte_count_mirror += len(data) label = _label_bw_frame(data, prefix=" →BW[mirror] ") self._log(f" {label} [MIRROR — not sent to device]") def close(self, reason: str = "connection closed") -> None: self._log(f"# Session ended: {reason}") self._log( f"# Totals — client={self._byte_count_client}B " f"server={self._byte_count_server}B " f"mirror={self._byte_count_mirror}B " f"s3_frames={self._frame_count}" ) handles = [self._raw_client, self._raw_server, self._log_fh, self._jsonl_fh] if self._raw_mirror: handles.append(self._raw_mirror) for fh in handles: try: fh.close() except Exception: pass log.info( "Session closed (%s): %dB client, %dB server, %dB mirror, %d S3 frames → %s", reason, self._byte_count_client, self._byte_count_server, self._byte_count_mirror, self._frame_count, self.dir, ) # ── internals ───────────────────────────────────────────────────────────── def _log(self, msg: str) -> None: print(msg, file=self._log_fh, flush=True) print(msg) def _emit_json(self, direction: str, frame: S3Frame) -> None: record = { "dir": direction, "sub": frame.sub, "page_key": frame.page_key, "data_len": len(frame.data), "data_hex": frame.data.hex(), "checksum_valid": frame.checksum_valid, } print(json.dumps(record), file=self._jsonl_fh, flush=True) # ── Bridge / splitter connection handler ────────────────────────────────────── class BridgeHandler: """ Handles inbound device connections. Modes (determined by which upstreams are configured): standalone — no upstream_host / no mirror_host bridge — upstream_host set, no mirror_host splitter — upstream_host AND mirror_host both set """ def __init__( self, capture_dir: Path, upstream_host: Optional[str], upstream_port: Optional[int], mirror_host: Optional[str] = None, mirror_port: Optional[int] = None, ): self.capture_dir = capture_dir self.upstream_host = upstream_host self.upstream_port = upstream_port self.mirror_host = mirror_host self.mirror_port = mirror_port async def handle( self, client_reader: asyncio.StreamReader, client_writer: asyncio.StreamWriter, ) -> None: peer = client_writer.get_extra_info("peername", ("?", 0)) peer_str = f"{peer[0]}:{peer[1]}" log.info("Inbound connection from %s", peer_str) has_mirror = bool(self.mirror_host) session = CaptureSession(self.capture_dir, peer_str, has_mirror=has_mirror) if not self.upstream_host: # ── Standalone mode ────────────────────────────────────────────── log.info("Standalone mode — recording inbound traffic only") try: while True: data = await client_reader.read(4096) if not data: break session.feed_client(data) except asyncio.CancelledError: pass except Exception as exc: log.warning("Standalone read error: %s", exc) finally: session.close("standalone capture ended") try: client_writer.close() await client_writer.wait_closed() except Exception: pass return # ── Bridge / splitter mode ─────────────────────────────────────────── # Connect to primary upstream (required) try: up_reader, up_writer = await asyncio.open_connection( self.upstream_host, self.upstream_port ) log.info("Connected to primary %s:%s", self.upstream_host, self.upstream_port) except Exception as exc: log.error("Failed to connect to primary upstream: %s", exc) session.close(f"primary connect failed: {exc}") client_writer.close() return # Connect to mirror upstream (optional — failure is non-fatal) mir_reader: Optional[asyncio.StreamReader] = None mir_writer: Optional[asyncio.StreamWriter] = None if self.mirror_host: try: mir_reader, mir_writer = await asyncio.open_connection( self.mirror_host, self.mirror_port ) log.info("Connected to mirror %s:%s", self.mirror_host, self.mirror_port) except Exception as exc: log.warning( "Mirror connect failed — continuing without mirror: %s", exc ) session._log(f"# WARNING: mirror connect failed: {exc}") # Build relay tasks # # ┌──────────┐ device bytes ┌─────────────┐ # │ Device │ ─────────────► │ PRIMARY │ responses ──► device # └──────────┘ └─────────────┘ # │ # │ device bytes (copy) # ▼ # ┌─────────────┐ # │ MIRROR │ responses discarded (logged only) # └─────────────┘ # tasks = [ asyncio.create_task( self._relay_device(client_reader, up_writer, mir_writer, session), name="device→upstreams", ), asyncio.create_task( self._relay_simple(up_reader, client_writer, session, "server"), name="primary→device", ), ] if mir_reader is not None: tasks.append(asyncio.create_task( self._relay_drain(mir_reader, session), name="mirror→drain", )) try: # Wait for the device-to-upstreams relay to exit first (device # disconnected or primary dropped). Then cancel the rest. done, pending = await asyncio.wait( tasks, return_when=asyncio.FIRST_COMPLETED, ) for t in pending: t.cancel() try: await t except (asyncio.CancelledError, Exception): pass except Exception as exc: log.warning("Bridge relay error: %s", exc) finally: session.close("relay ended") for writer in filter(None, [client_writer, up_writer, mir_writer]): try: writer.close() await writer.wait_closed() except Exception: pass # ── Relay helpers ───────────────────────────────────────────────────────── async def _relay_device( self, reader: asyncio.StreamReader, primary_writer: asyncio.StreamWriter, mirror_writer: Optional[asyncio.StreamWriter], session: CaptureSession, ) -> None: """ Read bytes from the device, write to the primary server, and also write a copy to the mirror server (if connected). Mirror write failures are non-fatal — we log and continue. """ try: while True: data = await reader.read(4096) if not data: break session.feed_client(data) # Primary write — failure IS fatal (lose primary = lose prod) primary_writer.write(data) await primary_writer.drain() # Mirror write — failure is non-fatal if mirror_writer is not None: try: mirror_writer.write(data) await mirror_writer.drain() except Exception as exc: log.warning("Mirror write failed (non-fatal): %s", exc) session._log(f"# WARNING: mirror write failed: {exc}") mirror_writer = None # stop trying except (asyncio.IncompleteReadError, ConnectionResetError, BrokenPipeError): pass async def _relay_simple( self, reader: asyncio.StreamReader, writer: asyncio.StreamWriter, session: CaptureSession, direction: str, ) -> None: """Standard single-pipe relay (primary→device or vice-versa).""" try: while True: data = await reader.read(4096) if not data: break if direction == "server": session.feed_server(data) else: session.feed_client(data) writer.write(data) await writer.drain() except (asyncio.IncompleteReadError, ConnectionResetError, BrokenPipeError): pass async def _relay_drain( self, reader: asyncio.StreamReader, session: CaptureSession, ) -> None: """ Read mirror server responses, log them to session, do NOT forward to device. The device only ever sees primary server responses. """ try: while True: data = await reader.read(4096) if not data: break session.feed_mirror(data) except (asyncio.IncompleteReadError, ConnectionResetError, BrokenPipeError): pass # ── Main ─────────────────────────────────────────────────────────────────────── async def main(args: argparse.Namespace) -> None: capture_dir = Path(__file__).parent / "captures" capture_dir.mkdir(parents=True, exist_ok=True) upstream_host: Optional[str] = None upstream_port: Optional[int] = None mirror_host: Optional[str] = None mirror_port: Optional[int] = None if not args.standalone: if not args.upstream: print("ERROR: --upstream HOST:PORT is required unless --standalone is set.") sys.exit(1) parts = args.upstream.rsplit(":", 1) if len(parts) != 2: print("ERROR: --upstream must be HOST:PORT (e.g. 203.0.113.5:12345)") sys.exit(1) upstream_host = parts[0] upstream_port = int(parts[1]) if args.mirror: parts = args.mirror.rsplit(":", 1) if len(parts) != 2: print("ERROR: --mirror must be HOST:PORT (e.g. 192.168.1.50:12345)") sys.exit(1) mirror_host = parts[0] mirror_port = int(parts[1]) handler = BridgeHandler( capture_dir, upstream_host, upstream_port, mirror_host, mirror_port, ) server = await asyncio.start_server( handler.handle, host="0.0.0.0", port=args.port, ) # ── Startup banner ──────────────────────────────────────────────────────── if args.standalone: mode = "STANDALONE capture (no forwarding)" elif mirror_host: mode = f"SPLITTER primary={upstream_host}:{upstream_port} mirror={mirror_host}:{mirror_port}" else: mode = f"BRIDGE → {upstream_host}:{upstream_port}" addrs = ", ".join(str(s.getsockname()) for s in server.sockets) print(f"\n{'='*70}") print(f" ACH bridge/splitter listening on {addrs}") print(f" Mode: {mode}") print(f" Captures: {capture_dir}/ach_/") print(f"{'='*70}") if upstream_host and not mirror_host: print(f"\n DISCOVERY PHASE") print(f" Point your TEST unit's ACEmanager call-home destination to:") print(f" : {args.port}") print(f" All traffic will be forwarded to {upstream_host}:{upstream_port}") elif mirror_host: print(f"\n SPLITTER MODE — PRODUCTION SAFE") print(f" Units connect as normal. Every byte is forwarded to:") print(f" PRIMARY (authoritative): {upstream_host}:{upstream_port}") print(f" MIRROR (your server): {mirror_host}:{mirror_port}") print(f" Only PRIMARY responses reach the device.") print(f" Mirror failures are logged and do not affect the device.") else: print(f"\n STANDALONE MODE — capture only, nothing forwarded") print(f" Point a unit at : {args.port}") print(f"\n Waiting for inbound connections... (Ctrl-C to stop)\n") async with server: await server.serve_forever() def parse_args() -> argparse.Namespace: p = argparse.ArgumentParser( description=( "Transparent TCP bridge / splitter for Instantel MiniMate Plus " "call-home (ACH) traffic." ), formatter_class=argparse.RawDescriptionHelpFormatter, epilog=__doc__, ) p.add_argument( "--upstream", "-u", metavar="HOST:PORT", help=( "Primary upstream ACH server to forward to " "(e.g. 203.0.113.5:12345). " "Omit with --standalone for capture-only mode." ), ) p.add_argument( "--mirror", "-m", metavar="HOST:PORT", help=( "Mirror / secondary server to receive a copy of all device bytes " "(splitter mode). Mirror responses are logged but NOT forwarded " "to the device. Mirror failures are non-fatal." ), ) p.add_argument( "--port", "-p", type=int, default=12345, help="Local port to listen on (default: 12345).", ) p.add_argument( "--standalone", "-s", action="store_true", help="Capture-only mode: accept connection, do not forward anywhere.", ) p.add_argument( "--verbose", "-v", action="store_true", help="Enable debug logging.", ) return p.parse_args() if __name__ == "__main__": args = parse_args() logging.basicConfig( level=logging.DEBUG if args.verbose else logging.INFO, format="%(asctime)s %(levelname)-7s %(name)s %(message)s", ) try: asyncio.run(main(args)) except KeyboardInterrupt: print("\nStopped.")