feat: add splitter mode to ach_bridge.py (--mirror HOST:PORT)

Adds a production-safe headphone-splitter mode:
- Device bytes tee'd to both --upstream (primary/prod) and --mirror (new server)
- Only primary server responses are returned to the device
- Mirror connect/write failures are non-fatal and logged; prod is unaffected
- New raw_mirror_<ts>.bin capture file alongside raw_client/raw_server

Three modes: standalone (capture only), bridge (one upstream), splitter (two).
Default listen port changed to 12345 to match project ACH setup.

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
This commit is contained in:
2026-04-09 12:17:57 -04:00
parent 46a86939b7
commit 74233d7e31
+349 -123
View File
@@ -1,40 +1,68 @@
#!/usr/bin/env python3 #!/usr/bin/env python3
""" """
ach_bridge.py — Transparent TCP bridge for capturing Instantel MiniMate Plus ach_bridge.py — Transparent TCP bridge / splitter for Instantel MiniMate Plus
call-home (ACH) traffic. call-home (ACH) traffic.
Modes
-----
standalone Accept connection, capture frames, do NOT forward anywhere.
Good for initial discovery with a test unit.
bridge Forward to one upstream server while capturing.
Use this for the initial discovery phase with your test server.
splitter Forward to the PRIMARY upstream (production ACH server) AND
mirror a copy to a SECONDARY server simultaneously.
The device never knows — it talks to the primary the whole time.
If the mirror fails, the primary connection is unaffected.
Think of it like a headphone splitter: one input, two outputs.
Primary → authoritative responses back to device.
Mirror → gets all device bytes, its responses are discarded.
Usage Usage
----- -----
# Bridge mode: forward to real ACH server while logging # Standalone capture (test/discovery — no forwarding)
python bridges/ach_bridge.py --upstream HOST:PORT [--port 9034] python bridges/ach_bridge.py --standalone [--port 12345]
# Standalone capture mode: accept connection, don't forward (use when you # Bridge mode (forward to one server, e.g. your test server)
# want to see what the device sends/expects without a real server) python bridges/ach_bridge.py --upstream HOST:PORT [--port 12345]
python bridges/ach_bridge.py --standalone [--port 9034]
Setup # Splitter mode (production: forward to prod + mirror to your server)
----- python bridges/ach_bridge.py --upstream PROD_HOST:PORT --mirror MY_HOST:PORT [--port 12345]
1. Find the "Remote Hostname/IP" and port in ACEmanager → Dual SIM / WAN →
Call Home (or equivalent menu on your RV50/RV55 firmware).
2. Temporarily change that setting on ONE unit to point at:
your-machine-local-ip : <--port>
3. Run this script.
4. Wait for the unit to trigger / call home. A capture file is written to
bridges/captures/ach_<timestamp>/ alongside an auto-parsed frame log.
5. Revert the unit's ACEmanager setting.
Output Setup for discovery (test server, don't touch prod)
----------------------------------------------------
1. Stand up your test ACH server, note its IP and port (e.g. 192.168.1.50:12345).
2. Take ONE test unit. In ACEmanager → Call Home, point it at:
<this machine's LAN IP> : <--port>
3. Run: python bridges/ach_bridge.py --upstream TEST_SERVER:12345 --port 12345
4. Trigger the unit. Raw frames are saved to bridges/captures/ach_<ts>/.
5. Revert the unit's ACEmanager setting when done.
Setup for production splitter (when you're ready)
-------------------------------------------------
This does NOT touch the units. Instead you re-route traffic at the network
layer so that call-home packets arrive at a machine running this script first.
Typical approach: update the DNS entry / host record your prod ACH server is
registered under to point at this machine. The units keep their existing
ACEmanager settings.
python bridges/ach_bridge.py \\
--upstream PROD_ACH_HOST:12345 \\
--mirror MY_NEW_SERVER:12345 \\
--port 12345
Output (each connection gets its own timestamped sub-directory)
------ ------
bridges/captures/ach_<ISO-timestamp>/ bridges/captures/ach_<ts>/
raw_client_<ts>.bin — raw bytes from the device (S3 side) raw_client_<ts>.bin — raw bytes from the device (S3 side)
raw_server_<ts>.bin — raw bytes from the upstream server (BW side) raw_server_<ts>.bin — raw bytes from the primary upstream (BW side)
(empty in standalone mode) raw_mirror_<ts>.bin — raw bytes from the mirror upstream (splitter mode only)
session_<ts>.log — human-readable frame parse log session_<ts>.log — human-readable frame parse log
session_<ts>.jsonl — JSON-lines frame log (for downstream tooling) session_<ts>.jsonl — JSON-lines frame log
The raw_client / raw_server files are byte-for-byte compatible with the raw_client / raw_server are byte-for-byte compatible with parse_capture.py.
existing capture format used by bridges/parse_capture.py and the rest of
the analysis tooling.
""" """
from __future__ import annotations from __future__ import annotations
@@ -47,7 +75,7 @@ import logging
import os import os
import sys import sys
from pathlib import Path from pathlib import Path
from typing import Optional from typing import List, Optional
# Add project root to path # Add project root to path
sys.path.insert(0, str(Path(__file__).parent.parent)) sys.path.insert(0, str(Path(__file__).parent.parent))
@@ -113,26 +141,28 @@ _KNOWN_REQ_SUBS = {
def _label_s3_frame(frame: S3Frame) -> str: def _label_s3_frame(frame: S3Frame) -> str:
name = _KNOWN_RSP_SUBS.get(frame.sub, f"UNK_0x{frame.sub:02X}") name = _KNOWN_RSP_SUBS.get(frame.sub, f"UNK_0x{frame.sub:02X}")
chk = "" if frame.checksum_valid else "✗CHK" chk = "" if frame.checksum_valid else "✗CHK"
return f"S3→ SUB=0x{frame.sub:02X} ({name}) page=0x{frame.page_key:04X} data={len(frame.data)}B {chk}" return (
f"S3→ SUB=0x{frame.sub:02X} ({name}) "
f"page=0x{frame.page_key:04X} data={len(frame.data)}B {chk}"
)
def _label_bw_frame(data: bytes) -> str: def _label_bw_frame(data: bytes, prefix: str = " →BW") -> str:
"""Best-effort label for a raw BW request frame.""" """Best-effort label for a raw BW request frame (wire bytes)."""
# BW frame (destuffed): ACK STX [10 10] flags sub ... # Wire layout: 41 02 10 10 00 sub ...
# Wire: 41 02 10 10 00 sub ...
if len(data) < 6: if len(data) < 6:
return f"BW→ (short {len(data)}B)" return f"{prefix} (short {len(data)}B)"
sub = data[5] sub = data[5]
name = _KNOWN_REQ_SUBS.get(sub, f"UNK_0x{sub:02X}") name = _KNOWN_REQ_SUBS.get(sub, f"UNK_0x{sub:02X}")
return f" →BW SUB=0x{sub:02X} ({name}) {len(data)}B" return f"{prefix} SUB=0x{sub:02X} ({name}) {len(data)}B"
# ── Per-session capture writer ──────────────────────────────────────────────── # ── Per-session capture writer ────────────────────────────────────────────────
class CaptureSession: class CaptureSession:
"""Writes raw bytes + parsed log for one TCP connection.""" """Writes raw bytes + parsed log for one TCP connection."""
def __init__(self, capture_dir: Path, peer: str): def __init__(self, capture_dir: Path, peer: str, *, has_mirror: bool = False):
ts = datetime.datetime.now().strftime("%Y%m%d_%H%M%S") ts = datetime.datetime.now().strftime("%Y%m%d_%H%M%S")
self.dir = capture_dir / f"ach_{ts}" self.dir = capture_dir / f"ach_{ts}"
self.dir.mkdir(parents=True, exist_ok=True) self.dir.mkdir(parents=True, exist_ok=True)
@@ -140,27 +170,34 @@ class CaptureSession:
self._raw_client = open(self.dir / f"raw_client_{ts}.bin", "wb") self._raw_client = open(self.dir / f"raw_client_{ts}.bin", "wb")
self._raw_server = open(self.dir / f"raw_server_{ts}.bin", "wb") self._raw_server = open(self.dir / f"raw_server_{ts}.bin", "wb")
self._log_fh = open(self.dir / f"session_{ts}.log", "w") self._raw_mirror = (
self._jsonl_fh = open(self.dir / f"session_{ts}.jsonl", "w") open(self.dir / f"raw_mirror_{ts}.bin", "wb") if has_mirror else None
)
self._log_fh = open(self.dir / f"session_{ts}.log", "w")
self._jsonl_fh = open(self.dir / f"session_{ts}.jsonl", "w")
self._s3_parser = S3FrameParser() self._s3_parser = S3FrameParser()
self._frame_count = 0 self._frame_count = 0
self._byte_count_client = 0 self._byte_count_client = 0
self._byte_count_server = 0 self._byte_count_server = 0
self._byte_count_mirror = 0
self._log(f"# ACH capture — peer={peer} started={datetime.datetime.now().isoformat()}") self._log(
f"# ACH capture — peer={peer} "
f"mirror={'yes' if has_mirror else 'no'} "
f"started={datetime.datetime.now().isoformat()}"
)
self._log(f"# Output dir: {self.dir}") self._log(f"# Output dir: {self.dir}")
log.info("Capture session opened: %s (peer=%s)", self.dir, peer) log.info("Capture session opened: %s (peer=%s)", self.dir, peer)
# ── public API ─────────────────────────────────────────────────────────── # ── public API ───────────────────────────────────────────────────────────
def feed_client(self, data: bytes) -> None: def feed_client(self, data: bytes) -> None:
"""Bytes arriving FROM the device (S3 side).""" """Bytes FROM the device (S3 response frames)."""
self._raw_client.write(data) self._raw_client.write(data)
self._raw_client.flush() self._raw_client.flush()
self._byte_count_client += len(data) self._byte_count_client += len(data)
# Parse S3 frames
for byte in data: for byte in data:
frame = self._s3_parser.feed(bytes([byte])) frame = self._s3_parser.feed(bytes([byte]))
if frame: if frame:
@@ -169,65 +206,96 @@ class CaptureSession:
self._frame_count += 1 self._frame_count += 1
label = _label_s3_frame(f) label = _label_s3_frame(f)
self._log(f"[{self._frame_count:04d}] {label}") self._log(f"[{self._frame_count:04d}] {label}")
self._log(f" hex: {f.data[:64].hex()}" self._log(
+ (" ..." if len(f.data) > 64 else "")) f" hex: {f.data[:64].hex()}"
+ (" ..." if len(f.data) > 64 else "")
)
self._emit_json("s3", f) self._emit_json("s3", f)
def feed_server(self, data: bytes) -> None: def feed_server(self, data: bytes) -> None:
"""Bytes arriving FROM the upstream server (BW side).""" """Bytes FROM the primary upstream server (BW request frames)."""
self._raw_server.write(data) self._raw_server.write(data)
self._raw_server.flush() self._raw_server.flush()
self._byte_count_server += len(data) self._byte_count_server += len(data)
label = _label_bw_frame(data) label = _label_bw_frame(data, prefix=" →BW[primary]")
self._log(f" {label}") self._log(f" {label}")
def feed_mirror(self, data: bytes) -> None:
"""Bytes FROM the mirror server (logged, not forwarded to device)."""
if self._raw_mirror:
self._raw_mirror.write(data)
self._raw_mirror.flush()
self._byte_count_mirror += len(data)
label = _label_bw_frame(data, prefix=" →BW[mirror] ")
self._log(f" {label} [MIRROR — not sent to device]")
def close(self, reason: str = "connection closed") -> None: def close(self, reason: str = "connection closed") -> None:
self._log(f"# Session ended: {reason}") self._log(f"# Session ended: {reason}")
self._log(f"# Totals — client_bytes={self._byte_count_client} " self._log(
f"server_bytes={self._byte_count_server} " f"# Totals — client={self._byte_count_client}B "
f"s3_frames={self._frame_count}") f"server={self._byte_count_server}B "
for fh in (self._raw_client, self._raw_server, self._log_fh, self._jsonl_fh): f"mirror={self._byte_count_mirror}B "
f"s3_frames={self._frame_count}"
)
handles = [self._raw_client, self._raw_server, self._log_fh, self._jsonl_fh]
if self._raw_mirror:
handles.append(self._raw_mirror)
for fh in handles:
try: try:
fh.close() fh.close()
except Exception: except Exception:
pass pass
log.info( log.info(
"Capture session closed (%s): %dB client, %dB server, %d S3 frames → %s", "Session closed (%s): %dB client, %dB server, %dB mirror, %d S3 frames → %s",
reason, self._byte_count_client, self._byte_count_server, reason,
self._frame_count, self.dir, self._byte_count_client, self._byte_count_server,
self._byte_count_mirror, self._frame_count,
self.dir,
) )
# ── internals ──────────────────────────────────────────────────────────── # ── internals ────────────────────────────────────────────────────────────
def _log(self, msg: str) -> None: def _log(self, msg: str) -> None:
print(msg, file=self._log_fh, flush=True) print(msg, file=self._log_fh, flush=True)
# Also echo to console for live monitoring
print(msg) print(msg)
def _emit_json(self, direction: str, frame: S3Frame) -> None: def _emit_json(self, direction: str, frame: S3Frame) -> None:
record = { record = {
"dir": direction, "dir": direction,
"sub": frame.sub, "sub": frame.sub,
"page_key": frame.page_key, "page_key": frame.page_key,
"data_len": len(frame.data), "data_len": len(frame.data),
"data_hex": frame.data.hex(), "data_hex": frame.data.hex(),
"checksum_valid": frame.checksum_valid, "checksum_valid": frame.checksum_valid,
} }
print(json.dumps(record), file=self._jsonl_fh, flush=True) print(json.dumps(record), file=self._jsonl_fh, flush=True)
# ── Bridge connection handler ───────────────────────────────────────────────── # ── Bridge / splitter connection handler ──────────────────────────────────────
class BridgeHandler: class BridgeHandler:
"""
Handles inbound device connections.
Modes (determined by which upstreams are configured):
standalone — no upstream_host / no mirror_host
bridge — upstream_host set, no mirror_host
splitter — upstream_host AND mirror_host both set
"""
def __init__( def __init__(
self, self,
capture_dir: Path, capture_dir: Path,
upstream_host: Optional[str], upstream_host: Optional[str],
upstream_port: Optional[int], upstream_port: Optional[int],
mirror_host: Optional[str] = None,
mirror_port: Optional[int] = None,
): ):
self.capture_dir = capture_dir self.capture_dir = capture_dir
self.upstream_host = upstream_host self.upstream_host = upstream_host
self.upstream_port = upstream_port self.upstream_port = upstream_port
self.mirror_host = mirror_host
self.mirror_port = mirror_port
async def handle( async def handle(
self, self,
@@ -238,41 +306,11 @@ class BridgeHandler:
peer_str = f"{peer[0]}:{peer[1]}" peer_str = f"{peer[0]}:{peer[1]}"
log.info("Inbound connection from %s", peer_str) log.info("Inbound connection from %s", peer_str)
session = CaptureSession(self.capture_dir, peer_str) has_mirror = bool(self.mirror_host)
session = CaptureSession(self.capture_dir, peer_str, has_mirror=has_mirror)
if self.upstream_host: if not self.upstream_host:
# Bridge mode: connect to upstream and relay # ── Standalone mode ──────────────────────────────────────────────
try:
up_reader, up_writer = await asyncio.open_connection(
self.upstream_host, self.upstream_port
)
log.info("Connected to upstream %s:%s", self.upstream_host, self.upstream_port)
except Exception as exc:
log.error("Failed to connect to upstream: %s", exc)
session.close(f"upstream connect failed: {exc}")
client_writer.close()
return
try:
await asyncio.gather(
self._relay(client_reader, up_writer, session, "client"),
self._relay(up_reader, client_writer, session, "server"),
)
except asyncio.CancelledError:
pass
except Exception as exc:
log.warning("Bridge relay error: %s", exc)
finally:
session.close("bridge relay ended")
for writer in (client_writer, up_writer):
try:
writer.close()
await writer.wait_closed()
except Exception:
pass
else:
# Standalone mode: just capture, don't forward
log.info("Standalone mode — recording inbound traffic only") log.info("Standalone mode — recording inbound traffic only")
try: try:
while True: while True:
@@ -291,30 +329,168 @@ class BridgeHandler:
await client_writer.wait_closed() await client_writer.wait_closed()
except Exception: except Exception:
pass pass
return
async def _relay( # ── Bridge / splitter mode ───────────────────────────────────────────
# Connect to primary upstream (required)
try:
up_reader, up_writer = await asyncio.open_connection(
self.upstream_host, self.upstream_port
)
log.info("Connected to primary %s:%s", self.upstream_host, self.upstream_port)
except Exception as exc:
log.error("Failed to connect to primary upstream: %s", exc)
session.close(f"primary connect failed: {exc}")
client_writer.close()
return
# Connect to mirror upstream (optional — failure is non-fatal)
mir_reader: Optional[asyncio.StreamReader] = None
mir_writer: Optional[asyncio.StreamWriter] = None
if self.mirror_host:
try:
mir_reader, mir_writer = await asyncio.open_connection(
self.mirror_host, self.mirror_port
)
log.info("Connected to mirror %s:%s", self.mirror_host, self.mirror_port)
except Exception as exc:
log.warning(
"Mirror connect failed — continuing without mirror: %s", exc
)
session._log(f"# WARNING: mirror connect failed: {exc}")
# Build relay tasks
#
# ┌──────────┐ device bytes ┌─────────────┐
# │ Device │ ─────────────► │ PRIMARY │ responses ──► device
# └──────────┘ └─────────────┘
# │
# │ device bytes (copy)
# ▼
# ┌─────────────┐
# │ MIRROR │ responses discarded (logged only)
# └─────────────┘
#
tasks = [
asyncio.create_task(
self._relay_device(client_reader, up_writer, mir_writer, session),
name="device→upstreams",
),
asyncio.create_task(
self._relay_simple(up_reader, client_writer, session, "server"),
name="primary→device",
),
]
if mir_reader is not None:
tasks.append(asyncio.create_task(
self._relay_drain(mir_reader, session),
name="mirror→drain",
))
try:
# Wait for the device-to-upstreams relay to exit first (device
# disconnected or primary dropped). Then cancel the rest.
done, pending = await asyncio.wait(
tasks,
return_when=asyncio.FIRST_COMPLETED,
)
for t in pending:
t.cancel()
try:
await t
except (asyncio.CancelledError, Exception):
pass
except Exception as exc:
log.warning("Bridge relay error: %s", exc)
finally:
session.close("relay ended")
for writer in filter(None, [client_writer, up_writer, mir_writer]):
try:
writer.close()
await writer.wait_closed()
except Exception:
pass
# ── Relay helpers ─────────────────────────────────────────────────────────
async def _relay_device(
self, self,
reader: asyncio.StreamReader, reader: asyncio.StreamReader,
writer: asyncio.StreamWriter, primary_writer: asyncio.StreamWriter,
session: CaptureSession, mirror_writer: Optional[asyncio.StreamWriter],
direction: str, session: CaptureSession,
) -> None: ) -> None:
"""
Read bytes from the device, write to the primary server, and also
write a copy to the mirror server (if connected). Mirror write
failures are non-fatal — we log and continue.
"""
try: try:
while True: while True:
data = await reader.read(4096) data = await reader.read(4096)
if not data: if not data:
break break
if direction == "client": session.feed_client(data)
session.feed_client(data)
else: # Primary write — failure IS fatal (lose primary = lose prod)
primary_writer.write(data)
await primary_writer.drain()
# Mirror write — failure is non-fatal
if mirror_writer is not None:
try:
mirror_writer.write(data)
await mirror_writer.drain()
except Exception as exc:
log.warning("Mirror write failed (non-fatal): %s", exc)
session._log(f"# WARNING: mirror write failed: {exc}")
mirror_writer = None # stop trying
except (asyncio.IncompleteReadError, ConnectionResetError, BrokenPipeError):
pass
async def _relay_simple(
self,
reader: asyncio.StreamReader,
writer: asyncio.StreamWriter,
session: CaptureSession,
direction: str,
) -> None:
"""Standard single-pipe relay (primary→device or vice-versa)."""
try:
while True:
data = await reader.read(4096)
if not data:
break
if direction == "server":
session.feed_server(data) session.feed_server(data)
else:
session.feed_client(data)
writer.write(data) writer.write(data)
await writer.drain() await writer.drain()
except (asyncio.IncompleteReadError, ConnectionResetError, BrokenPipeError): except (asyncio.IncompleteReadError, ConnectionResetError, BrokenPipeError):
pass pass
async def _relay_drain(
self,
reader: asyncio.StreamReader,
session: CaptureSession,
) -> None:
"""
Read mirror server responses, log them to session, do NOT forward to
device. The device only ever sees primary server responses.
"""
try:
while True:
data = await reader.read(4096)
if not data:
break
session.feed_mirror(data)
except (asyncio.IncompleteReadError, ConnectionResetError, BrokenPipeError):
pass
# ── Main ──────────────────────────────────────────────────────────────────────
# ── Main ───────────────────────────────────────────────────────────────────────
async def main(args: argparse.Namespace) -> None: async def main(args: argparse.Namespace) -> None:
capture_dir = Path(__file__).parent / "captures" capture_dir = Path(__file__).parent / "captures"
@@ -322,6 +498,8 @@ async def main(args: argparse.Namespace) -> None:
upstream_host: Optional[str] = None upstream_host: Optional[str] = None
upstream_port: Optional[int] = None upstream_port: Optional[int] = None
mirror_host: Optional[str] = None
mirror_port: Optional[int] = None
if not args.standalone: if not args.standalone:
if not args.upstream: if not args.upstream:
@@ -329,12 +507,24 @@ async def main(args: argparse.Namespace) -> None:
sys.exit(1) sys.exit(1)
parts = args.upstream.rsplit(":", 1) parts = args.upstream.rsplit(":", 1)
if len(parts) != 2: if len(parts) != 2:
print("ERROR: --upstream must be HOST:PORT (e.g. 203.0.113.5:9034)") print("ERROR: --upstream must be HOST:PORT (e.g. 203.0.113.5:12345)")
sys.exit(1) sys.exit(1)
upstream_host = parts[0] upstream_host = parts[0]
upstream_port = int(parts[1]) upstream_port = int(parts[1])
handler = BridgeHandler(capture_dir, upstream_host, upstream_port) if args.mirror:
parts = args.mirror.rsplit(":", 1)
if len(parts) != 2:
print("ERROR: --mirror must be HOST:PORT (e.g. 192.168.1.50:12345)")
sys.exit(1)
mirror_host = parts[0]
mirror_port = int(parts[1])
handler = BridgeHandler(
capture_dir,
upstream_host, upstream_port,
mirror_host, mirror_port,
)
server = await asyncio.start_server( server = await asyncio.start_server(
handler.handle, handler.handle,
@@ -342,19 +532,38 @@ async def main(args: argparse.Namespace) -> None:
port=args.port, port=args.port,
) )
mode = f"bridge → {upstream_host}:{upstream_port}" if upstream_host else "standalone capture" # ── Startup banner ────────────────────────────────────────────────────────
if args.standalone:
mode = "STANDALONE capture (no forwarding)"
elif mirror_host:
mode = f"SPLITTER primary={upstream_host}:{upstream_port} mirror={mirror_host}:{mirror_port}"
else:
mode = f"BRIDGE → {upstream_host}:{upstream_port}"
addrs = ", ".join(str(s.getsockname()) for s in server.sockets) addrs = ", ".join(str(s.getsockname()) for s in server.sockets)
print(f"\n{'='*60}") print(f"\n{'='*70}")
print(f" ACH bridge listening on {addrs}") print(f" ACH bridge/splitter listening on {addrs}")
print(f" Mode: {mode}") print(f" Mode: {mode}")
print(f" Captures: {capture_dir}/ach_<timestamp>/") print(f" Captures: {capture_dir}/ach_<timestamp>/")
print(f"{'='*60}") print(f"{'='*70}")
print(f"\n Point your unit's ACEmanager call-home destination to:")
print(f" <this machine's LAN IP>:{args.port}") if upstream_host and not mirror_host:
if upstream_host: print(f"\n DISCOVERY PHASE")
print(f"\n All traffic will be forwarded to {upstream_host}:{upstream_port}") print(f" Point your TEST unit's ACEmanager call-home destination to:")
print(f" Your live data feed is uninterrupted.") print(f" <this machine's LAN IP> : {args.port}")
print(f"\n Waiting for inbound connection... (Ctrl-C to stop)\n") print(f" All traffic will be forwarded to {upstream_host}:{upstream_port}")
elif mirror_host:
print(f"\n SPLITTER MODE — PRODUCTION SAFE")
print(f" Units connect as normal. Every byte is forwarded to:")
print(f" PRIMARY (authoritative): {upstream_host}:{upstream_port}")
print(f" MIRROR (your server): {mirror_host}:{mirror_port}")
print(f" Only PRIMARY responses reach the device.")
print(f" Mirror failures are logged and do not affect the device.")
else:
print(f"\n STANDALONE MODE — capture only, nothing forwarded")
print(f" Point a unit at <this machine's LAN IP> : {args.port}")
print(f"\n Waiting for inbound connections... (Ctrl-C to stop)\n")
async with server: async with server:
await server.serve_forever() await server.serve_forever()
@@ -362,24 +571,41 @@ async def main(args: argparse.Namespace) -> None:
def parse_args() -> argparse.Namespace: def parse_args() -> argparse.Namespace:
p = argparse.ArgumentParser( p = argparse.ArgumentParser(
description="Transparent TCP bridge for capturing MiniMate Plus call-home traffic." description=(
"Transparent TCP bridge / splitter for Instantel MiniMate Plus "
"call-home (ACH) traffic."
),
formatter_class=argparse.RawDescriptionHelpFormatter,
epilog=__doc__,
) )
p.add_argument( p.add_argument(
"--upstream", "-u", "--upstream", "-u",
metavar="HOST:PORT", metavar="HOST:PORT",
help="Upstream ACH server to forward to (e.g. 203.0.113.5:9034). " help=(
"Omit with --standalone for capture-only mode.", "Primary upstream ACH server to forward to "
"(e.g. 203.0.113.5:12345). "
"Omit with --standalone for capture-only mode."
),
)
p.add_argument(
"--mirror", "-m",
metavar="HOST:PORT",
help=(
"Mirror / secondary server to receive a copy of all device bytes "
"(splitter mode). Mirror responses are logged but NOT forwarded "
"to the device. Mirror failures are non-fatal."
),
) )
p.add_argument( p.add_argument(
"--port", "-p", "--port", "-p",
type=int, type=int,
default=9034, default=12345,
help="Local port to listen on (default: 9034).", help="Local port to listen on (default: 12345).",
) )
p.add_argument( p.add_argument(
"--standalone", "-s", "--standalone", "-s",
action="store_true", action="store_true",
help="Capture-only mode: accept connection but do not forward to upstream.", help="Capture-only mode: accept connection, do not forward anywhere.",
) )
p.add_argument( p.add_argument(
"--verbose", "-v", "--verbose", "-v",