From 74233d7e31d27d0dd1bd48aa2699b487b8e3ae7d Mon Sep 17 00:00:00 2001 From: Brian Harrison Date: Thu, 9 Apr 2026 12:17:57 -0400 Subject: [PATCH] feat: add splitter mode to ach_bridge.py (--mirror HOST:PORT) Adds a production-safe headphone-splitter mode: - Device bytes tee'd to both --upstream (primary/prod) and --mirror (new server) - Only primary server responses are returned to the device - Mirror connect/write failures are non-fatal and logged; prod is unaffected - New raw_mirror_.bin capture file alongside raw_client/raw_server Three modes: standalone (capture only), bridge (one upstream), splitter (two). Default listen port changed to 12345 to match project ACH setup. Co-Authored-By: Claude Sonnet 4.6 --- bridges/ach_bridge.py | 472 +++++++++++++++++++++++++++++++----------- 1 file changed, 349 insertions(+), 123 deletions(-) diff --git a/bridges/ach_bridge.py b/bridges/ach_bridge.py index 1028f99..dbfa03d 100644 --- a/bridges/ach_bridge.py +++ b/bridges/ach_bridge.py @@ -1,40 +1,68 @@ #!/usr/bin/env python3 """ -ach_bridge.py — Transparent TCP bridge for capturing Instantel MiniMate Plus +ach_bridge.py — Transparent TCP bridge / splitter for Instantel MiniMate Plus call-home (ACH) traffic. +Modes +----- + standalone Accept connection, capture frames, do NOT forward anywhere. + Good for initial discovery with a test unit. + + bridge Forward to one upstream server while capturing. + Use this for the initial discovery phase with your test server. + + splitter Forward to the PRIMARY upstream (production ACH server) AND + mirror a copy to a SECONDARY server simultaneously. + The device never knows — it talks to the primary the whole time. + If the mirror fails, the primary connection is unaffected. + + Think of it like a headphone splitter: one input, two outputs. + Primary → authoritative responses back to device. + Mirror → gets all device bytes, its responses are discarded. + Usage ----- - # Bridge mode: forward to real ACH server while logging - python bridges/ach_bridge.py --upstream HOST:PORT [--port 9034] + # Standalone capture (test/discovery — no forwarding) + python bridges/ach_bridge.py --standalone [--port 12345] - # Standalone capture mode: accept connection, don't forward (use when you - # want to see what the device sends/expects without a real server) - python bridges/ach_bridge.py --standalone [--port 9034] + # Bridge mode (forward to one server, e.g. your test server) + python bridges/ach_bridge.py --upstream HOST:PORT [--port 12345] -Setup ------ -1. Find the "Remote Hostname/IP" and port in ACEmanager → Dual SIM / WAN → - Call Home (or equivalent menu on your RV50/RV55 firmware). -2. Temporarily change that setting on ONE unit to point at: - your-machine-local-ip : <--port> -3. Run this script. -4. Wait for the unit to trigger / call home. A capture file is written to - bridges/captures/ach_/ alongside an auto-parsed frame log. -5. Revert the unit's ACEmanager setting. + # Splitter mode (production: forward to prod + mirror to your server) + python bridges/ach_bridge.py --upstream PROD_HOST:PORT --mirror MY_HOST:PORT [--port 12345] -Output +Setup for discovery (test server, don't touch prod) +---------------------------------------------------- +1. Stand up your test ACH server, note its IP and port (e.g. 192.168.1.50:12345). +2. Take ONE test unit. In ACEmanager → Call Home, point it at: + : <--port> +3. Run: python bridges/ach_bridge.py --upstream TEST_SERVER:12345 --port 12345 +4. Trigger the unit. Raw frames are saved to bridges/captures/ach_/. +5. Revert the unit's ACEmanager setting when done. + +Setup for production splitter (when you're ready) +------------------------------------------------- +This does NOT touch the units. Instead you re-route traffic at the network +layer so that call-home packets arrive at a machine running this script first. +Typical approach: update the DNS entry / host record your prod ACH server is +registered under to point at this machine. The units keep their existing +ACEmanager settings. + + python bridges/ach_bridge.py \\ + --upstream PROD_ACH_HOST:12345 \\ + --mirror MY_NEW_SERVER:12345 \\ + --port 12345 + +Output (each connection gets its own timestamped sub-directory) ------ - bridges/captures/ach_/ + bridges/captures/ach_/ raw_client_.bin — raw bytes from the device (S3 side) - raw_server_.bin — raw bytes from the upstream server (BW side) - (empty in standalone mode) + raw_server_.bin — raw bytes from the primary upstream (BW side) + raw_mirror_.bin — raw bytes from the mirror upstream (splitter mode only) session_.log — human-readable frame parse log - session_.jsonl — JSON-lines frame log (for downstream tooling) + session_.jsonl — JSON-lines frame log -The raw_client / raw_server files are byte-for-byte compatible with the -existing capture format used by bridges/parse_capture.py and the rest of -the analysis tooling. +raw_client / raw_server are byte-for-byte compatible with parse_capture.py. """ from __future__ import annotations @@ -47,7 +75,7 @@ import logging import os import sys from pathlib import Path -from typing import Optional +from typing import List, Optional # Add project root to path sys.path.insert(0, str(Path(__file__).parent.parent)) @@ -113,26 +141,28 @@ _KNOWN_REQ_SUBS = { def _label_s3_frame(frame: S3Frame) -> str: name = _KNOWN_RSP_SUBS.get(frame.sub, f"UNK_0x{frame.sub:02X}") chk = "✓" if frame.checksum_valid else "✗CHK" - return f"S3→ SUB=0x{frame.sub:02X} ({name}) page=0x{frame.page_key:04X} data={len(frame.data)}B {chk}" + return ( + f"S3→ SUB=0x{frame.sub:02X} ({name}) " + f"page=0x{frame.page_key:04X} data={len(frame.data)}B {chk}" + ) -def _label_bw_frame(data: bytes) -> str: - """Best-effort label for a raw BW request frame.""" - # BW frame (destuffed): ACK STX [10 10] flags sub ... - # Wire: 41 02 10 10 00 sub ... +def _label_bw_frame(data: bytes, prefix: str = " →BW") -> str: + """Best-effort label for a raw BW request frame (wire bytes).""" + # Wire layout: 41 02 10 10 00 sub ... if len(data) < 6: - return f"BW→ (short {len(data)}B)" + return f"{prefix} (short {len(data)}B)" sub = data[5] name = _KNOWN_REQ_SUBS.get(sub, f"UNK_0x{sub:02X}") - return f" →BW SUB=0x{sub:02X} ({name}) {len(data)}B" + return f"{prefix} SUB=0x{sub:02X} ({name}) {len(data)}B" -# ── Per-session capture writer ──────────────────────────────────────────────── +# ── Per-session capture writer ───────────────────────────────────────────────── class CaptureSession: """Writes raw bytes + parsed log for one TCP connection.""" - def __init__(self, capture_dir: Path, peer: str): + def __init__(self, capture_dir: Path, peer: str, *, has_mirror: bool = False): ts = datetime.datetime.now().strftime("%Y%m%d_%H%M%S") self.dir = capture_dir / f"ach_{ts}" self.dir.mkdir(parents=True, exist_ok=True) @@ -140,27 +170,34 @@ class CaptureSession: self._raw_client = open(self.dir / f"raw_client_{ts}.bin", "wb") self._raw_server = open(self.dir / f"raw_server_{ts}.bin", "wb") - self._log_fh = open(self.dir / f"session_{ts}.log", "w") - self._jsonl_fh = open(self.dir / f"session_{ts}.jsonl", "w") + self._raw_mirror = ( + open(self.dir / f"raw_mirror_{ts}.bin", "wb") if has_mirror else None + ) + self._log_fh = open(self.dir / f"session_{ts}.log", "w") + self._jsonl_fh = open(self.dir / f"session_{ts}.jsonl", "w") self._s3_parser = S3FrameParser() - self._frame_count = 0 + self._frame_count = 0 self._byte_count_client = 0 self._byte_count_server = 0 + self._byte_count_mirror = 0 - self._log(f"# ACH capture — peer={peer} started={datetime.datetime.now().isoformat()}") + self._log( + f"# ACH capture — peer={peer} " + f"mirror={'yes' if has_mirror else 'no'} " + f"started={datetime.datetime.now().isoformat()}" + ) self._log(f"# Output dir: {self.dir}") log.info("Capture session opened: %s (peer=%s)", self.dir, peer) - # ── public API ─────────────────────────────────────────────────────────── + # ── public API ──────────────────────────────────────────────────────────── def feed_client(self, data: bytes) -> None: - """Bytes arriving FROM the device (S3 side).""" + """Bytes FROM the device (S3 response frames).""" self._raw_client.write(data) self._raw_client.flush() self._byte_count_client += len(data) - # Parse S3 frames for byte in data: frame = self._s3_parser.feed(bytes([byte])) if frame: @@ -169,65 +206,96 @@ class CaptureSession: self._frame_count += 1 label = _label_s3_frame(f) self._log(f"[{self._frame_count:04d}] {label}") - self._log(f" hex: {f.data[:64].hex()}" - + (" ..." if len(f.data) > 64 else "")) + self._log( + f" hex: {f.data[:64].hex()}" + + (" ..." if len(f.data) > 64 else "") + ) self._emit_json("s3", f) def feed_server(self, data: bytes) -> None: - """Bytes arriving FROM the upstream server (BW side).""" + """Bytes FROM the primary upstream server (BW request frames).""" self._raw_server.write(data) self._raw_server.flush() self._byte_count_server += len(data) - label = _label_bw_frame(data) + label = _label_bw_frame(data, prefix=" →BW[primary]") self._log(f" {label}") + def feed_mirror(self, data: bytes) -> None: + """Bytes FROM the mirror server (logged, not forwarded to device).""" + if self._raw_mirror: + self._raw_mirror.write(data) + self._raw_mirror.flush() + self._byte_count_mirror += len(data) + label = _label_bw_frame(data, prefix=" →BW[mirror] ") + self._log(f" {label} [MIRROR — not sent to device]") + def close(self, reason: str = "connection closed") -> None: self._log(f"# Session ended: {reason}") - self._log(f"# Totals — client_bytes={self._byte_count_client} " - f"server_bytes={self._byte_count_server} " - f"s3_frames={self._frame_count}") - for fh in (self._raw_client, self._raw_server, self._log_fh, self._jsonl_fh): + self._log( + f"# Totals — client={self._byte_count_client}B " + f"server={self._byte_count_server}B " + f"mirror={self._byte_count_mirror}B " + f"s3_frames={self._frame_count}" + ) + handles = [self._raw_client, self._raw_server, self._log_fh, self._jsonl_fh] + if self._raw_mirror: + handles.append(self._raw_mirror) + for fh in handles: try: fh.close() except Exception: pass log.info( - "Capture session closed (%s): %dB client, %dB server, %d S3 frames → %s", - reason, self._byte_count_client, self._byte_count_server, - self._frame_count, self.dir, + "Session closed (%s): %dB client, %dB server, %dB mirror, %d S3 frames → %s", + reason, + self._byte_count_client, self._byte_count_server, + self._byte_count_mirror, self._frame_count, + self.dir, ) - # ── internals ──────────────────────────────────────────────────────────── + # ── internals ───────────────────────────────────────────────────────────── def _log(self, msg: str) -> None: print(msg, file=self._log_fh, flush=True) - # Also echo to console for live monitoring print(msg) def _emit_json(self, direction: str, frame: S3Frame) -> None: record = { - "dir": direction, - "sub": frame.sub, - "page_key": frame.page_key, - "data_len": len(frame.data), - "data_hex": frame.data.hex(), + "dir": direction, + "sub": frame.sub, + "page_key": frame.page_key, + "data_len": len(frame.data), + "data_hex": frame.data.hex(), "checksum_valid": frame.checksum_valid, } print(json.dumps(record), file=self._jsonl_fh, flush=True) -# ── Bridge connection handler ───────────────────────────────────────────────── +# ── Bridge / splitter connection handler ────────────────────────────────────── class BridgeHandler: + """ + Handles inbound device connections. + + Modes (determined by which upstreams are configured): + standalone — no upstream_host / no mirror_host + bridge — upstream_host set, no mirror_host + splitter — upstream_host AND mirror_host both set + """ + def __init__( self, - capture_dir: Path, + capture_dir: Path, upstream_host: Optional[str], upstream_port: Optional[int], + mirror_host: Optional[str] = None, + mirror_port: Optional[int] = None, ): self.capture_dir = capture_dir self.upstream_host = upstream_host self.upstream_port = upstream_port + self.mirror_host = mirror_host + self.mirror_port = mirror_port async def handle( self, @@ -238,41 +306,11 @@ class BridgeHandler: peer_str = f"{peer[0]}:{peer[1]}" log.info("Inbound connection from %s", peer_str) - session = CaptureSession(self.capture_dir, peer_str) + has_mirror = bool(self.mirror_host) + session = CaptureSession(self.capture_dir, peer_str, has_mirror=has_mirror) - if self.upstream_host: - # Bridge mode: connect to upstream and relay - try: - up_reader, up_writer = await asyncio.open_connection( - self.upstream_host, self.upstream_port - ) - log.info("Connected to upstream %s:%s", self.upstream_host, self.upstream_port) - except Exception as exc: - log.error("Failed to connect to upstream: %s", exc) - session.close(f"upstream connect failed: {exc}") - client_writer.close() - return - - try: - await asyncio.gather( - self._relay(client_reader, up_writer, session, "client"), - self._relay(up_reader, client_writer, session, "server"), - ) - except asyncio.CancelledError: - pass - except Exception as exc: - log.warning("Bridge relay error: %s", exc) - finally: - session.close("bridge relay ended") - for writer in (client_writer, up_writer): - try: - writer.close() - await writer.wait_closed() - except Exception: - pass - - else: - # Standalone mode: just capture, don't forward + if not self.upstream_host: + # ── Standalone mode ────────────────────────────────────────────── log.info("Standalone mode — recording inbound traffic only") try: while True: @@ -291,30 +329,168 @@ class BridgeHandler: await client_writer.wait_closed() except Exception: pass + return - async def _relay( + # ── Bridge / splitter mode ─────────────────────────────────────────── + # Connect to primary upstream (required) + try: + up_reader, up_writer = await asyncio.open_connection( + self.upstream_host, self.upstream_port + ) + log.info("Connected to primary %s:%s", self.upstream_host, self.upstream_port) + except Exception as exc: + log.error("Failed to connect to primary upstream: %s", exc) + session.close(f"primary connect failed: {exc}") + client_writer.close() + return + + # Connect to mirror upstream (optional — failure is non-fatal) + mir_reader: Optional[asyncio.StreamReader] = None + mir_writer: Optional[asyncio.StreamWriter] = None + if self.mirror_host: + try: + mir_reader, mir_writer = await asyncio.open_connection( + self.mirror_host, self.mirror_port + ) + log.info("Connected to mirror %s:%s", self.mirror_host, self.mirror_port) + except Exception as exc: + log.warning( + "Mirror connect failed — continuing without mirror: %s", exc + ) + session._log(f"# WARNING: mirror connect failed: {exc}") + + # Build relay tasks + # + # ┌──────────┐ device bytes ┌─────────────┐ + # │ Device │ ─────────────► │ PRIMARY │ responses ──► device + # └──────────┘ └─────────────┘ + # │ + # │ device bytes (copy) + # ▼ + # ┌─────────────┐ + # │ MIRROR │ responses discarded (logged only) + # └─────────────┘ + # + tasks = [ + asyncio.create_task( + self._relay_device(client_reader, up_writer, mir_writer, session), + name="device→upstreams", + ), + asyncio.create_task( + self._relay_simple(up_reader, client_writer, session, "server"), + name="primary→device", + ), + ] + if mir_reader is not None: + tasks.append(asyncio.create_task( + self._relay_drain(mir_reader, session), + name="mirror→drain", + )) + + try: + # Wait for the device-to-upstreams relay to exit first (device + # disconnected or primary dropped). Then cancel the rest. + done, pending = await asyncio.wait( + tasks, + return_when=asyncio.FIRST_COMPLETED, + ) + for t in pending: + t.cancel() + try: + await t + except (asyncio.CancelledError, Exception): + pass + except Exception as exc: + log.warning("Bridge relay error: %s", exc) + finally: + session.close("relay ended") + for writer in filter(None, [client_writer, up_writer, mir_writer]): + try: + writer.close() + await writer.wait_closed() + except Exception: + pass + + # ── Relay helpers ───────────────────────────────────────────────────────── + + async def _relay_device( self, - reader: asyncio.StreamReader, - writer: asyncio.StreamWriter, - session: CaptureSession, - direction: str, + reader: asyncio.StreamReader, + primary_writer: asyncio.StreamWriter, + mirror_writer: Optional[asyncio.StreamWriter], + session: CaptureSession, ) -> None: + """ + Read bytes from the device, write to the primary server, and also + write a copy to the mirror server (if connected). Mirror write + failures are non-fatal — we log and continue. + """ try: while True: data = await reader.read(4096) if not data: break - if direction == "client": - session.feed_client(data) - else: + session.feed_client(data) + + # Primary write — failure IS fatal (lose primary = lose prod) + primary_writer.write(data) + await primary_writer.drain() + + # Mirror write — failure is non-fatal + if mirror_writer is not None: + try: + mirror_writer.write(data) + await mirror_writer.drain() + except Exception as exc: + log.warning("Mirror write failed (non-fatal): %s", exc) + session._log(f"# WARNING: mirror write failed: {exc}") + mirror_writer = None # stop trying + + except (asyncio.IncompleteReadError, ConnectionResetError, BrokenPipeError): + pass + + async def _relay_simple( + self, + reader: asyncio.StreamReader, + writer: asyncio.StreamWriter, + session: CaptureSession, + direction: str, + ) -> None: + """Standard single-pipe relay (primary→device or vice-versa).""" + try: + while True: + data = await reader.read(4096) + if not data: + break + if direction == "server": session.feed_server(data) + else: + session.feed_client(data) writer.write(data) await writer.drain() except (asyncio.IncompleteReadError, ConnectionResetError, BrokenPipeError): pass + async def _relay_drain( + self, + reader: asyncio.StreamReader, + session: CaptureSession, + ) -> None: + """ + Read mirror server responses, log them to session, do NOT forward to + device. The device only ever sees primary server responses. + """ + try: + while True: + data = await reader.read(4096) + if not data: + break + session.feed_mirror(data) + except (asyncio.IncompleteReadError, ConnectionResetError, BrokenPipeError): + pass -# ── Main ────────────────────────────────────────────────────────────────────── + +# ── Main ─────────────────────────────────────────────────────────────────────── async def main(args: argparse.Namespace) -> None: capture_dir = Path(__file__).parent / "captures" @@ -322,6 +498,8 @@ async def main(args: argparse.Namespace) -> None: upstream_host: Optional[str] = None upstream_port: Optional[int] = None + mirror_host: Optional[str] = None + mirror_port: Optional[int] = None if not args.standalone: if not args.upstream: @@ -329,12 +507,24 @@ async def main(args: argparse.Namespace) -> None: sys.exit(1) parts = args.upstream.rsplit(":", 1) if len(parts) != 2: - print("ERROR: --upstream must be HOST:PORT (e.g. 203.0.113.5:9034)") + print("ERROR: --upstream must be HOST:PORT (e.g. 203.0.113.5:12345)") sys.exit(1) upstream_host = parts[0] upstream_port = int(parts[1]) - handler = BridgeHandler(capture_dir, upstream_host, upstream_port) + if args.mirror: + parts = args.mirror.rsplit(":", 1) + if len(parts) != 2: + print("ERROR: --mirror must be HOST:PORT (e.g. 192.168.1.50:12345)") + sys.exit(1) + mirror_host = parts[0] + mirror_port = int(parts[1]) + + handler = BridgeHandler( + capture_dir, + upstream_host, upstream_port, + mirror_host, mirror_port, + ) server = await asyncio.start_server( handler.handle, @@ -342,19 +532,38 @@ async def main(args: argparse.Namespace) -> None: port=args.port, ) - mode = f"bridge → {upstream_host}:{upstream_port}" if upstream_host else "standalone capture" + # ── Startup banner ──────────────────────────────────────────────────────── + if args.standalone: + mode = "STANDALONE capture (no forwarding)" + elif mirror_host: + mode = f"SPLITTER primary={upstream_host}:{upstream_port} mirror={mirror_host}:{mirror_port}" + else: + mode = f"BRIDGE → {upstream_host}:{upstream_port}" + addrs = ", ".join(str(s.getsockname()) for s in server.sockets) - print(f"\n{'='*60}") - print(f" ACH bridge listening on {addrs}") - print(f" Mode: {mode}") + print(f"\n{'='*70}") + print(f" ACH bridge/splitter listening on {addrs}") + print(f" Mode: {mode}") print(f" Captures: {capture_dir}/ach_/") - print(f"{'='*60}") - print(f"\n Point your unit's ACEmanager call-home destination to:") - print(f" :{args.port}") - if upstream_host: - print(f"\n All traffic will be forwarded to {upstream_host}:{upstream_port}") - print(f" Your live data feed is uninterrupted.") - print(f"\n Waiting for inbound connection... (Ctrl-C to stop)\n") + print(f"{'='*70}") + + if upstream_host and not mirror_host: + print(f"\n DISCOVERY PHASE") + print(f" Point your TEST unit's ACEmanager call-home destination to:") + print(f" : {args.port}") + print(f" All traffic will be forwarded to {upstream_host}:{upstream_port}") + elif mirror_host: + print(f"\n SPLITTER MODE — PRODUCTION SAFE") + print(f" Units connect as normal. Every byte is forwarded to:") + print(f" PRIMARY (authoritative): {upstream_host}:{upstream_port}") + print(f" MIRROR (your server): {mirror_host}:{mirror_port}") + print(f" Only PRIMARY responses reach the device.") + print(f" Mirror failures are logged and do not affect the device.") + else: + print(f"\n STANDALONE MODE — capture only, nothing forwarded") + print(f" Point a unit at : {args.port}") + + print(f"\n Waiting for inbound connections... (Ctrl-C to stop)\n") async with server: await server.serve_forever() @@ -362,24 +571,41 @@ async def main(args: argparse.Namespace) -> None: def parse_args() -> argparse.Namespace: p = argparse.ArgumentParser( - description="Transparent TCP bridge for capturing MiniMate Plus call-home traffic." + description=( + "Transparent TCP bridge / splitter for Instantel MiniMate Plus " + "call-home (ACH) traffic." + ), + formatter_class=argparse.RawDescriptionHelpFormatter, + epilog=__doc__, ) p.add_argument( "--upstream", "-u", metavar="HOST:PORT", - help="Upstream ACH server to forward to (e.g. 203.0.113.5:9034). " - "Omit with --standalone for capture-only mode.", + help=( + "Primary upstream ACH server to forward to " + "(e.g. 203.0.113.5:12345). " + "Omit with --standalone for capture-only mode." + ), + ) + p.add_argument( + "--mirror", "-m", + metavar="HOST:PORT", + help=( + "Mirror / secondary server to receive a copy of all device bytes " + "(splitter mode). Mirror responses are logged but NOT forwarded " + "to the device. Mirror failures are non-fatal." + ), ) p.add_argument( "--port", "-p", type=int, - default=9034, - help="Local port to listen on (default: 9034).", + default=12345, + help="Local port to listen on (default: 12345).", ) p.add_argument( "--standalone", "-s", action="store_true", - help="Capture-only mode: accept connection but do not forward to upstream.", + help="Capture-only mode: accept connection, do not forward anywhere.", ) p.add_argument( "--verbose", "-v",