#!/usr/bin/env python3 """ ach_bridge.py — Transparent TCP bridge for capturing Instantel MiniMate Plus call-home (ACH) traffic. Usage ----- # Bridge mode: forward to real ACH server while logging python bridges/ach_bridge.py --upstream HOST:PORT [--port 9034] # Standalone capture mode: accept connection, don't forward (use when you # want to see what the device sends/expects without a real server) python bridges/ach_bridge.py --standalone [--port 9034] Setup ----- 1. Find the "Remote Hostname/IP" and port in ACEmanager → Dual SIM / WAN → Call Home (or equivalent menu on your RV50/RV55 firmware). 2. Temporarily change that setting on ONE unit to point at: your-machine-local-ip : <--port> 3. Run this script. 4. Wait for the unit to trigger / call home. A capture file is written to bridges/captures/ach_/ alongside an auto-parsed frame log. 5. Revert the unit's ACEmanager setting. Output ------ bridges/captures/ach_/ raw_client_.bin — raw bytes from the device (S3 side) raw_server_.bin — raw bytes from the upstream server (BW side) (empty in standalone mode) session_.log — human-readable frame parse log session_.jsonl — JSON-lines frame log (for downstream tooling) The raw_client / raw_server files are byte-for-byte compatible with the existing capture format used by bridges/parse_capture.py and the rest of the analysis tooling. """ from __future__ import annotations import argparse import asyncio import datetime import json import logging import os import sys from pathlib import Path from typing import Optional # Add project root to path sys.path.insert(0, str(Path(__file__).parent.parent)) from minimateplus.framing import S3FrameParser, S3Frame log = logging.getLogger("ach_bridge") # ── Frame label helpers ────────────────────────────────────────────────────── _KNOWN_RSP_SUBS = { 0xA4: "POLL_RSP", 0xA5: "BULK_WAVEFORM_RSP", 0xE0: "ADVANCE_EVENT_RSP", 0xE1: "EVENT_INDEX_FIRST_RSP", 0xE3: "MONITOR_STATUS_RSP", 0xEA: "SERIAL_NUM_RSP", 0xF3: "WAVEFORM_RECORD_RSP", 0xF5: "WAVEFORM_HEADER_RSP", 0xF7: "EVENT_INDEX_RSP", 0xF9: "UNK_06_RSP", 0xFE: "DEVICE_INFO_RSP", # Write acks 0x97: "EVT_IDX_WRITE_ACK", 0x8C: "CONFIRM_B_ACK", 0x8E: "COMPLIANCE_WRITE_ACK", 0x8D: "CONFIRM_A_ACK", 0x7D: "TRIGGER_WRITE_ACK", 0x7C: "TRIGGER_CONFIRM_ACK", 0x96: "WAVEFORM_WRITE_ACK", 0x8B: "CONFIRM_C_ACK", 0x69: "START_MONITOR_ACK", 0x68: "STOP_MONITOR_ACK", } _KNOWN_REQ_SUBS = { 0x5B: "POLL", 0x5A: "BULK_WAVEFORM", 0x1F: "ADVANCE_EVENT", 0x1E: "EVENT_INDEX_FIRST", 0x1C: "MONITOR_STATUS", 0x15: "SERIAL_NUM", 0x0C: "WAVEFORM_RECORD", 0x0A: "WAVEFORM_HEADER", 0x08: "EVENT_INDEX", 0x06: "UNK_06", 0x01: "DEVICE_INFO", # Write commands 0x68: "EVT_IDX_WRITE", 0x73: "CONFIRM_B", 0x71: "COMPLIANCE_WRITE", 0x72: "CONFIRM_A", 0x82: "TRIGGER_WRITE", 0x83: "TRIGGER_CONFIRM", 0x69: "WAVEFORM_WRITE", 0x74: "CONFIRM_C", 0x96: "START_MONITOR", 0x97: "STOP_MONITOR", } def _label_s3_frame(frame: S3Frame) -> str: name = _KNOWN_RSP_SUBS.get(frame.sub, f"UNK_0x{frame.sub:02X}") chk = "✓" if frame.checksum_valid else "✗CHK" return f"S3→ SUB=0x{frame.sub:02X} ({name}) page=0x{frame.page_key:04X} data={len(frame.data)}B {chk}" def _label_bw_frame(data: bytes) -> str: """Best-effort label for a raw BW request frame.""" # BW frame (destuffed): ACK STX [10 10] flags sub ... # Wire: 41 02 10 10 00 sub ... if len(data) < 6: return f"BW→ (short {len(data)}B)" sub = data[5] name = _KNOWN_REQ_SUBS.get(sub, f"UNK_0x{sub:02X}") return f" →BW SUB=0x{sub:02X} ({name}) {len(data)}B" # ── Per-session capture writer ──────────────────────────────────────────────── class CaptureSession: """Writes raw bytes + parsed log for one TCP connection.""" def __init__(self, capture_dir: Path, peer: str): ts = datetime.datetime.now().strftime("%Y%m%d_%H%M%S") self.dir = capture_dir / f"ach_{ts}" self.dir.mkdir(parents=True, exist_ok=True) self.peer = peer self._raw_client = open(self.dir / f"raw_client_{ts}.bin", "wb") self._raw_server = open(self.dir / f"raw_server_{ts}.bin", "wb") self._log_fh = open(self.dir / f"session_{ts}.log", "w") self._jsonl_fh = open(self.dir / f"session_{ts}.jsonl", "w") self._s3_parser = S3FrameParser() self._frame_count = 0 self._byte_count_client = 0 self._byte_count_server = 0 self._log(f"# ACH capture — peer={peer} started={datetime.datetime.now().isoformat()}") self._log(f"# Output dir: {self.dir}") log.info("Capture session opened: %s (peer=%s)", self.dir, peer) # ── public API ─────────────────────────────────────────────────────────── def feed_client(self, data: bytes) -> None: """Bytes arriving FROM the device (S3 side).""" self._raw_client.write(data) self._raw_client.flush() self._byte_count_client += len(data) # Parse S3 frames for byte in data: frame = self._s3_parser.feed(bytes([byte])) if frame: frames = frame if isinstance(frame, list) else [frame] for f in frames: self._frame_count += 1 label = _label_s3_frame(f) self._log(f"[{self._frame_count:04d}] {label}") self._log(f" hex: {f.data[:64].hex()}" + (" ..." if len(f.data) > 64 else "")) self._emit_json("s3", f) def feed_server(self, data: bytes) -> None: """Bytes arriving FROM the upstream server (BW side).""" self._raw_server.write(data) self._raw_server.flush() self._byte_count_server += len(data) label = _label_bw_frame(data) self._log(f" {label}") def close(self, reason: str = "connection closed") -> None: self._log(f"# Session ended: {reason}") self._log(f"# Totals — client_bytes={self._byte_count_client} " f"server_bytes={self._byte_count_server} " f"s3_frames={self._frame_count}") for fh in (self._raw_client, self._raw_server, self._log_fh, self._jsonl_fh): try: fh.close() except Exception: pass log.info( "Capture session closed (%s): %dB client, %dB server, %d S3 frames → %s", reason, self._byte_count_client, self._byte_count_server, self._frame_count, self.dir, ) # ── internals ──────────────────────────────────────────────────────────── def _log(self, msg: str) -> None: print(msg, file=self._log_fh, flush=True) # Also echo to console for live monitoring print(msg) def _emit_json(self, direction: str, frame: S3Frame) -> None: record = { "dir": direction, "sub": frame.sub, "page_key": frame.page_key, "data_len": len(frame.data), "data_hex": frame.data.hex(), "checksum_valid": frame.checksum_valid, } print(json.dumps(record), file=self._jsonl_fh, flush=True) # ── Bridge connection handler ───────────────────────────────────────────────── class BridgeHandler: def __init__( self, capture_dir: Path, upstream_host: Optional[str], upstream_port: Optional[int], ): self.capture_dir = capture_dir self.upstream_host = upstream_host self.upstream_port = upstream_port async def handle( self, client_reader: asyncio.StreamReader, client_writer: asyncio.StreamWriter, ) -> None: peer = client_writer.get_extra_info("peername", ("?", 0)) peer_str = f"{peer[0]}:{peer[1]}" log.info("Inbound connection from %s", peer_str) session = CaptureSession(self.capture_dir, peer_str) if self.upstream_host: # Bridge mode: connect to upstream and relay try: up_reader, up_writer = await asyncio.open_connection( self.upstream_host, self.upstream_port ) log.info("Connected to upstream %s:%s", self.upstream_host, self.upstream_port) except Exception as exc: log.error("Failed to connect to upstream: %s", exc) session.close(f"upstream connect failed: {exc}") client_writer.close() return try: await asyncio.gather( self._relay(client_reader, up_writer, session, "client"), self._relay(up_reader, client_writer, session, "server"), ) except asyncio.CancelledError: pass except Exception as exc: log.warning("Bridge relay error: %s", exc) finally: session.close("bridge relay ended") for writer in (client_writer, up_writer): try: writer.close() await writer.wait_closed() except Exception: pass else: # Standalone mode: just capture, don't forward log.info("Standalone mode — recording inbound traffic only") try: while True: data = await client_reader.read(4096) if not data: break session.feed_client(data) except asyncio.CancelledError: pass except Exception as exc: log.warning("Standalone read error: %s", exc) finally: session.close("standalone capture ended") try: client_writer.close() await client_writer.wait_closed() except Exception: pass async def _relay( self, reader: asyncio.StreamReader, writer: asyncio.StreamWriter, session: CaptureSession, direction: str, ) -> None: try: while True: data = await reader.read(4096) if not data: break if direction == "client": session.feed_client(data) else: session.feed_server(data) writer.write(data) await writer.drain() except (asyncio.IncompleteReadError, ConnectionResetError, BrokenPipeError): pass # ── Main ────────────────────────────────────────────────────────────────────── async def main(args: argparse.Namespace) -> None: capture_dir = Path(__file__).parent / "captures" capture_dir.mkdir(parents=True, exist_ok=True) upstream_host: Optional[str] = None upstream_port: Optional[int] = None if not args.standalone: if not args.upstream: print("ERROR: --upstream HOST:PORT is required unless --standalone is set.") sys.exit(1) parts = args.upstream.rsplit(":", 1) if len(parts) != 2: print("ERROR: --upstream must be HOST:PORT (e.g. 203.0.113.5:9034)") sys.exit(1) upstream_host = parts[0] upstream_port = int(parts[1]) handler = BridgeHandler(capture_dir, upstream_host, upstream_port) server = await asyncio.start_server( handler.handle, host="0.0.0.0", port=args.port, ) mode = f"bridge → {upstream_host}:{upstream_port}" if upstream_host else "standalone capture" addrs = ", ".join(str(s.getsockname()) for s in server.sockets) print(f"\n{'='*60}") print(f" ACH bridge listening on {addrs}") print(f" Mode: {mode}") print(f" Captures: {capture_dir}/ach_/") print(f"{'='*60}") print(f"\n Point your unit's ACEmanager call-home destination to:") print(f" :{args.port}") if upstream_host: print(f"\n All traffic will be forwarded to {upstream_host}:{upstream_port}") print(f" Your live data feed is uninterrupted.") print(f"\n Waiting for inbound connection... (Ctrl-C to stop)\n") async with server: await server.serve_forever() def parse_args() -> argparse.Namespace: p = argparse.ArgumentParser( description="Transparent TCP bridge for capturing MiniMate Plus call-home traffic." ) p.add_argument( "--upstream", "-u", metavar="HOST:PORT", help="Upstream ACH server to forward to (e.g. 203.0.113.5:9034). " "Omit with --standalone for capture-only mode.", ) p.add_argument( "--port", "-p", type=int, default=9034, help="Local port to listen on (default: 9034).", ) p.add_argument( "--standalone", "-s", action="store_true", help="Capture-only mode: accept connection but do not forward to upstream.", ) p.add_argument( "--verbose", "-v", action="store_true", help="Enable debug logging.", ) return p.parse_args() if __name__ == "__main__": args = parse_args() logging.basicConfig( level=logging.DEBUG if args.verbose else logging.INFO, format="%(asctime)s %(levelname)-7s %(name)s %(message)s", ) try: asyncio.run(main(args)) except KeyboardInterrupt: print("\nStopped.")