feat: add splitter mode to ach_bridge.py (--mirror HOST:PORT)
Adds a production-safe headphone-splitter mode: - Device bytes tee'd to both --upstream (primary/prod) and --mirror (new server) - Only primary server responses are returned to the device - Mirror connect/write failures are non-fatal and logged; prod is unaffected - New raw_mirror_<ts>.bin capture file alongside raw_client/raw_server Three modes: standalone (capture only), bridge (one upstream), splitter (two). Default listen port changed to 12345 to match project ACH setup. Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
This commit is contained in:
+335
-109
@@ -1,40 +1,68 @@
|
|||||||
#!/usr/bin/env python3
|
#!/usr/bin/env python3
|
||||||
"""
|
"""
|
||||||
ach_bridge.py — Transparent TCP bridge for capturing Instantel MiniMate Plus
|
ach_bridge.py — Transparent TCP bridge / splitter for Instantel MiniMate Plus
|
||||||
call-home (ACH) traffic.
|
call-home (ACH) traffic.
|
||||||
|
|
||||||
|
Modes
|
||||||
|
-----
|
||||||
|
standalone Accept connection, capture frames, do NOT forward anywhere.
|
||||||
|
Good for initial discovery with a test unit.
|
||||||
|
|
||||||
|
bridge Forward to one upstream server while capturing.
|
||||||
|
Use this for the initial discovery phase with your test server.
|
||||||
|
|
||||||
|
splitter Forward to the PRIMARY upstream (production ACH server) AND
|
||||||
|
mirror a copy to a SECONDARY server simultaneously.
|
||||||
|
The device never knows — it talks to the primary the whole time.
|
||||||
|
If the mirror fails, the primary connection is unaffected.
|
||||||
|
|
||||||
|
Think of it like a headphone splitter: one input, two outputs.
|
||||||
|
Primary → authoritative responses back to device.
|
||||||
|
Mirror → gets all device bytes, its responses are discarded.
|
||||||
|
|
||||||
Usage
|
Usage
|
||||||
-----
|
-----
|
||||||
# Bridge mode: forward to real ACH server while logging
|
# Standalone capture (test/discovery — no forwarding)
|
||||||
python bridges/ach_bridge.py --upstream HOST:PORT [--port 9034]
|
python bridges/ach_bridge.py --standalone [--port 12345]
|
||||||
|
|
||||||
# Standalone capture mode: accept connection, don't forward (use when you
|
# Bridge mode (forward to one server, e.g. your test server)
|
||||||
# want to see what the device sends/expects without a real server)
|
python bridges/ach_bridge.py --upstream HOST:PORT [--port 12345]
|
||||||
python bridges/ach_bridge.py --standalone [--port 9034]
|
|
||||||
|
|
||||||
Setup
|
# Splitter mode (production: forward to prod + mirror to your server)
|
||||||
-----
|
python bridges/ach_bridge.py --upstream PROD_HOST:PORT --mirror MY_HOST:PORT [--port 12345]
|
||||||
1. Find the "Remote Hostname/IP" and port in ACEmanager → Dual SIM / WAN →
|
|
||||||
Call Home (or equivalent menu on your RV50/RV55 firmware).
|
|
||||||
2. Temporarily change that setting on ONE unit to point at:
|
|
||||||
your-machine-local-ip : <--port>
|
|
||||||
3. Run this script.
|
|
||||||
4. Wait for the unit to trigger / call home. A capture file is written to
|
|
||||||
bridges/captures/ach_<timestamp>/ alongside an auto-parsed frame log.
|
|
||||||
5. Revert the unit's ACEmanager setting.
|
|
||||||
|
|
||||||
Output
|
Setup for discovery (test server, don't touch prod)
|
||||||
|
----------------------------------------------------
|
||||||
|
1. Stand up your test ACH server, note its IP and port (e.g. 192.168.1.50:12345).
|
||||||
|
2. Take ONE test unit. In ACEmanager → Call Home, point it at:
|
||||||
|
<this machine's LAN IP> : <--port>
|
||||||
|
3. Run: python bridges/ach_bridge.py --upstream TEST_SERVER:12345 --port 12345
|
||||||
|
4. Trigger the unit. Raw frames are saved to bridges/captures/ach_<ts>/.
|
||||||
|
5. Revert the unit's ACEmanager setting when done.
|
||||||
|
|
||||||
|
Setup for production splitter (when you're ready)
|
||||||
|
-------------------------------------------------
|
||||||
|
This does NOT touch the units. Instead you re-route traffic at the network
|
||||||
|
layer so that call-home packets arrive at a machine running this script first.
|
||||||
|
Typical approach: update the DNS entry / host record your prod ACH server is
|
||||||
|
registered under to point at this machine. The units keep their existing
|
||||||
|
ACEmanager settings.
|
||||||
|
|
||||||
|
python bridges/ach_bridge.py \\
|
||||||
|
--upstream PROD_ACH_HOST:12345 \\
|
||||||
|
--mirror MY_NEW_SERVER:12345 \\
|
||||||
|
--port 12345
|
||||||
|
|
||||||
|
Output (each connection gets its own timestamped sub-directory)
|
||||||
------
|
------
|
||||||
bridges/captures/ach_<ISO-timestamp>/
|
bridges/captures/ach_<ts>/
|
||||||
raw_client_<ts>.bin — raw bytes from the device (S3 side)
|
raw_client_<ts>.bin — raw bytes from the device (S3 side)
|
||||||
raw_server_<ts>.bin — raw bytes from the upstream server (BW side)
|
raw_server_<ts>.bin — raw bytes from the primary upstream (BW side)
|
||||||
(empty in standalone mode)
|
raw_mirror_<ts>.bin — raw bytes from the mirror upstream (splitter mode only)
|
||||||
session_<ts>.log — human-readable frame parse log
|
session_<ts>.log — human-readable frame parse log
|
||||||
session_<ts>.jsonl — JSON-lines frame log (for downstream tooling)
|
session_<ts>.jsonl — JSON-lines frame log
|
||||||
|
|
||||||
The raw_client / raw_server files are byte-for-byte compatible with the
|
raw_client / raw_server are byte-for-byte compatible with parse_capture.py.
|
||||||
existing capture format used by bridges/parse_capture.py and the rest of
|
|
||||||
the analysis tooling.
|
|
||||||
"""
|
"""
|
||||||
|
|
||||||
from __future__ import annotations
|
from __future__ import annotations
|
||||||
@@ -47,7 +75,7 @@ import logging
|
|||||||
import os
|
import os
|
||||||
import sys
|
import sys
|
||||||
from pathlib import Path
|
from pathlib import Path
|
||||||
from typing import Optional
|
from typing import List, Optional
|
||||||
|
|
||||||
# Add project root to path
|
# Add project root to path
|
||||||
sys.path.insert(0, str(Path(__file__).parent.parent))
|
sys.path.insert(0, str(Path(__file__).parent.parent))
|
||||||
@@ -113,26 +141,28 @@ _KNOWN_REQ_SUBS = {
|
|||||||
def _label_s3_frame(frame: S3Frame) -> str:
|
def _label_s3_frame(frame: S3Frame) -> str:
|
||||||
name = _KNOWN_RSP_SUBS.get(frame.sub, f"UNK_0x{frame.sub:02X}")
|
name = _KNOWN_RSP_SUBS.get(frame.sub, f"UNK_0x{frame.sub:02X}")
|
||||||
chk = "✓" if frame.checksum_valid else "✗CHK"
|
chk = "✓" if frame.checksum_valid else "✗CHK"
|
||||||
return f"S3→ SUB=0x{frame.sub:02X} ({name}) page=0x{frame.page_key:04X} data={len(frame.data)}B {chk}"
|
return (
|
||||||
|
f"S3→ SUB=0x{frame.sub:02X} ({name}) "
|
||||||
|
f"page=0x{frame.page_key:04X} data={len(frame.data)}B {chk}"
|
||||||
|
)
|
||||||
|
|
||||||
|
|
||||||
def _label_bw_frame(data: bytes) -> str:
|
def _label_bw_frame(data: bytes, prefix: str = " →BW") -> str:
|
||||||
"""Best-effort label for a raw BW request frame."""
|
"""Best-effort label for a raw BW request frame (wire bytes)."""
|
||||||
# BW frame (destuffed): ACK STX [10 10] flags sub ...
|
# Wire layout: 41 02 10 10 00 sub ...
|
||||||
# Wire: 41 02 10 10 00 sub ...
|
|
||||||
if len(data) < 6:
|
if len(data) < 6:
|
||||||
return f"BW→ (short {len(data)}B)"
|
return f"{prefix} (short {len(data)}B)"
|
||||||
sub = data[5]
|
sub = data[5]
|
||||||
name = _KNOWN_REQ_SUBS.get(sub, f"UNK_0x{sub:02X}")
|
name = _KNOWN_REQ_SUBS.get(sub, f"UNK_0x{sub:02X}")
|
||||||
return f" →BW SUB=0x{sub:02X} ({name}) {len(data)}B"
|
return f"{prefix} SUB=0x{sub:02X} ({name}) {len(data)}B"
|
||||||
|
|
||||||
|
|
||||||
# ── Per-session capture writer ────────────────────────────────────────────────
|
# ── Per-session capture writer ─────────────────────────────────────────────────
|
||||||
|
|
||||||
class CaptureSession:
|
class CaptureSession:
|
||||||
"""Writes raw bytes + parsed log for one TCP connection."""
|
"""Writes raw bytes + parsed log for one TCP connection."""
|
||||||
|
|
||||||
def __init__(self, capture_dir: Path, peer: str):
|
def __init__(self, capture_dir: Path, peer: str, *, has_mirror: bool = False):
|
||||||
ts = datetime.datetime.now().strftime("%Y%m%d_%H%M%S")
|
ts = datetime.datetime.now().strftime("%Y%m%d_%H%M%S")
|
||||||
self.dir = capture_dir / f"ach_{ts}"
|
self.dir = capture_dir / f"ach_{ts}"
|
||||||
self.dir.mkdir(parents=True, exist_ok=True)
|
self.dir.mkdir(parents=True, exist_ok=True)
|
||||||
@@ -140,6 +170,9 @@ class CaptureSession:
|
|||||||
|
|
||||||
self._raw_client = open(self.dir / f"raw_client_{ts}.bin", "wb")
|
self._raw_client = open(self.dir / f"raw_client_{ts}.bin", "wb")
|
||||||
self._raw_server = open(self.dir / f"raw_server_{ts}.bin", "wb")
|
self._raw_server = open(self.dir / f"raw_server_{ts}.bin", "wb")
|
||||||
|
self._raw_mirror = (
|
||||||
|
open(self.dir / f"raw_mirror_{ts}.bin", "wb") if has_mirror else None
|
||||||
|
)
|
||||||
self._log_fh = open(self.dir / f"session_{ts}.log", "w")
|
self._log_fh = open(self.dir / f"session_{ts}.log", "w")
|
||||||
self._jsonl_fh = open(self.dir / f"session_{ts}.jsonl", "w")
|
self._jsonl_fh = open(self.dir / f"session_{ts}.jsonl", "w")
|
||||||
|
|
||||||
@@ -147,20 +180,24 @@ class CaptureSession:
|
|||||||
self._frame_count = 0
|
self._frame_count = 0
|
||||||
self._byte_count_client = 0
|
self._byte_count_client = 0
|
||||||
self._byte_count_server = 0
|
self._byte_count_server = 0
|
||||||
|
self._byte_count_mirror = 0
|
||||||
|
|
||||||
self._log(f"# ACH capture — peer={peer} started={datetime.datetime.now().isoformat()}")
|
self._log(
|
||||||
|
f"# ACH capture — peer={peer} "
|
||||||
|
f"mirror={'yes' if has_mirror else 'no'} "
|
||||||
|
f"started={datetime.datetime.now().isoformat()}"
|
||||||
|
)
|
||||||
self._log(f"# Output dir: {self.dir}")
|
self._log(f"# Output dir: {self.dir}")
|
||||||
log.info("Capture session opened: %s (peer=%s)", self.dir, peer)
|
log.info("Capture session opened: %s (peer=%s)", self.dir, peer)
|
||||||
|
|
||||||
# ── public API ───────────────────────────────────────────────────────────
|
# ── public API ────────────────────────────────────────────────────────────
|
||||||
|
|
||||||
def feed_client(self, data: bytes) -> None:
|
def feed_client(self, data: bytes) -> None:
|
||||||
"""Bytes arriving FROM the device (S3 side)."""
|
"""Bytes FROM the device (S3 response frames)."""
|
||||||
self._raw_client.write(data)
|
self._raw_client.write(data)
|
||||||
self._raw_client.flush()
|
self._raw_client.flush()
|
||||||
self._byte_count_client += len(data)
|
self._byte_count_client += len(data)
|
||||||
|
|
||||||
# Parse S3 frames
|
|
||||||
for byte in data:
|
for byte in data:
|
||||||
frame = self._s3_parser.feed(bytes([byte]))
|
frame = self._s3_parser.feed(bytes([byte]))
|
||||||
if frame:
|
if frame:
|
||||||
@@ -169,39 +206,57 @@ class CaptureSession:
|
|||||||
self._frame_count += 1
|
self._frame_count += 1
|
||||||
label = _label_s3_frame(f)
|
label = _label_s3_frame(f)
|
||||||
self._log(f"[{self._frame_count:04d}] {label}")
|
self._log(f"[{self._frame_count:04d}] {label}")
|
||||||
self._log(f" hex: {f.data[:64].hex()}"
|
self._log(
|
||||||
+ (" ..." if len(f.data) > 64 else ""))
|
f" hex: {f.data[:64].hex()}"
|
||||||
|
+ (" ..." if len(f.data) > 64 else "")
|
||||||
|
)
|
||||||
self._emit_json("s3", f)
|
self._emit_json("s3", f)
|
||||||
|
|
||||||
def feed_server(self, data: bytes) -> None:
|
def feed_server(self, data: bytes) -> None:
|
||||||
"""Bytes arriving FROM the upstream server (BW side)."""
|
"""Bytes FROM the primary upstream server (BW request frames)."""
|
||||||
self._raw_server.write(data)
|
self._raw_server.write(data)
|
||||||
self._raw_server.flush()
|
self._raw_server.flush()
|
||||||
self._byte_count_server += len(data)
|
self._byte_count_server += len(data)
|
||||||
label = _label_bw_frame(data)
|
label = _label_bw_frame(data, prefix=" →BW[primary]")
|
||||||
self._log(f" {label}")
|
self._log(f" {label}")
|
||||||
|
|
||||||
|
def feed_mirror(self, data: bytes) -> None:
|
||||||
|
"""Bytes FROM the mirror server (logged, not forwarded to device)."""
|
||||||
|
if self._raw_mirror:
|
||||||
|
self._raw_mirror.write(data)
|
||||||
|
self._raw_mirror.flush()
|
||||||
|
self._byte_count_mirror += len(data)
|
||||||
|
label = _label_bw_frame(data, prefix=" →BW[mirror] ")
|
||||||
|
self._log(f" {label} [MIRROR — not sent to device]")
|
||||||
|
|
||||||
def close(self, reason: str = "connection closed") -> None:
|
def close(self, reason: str = "connection closed") -> None:
|
||||||
self._log(f"# Session ended: {reason}")
|
self._log(f"# Session ended: {reason}")
|
||||||
self._log(f"# Totals — client_bytes={self._byte_count_client} "
|
self._log(
|
||||||
f"server_bytes={self._byte_count_server} "
|
f"# Totals — client={self._byte_count_client}B "
|
||||||
f"s3_frames={self._frame_count}")
|
f"server={self._byte_count_server}B "
|
||||||
for fh in (self._raw_client, self._raw_server, self._log_fh, self._jsonl_fh):
|
f"mirror={self._byte_count_mirror}B "
|
||||||
|
f"s3_frames={self._frame_count}"
|
||||||
|
)
|
||||||
|
handles = [self._raw_client, self._raw_server, self._log_fh, self._jsonl_fh]
|
||||||
|
if self._raw_mirror:
|
||||||
|
handles.append(self._raw_mirror)
|
||||||
|
for fh in handles:
|
||||||
try:
|
try:
|
||||||
fh.close()
|
fh.close()
|
||||||
except Exception:
|
except Exception:
|
||||||
pass
|
pass
|
||||||
log.info(
|
log.info(
|
||||||
"Capture session closed (%s): %dB client, %dB server, %d S3 frames → %s",
|
"Session closed (%s): %dB client, %dB server, %dB mirror, %d S3 frames → %s",
|
||||||
reason, self._byte_count_client, self._byte_count_server,
|
reason,
|
||||||
self._frame_count, self.dir,
|
self._byte_count_client, self._byte_count_server,
|
||||||
|
self._byte_count_mirror, self._frame_count,
|
||||||
|
self.dir,
|
||||||
)
|
)
|
||||||
|
|
||||||
# ── internals ────────────────────────────────────────────────────────────
|
# ── internals ─────────────────────────────────────────────────────────────
|
||||||
|
|
||||||
def _log(self, msg: str) -> None:
|
def _log(self, msg: str) -> None:
|
||||||
print(msg, file=self._log_fh, flush=True)
|
print(msg, file=self._log_fh, flush=True)
|
||||||
# Also echo to console for live monitoring
|
|
||||||
print(msg)
|
print(msg)
|
||||||
|
|
||||||
def _emit_json(self, direction: str, frame: S3Frame) -> None:
|
def _emit_json(self, direction: str, frame: S3Frame) -> None:
|
||||||
@@ -216,18 +271,31 @@ class CaptureSession:
|
|||||||
print(json.dumps(record), file=self._jsonl_fh, flush=True)
|
print(json.dumps(record), file=self._jsonl_fh, flush=True)
|
||||||
|
|
||||||
|
|
||||||
# ── Bridge connection handler ─────────────────────────────────────────────────
|
# ── Bridge / splitter connection handler ──────────────────────────────────────
|
||||||
|
|
||||||
class BridgeHandler:
|
class BridgeHandler:
|
||||||
|
"""
|
||||||
|
Handles inbound device connections.
|
||||||
|
|
||||||
|
Modes (determined by which upstreams are configured):
|
||||||
|
standalone — no upstream_host / no mirror_host
|
||||||
|
bridge — upstream_host set, no mirror_host
|
||||||
|
splitter — upstream_host AND mirror_host both set
|
||||||
|
"""
|
||||||
|
|
||||||
def __init__(
|
def __init__(
|
||||||
self,
|
self,
|
||||||
capture_dir: Path,
|
capture_dir: Path,
|
||||||
upstream_host: Optional[str],
|
upstream_host: Optional[str],
|
||||||
upstream_port: Optional[int],
|
upstream_port: Optional[int],
|
||||||
|
mirror_host: Optional[str] = None,
|
||||||
|
mirror_port: Optional[int] = None,
|
||||||
):
|
):
|
||||||
self.capture_dir = capture_dir
|
self.capture_dir = capture_dir
|
||||||
self.upstream_host = upstream_host
|
self.upstream_host = upstream_host
|
||||||
self.upstream_port = upstream_port
|
self.upstream_port = upstream_port
|
||||||
|
self.mirror_host = mirror_host
|
||||||
|
self.mirror_port = mirror_port
|
||||||
|
|
||||||
async def handle(
|
async def handle(
|
||||||
self,
|
self,
|
||||||
@@ -238,41 +306,11 @@ class BridgeHandler:
|
|||||||
peer_str = f"{peer[0]}:{peer[1]}"
|
peer_str = f"{peer[0]}:{peer[1]}"
|
||||||
log.info("Inbound connection from %s", peer_str)
|
log.info("Inbound connection from %s", peer_str)
|
||||||
|
|
||||||
session = CaptureSession(self.capture_dir, peer_str)
|
has_mirror = bool(self.mirror_host)
|
||||||
|
session = CaptureSession(self.capture_dir, peer_str, has_mirror=has_mirror)
|
||||||
|
|
||||||
if self.upstream_host:
|
if not self.upstream_host:
|
||||||
# Bridge mode: connect to upstream and relay
|
# ── Standalone mode ──────────────────────────────────────────────
|
||||||
try:
|
|
||||||
up_reader, up_writer = await asyncio.open_connection(
|
|
||||||
self.upstream_host, self.upstream_port
|
|
||||||
)
|
|
||||||
log.info("Connected to upstream %s:%s", self.upstream_host, self.upstream_port)
|
|
||||||
except Exception as exc:
|
|
||||||
log.error("Failed to connect to upstream: %s", exc)
|
|
||||||
session.close(f"upstream connect failed: {exc}")
|
|
||||||
client_writer.close()
|
|
||||||
return
|
|
||||||
|
|
||||||
try:
|
|
||||||
await asyncio.gather(
|
|
||||||
self._relay(client_reader, up_writer, session, "client"),
|
|
||||||
self._relay(up_reader, client_writer, session, "server"),
|
|
||||||
)
|
|
||||||
except asyncio.CancelledError:
|
|
||||||
pass
|
|
||||||
except Exception as exc:
|
|
||||||
log.warning("Bridge relay error: %s", exc)
|
|
||||||
finally:
|
|
||||||
session.close("bridge relay ended")
|
|
||||||
for writer in (client_writer, up_writer):
|
|
||||||
try:
|
|
||||||
writer.close()
|
|
||||||
await writer.wait_closed()
|
|
||||||
except Exception:
|
|
||||||
pass
|
|
||||||
|
|
||||||
else:
|
|
||||||
# Standalone mode: just capture, don't forward
|
|
||||||
log.info("Standalone mode — recording inbound traffic only")
|
log.info("Standalone mode — recording inbound traffic only")
|
||||||
try:
|
try:
|
||||||
while True:
|
while True:
|
||||||
@@ -291,30 +329,168 @@ class BridgeHandler:
|
|||||||
await client_writer.wait_closed()
|
await client_writer.wait_closed()
|
||||||
except Exception:
|
except Exception:
|
||||||
pass
|
pass
|
||||||
|
return
|
||||||
|
|
||||||
async def _relay(
|
# ── Bridge / splitter mode ───────────────────────────────────────────
|
||||||
|
# Connect to primary upstream (required)
|
||||||
|
try:
|
||||||
|
up_reader, up_writer = await asyncio.open_connection(
|
||||||
|
self.upstream_host, self.upstream_port
|
||||||
|
)
|
||||||
|
log.info("Connected to primary %s:%s", self.upstream_host, self.upstream_port)
|
||||||
|
except Exception as exc:
|
||||||
|
log.error("Failed to connect to primary upstream: %s", exc)
|
||||||
|
session.close(f"primary connect failed: {exc}")
|
||||||
|
client_writer.close()
|
||||||
|
return
|
||||||
|
|
||||||
|
# Connect to mirror upstream (optional — failure is non-fatal)
|
||||||
|
mir_reader: Optional[asyncio.StreamReader] = None
|
||||||
|
mir_writer: Optional[asyncio.StreamWriter] = None
|
||||||
|
if self.mirror_host:
|
||||||
|
try:
|
||||||
|
mir_reader, mir_writer = await asyncio.open_connection(
|
||||||
|
self.mirror_host, self.mirror_port
|
||||||
|
)
|
||||||
|
log.info("Connected to mirror %s:%s", self.mirror_host, self.mirror_port)
|
||||||
|
except Exception as exc:
|
||||||
|
log.warning(
|
||||||
|
"Mirror connect failed — continuing without mirror: %s", exc
|
||||||
|
)
|
||||||
|
session._log(f"# WARNING: mirror connect failed: {exc}")
|
||||||
|
|
||||||
|
# Build relay tasks
|
||||||
|
#
|
||||||
|
# ┌──────────┐ device bytes ┌─────────────┐
|
||||||
|
# │ Device │ ─────────────► │ PRIMARY │ responses ──► device
|
||||||
|
# └──────────┘ └─────────────┘
|
||||||
|
# │
|
||||||
|
# │ device bytes (copy)
|
||||||
|
# ▼
|
||||||
|
# ┌─────────────┐
|
||||||
|
# │ MIRROR │ responses discarded (logged only)
|
||||||
|
# └─────────────┘
|
||||||
|
#
|
||||||
|
tasks = [
|
||||||
|
asyncio.create_task(
|
||||||
|
self._relay_device(client_reader, up_writer, mir_writer, session),
|
||||||
|
name="device→upstreams",
|
||||||
|
),
|
||||||
|
asyncio.create_task(
|
||||||
|
self._relay_simple(up_reader, client_writer, session, "server"),
|
||||||
|
name="primary→device",
|
||||||
|
),
|
||||||
|
]
|
||||||
|
if mir_reader is not None:
|
||||||
|
tasks.append(asyncio.create_task(
|
||||||
|
self._relay_drain(mir_reader, session),
|
||||||
|
name="mirror→drain",
|
||||||
|
))
|
||||||
|
|
||||||
|
try:
|
||||||
|
# Wait for the device-to-upstreams relay to exit first (device
|
||||||
|
# disconnected or primary dropped). Then cancel the rest.
|
||||||
|
done, pending = await asyncio.wait(
|
||||||
|
tasks,
|
||||||
|
return_when=asyncio.FIRST_COMPLETED,
|
||||||
|
)
|
||||||
|
for t in pending:
|
||||||
|
t.cancel()
|
||||||
|
try:
|
||||||
|
await t
|
||||||
|
except (asyncio.CancelledError, Exception):
|
||||||
|
pass
|
||||||
|
except Exception as exc:
|
||||||
|
log.warning("Bridge relay error: %s", exc)
|
||||||
|
finally:
|
||||||
|
session.close("relay ended")
|
||||||
|
for writer in filter(None, [client_writer, up_writer, mir_writer]):
|
||||||
|
try:
|
||||||
|
writer.close()
|
||||||
|
await writer.wait_closed()
|
||||||
|
except Exception:
|
||||||
|
pass
|
||||||
|
|
||||||
|
# ── Relay helpers ─────────────────────────────────────────────────────────
|
||||||
|
|
||||||
|
async def _relay_device(
|
||||||
|
self,
|
||||||
|
reader: asyncio.StreamReader,
|
||||||
|
primary_writer: asyncio.StreamWriter,
|
||||||
|
mirror_writer: Optional[asyncio.StreamWriter],
|
||||||
|
session: CaptureSession,
|
||||||
|
) -> None:
|
||||||
|
"""
|
||||||
|
Read bytes from the device, write to the primary server, and also
|
||||||
|
write a copy to the mirror server (if connected). Mirror write
|
||||||
|
failures are non-fatal — we log and continue.
|
||||||
|
"""
|
||||||
|
try:
|
||||||
|
while True:
|
||||||
|
data = await reader.read(4096)
|
||||||
|
if not data:
|
||||||
|
break
|
||||||
|
session.feed_client(data)
|
||||||
|
|
||||||
|
# Primary write — failure IS fatal (lose primary = lose prod)
|
||||||
|
primary_writer.write(data)
|
||||||
|
await primary_writer.drain()
|
||||||
|
|
||||||
|
# Mirror write — failure is non-fatal
|
||||||
|
if mirror_writer is not None:
|
||||||
|
try:
|
||||||
|
mirror_writer.write(data)
|
||||||
|
await mirror_writer.drain()
|
||||||
|
except Exception as exc:
|
||||||
|
log.warning("Mirror write failed (non-fatal): %s", exc)
|
||||||
|
session._log(f"# WARNING: mirror write failed: {exc}")
|
||||||
|
mirror_writer = None # stop trying
|
||||||
|
|
||||||
|
except (asyncio.IncompleteReadError, ConnectionResetError, BrokenPipeError):
|
||||||
|
pass
|
||||||
|
|
||||||
|
async def _relay_simple(
|
||||||
self,
|
self,
|
||||||
reader: asyncio.StreamReader,
|
reader: asyncio.StreamReader,
|
||||||
writer: asyncio.StreamWriter,
|
writer: asyncio.StreamWriter,
|
||||||
session: CaptureSession,
|
session: CaptureSession,
|
||||||
direction: str,
|
direction: str,
|
||||||
) -> None:
|
) -> None:
|
||||||
|
"""Standard single-pipe relay (primary→device or vice-versa)."""
|
||||||
try:
|
try:
|
||||||
while True:
|
while True:
|
||||||
data = await reader.read(4096)
|
data = await reader.read(4096)
|
||||||
if not data:
|
if not data:
|
||||||
break
|
break
|
||||||
if direction == "client":
|
if direction == "server":
|
||||||
session.feed_client(data)
|
|
||||||
else:
|
|
||||||
session.feed_server(data)
|
session.feed_server(data)
|
||||||
|
else:
|
||||||
|
session.feed_client(data)
|
||||||
writer.write(data)
|
writer.write(data)
|
||||||
await writer.drain()
|
await writer.drain()
|
||||||
except (asyncio.IncompleteReadError, ConnectionResetError, BrokenPipeError):
|
except (asyncio.IncompleteReadError, ConnectionResetError, BrokenPipeError):
|
||||||
pass
|
pass
|
||||||
|
|
||||||
|
async def _relay_drain(
|
||||||
|
self,
|
||||||
|
reader: asyncio.StreamReader,
|
||||||
|
session: CaptureSession,
|
||||||
|
) -> None:
|
||||||
|
"""
|
||||||
|
Read mirror server responses, log them to session, do NOT forward to
|
||||||
|
device. The device only ever sees primary server responses.
|
||||||
|
"""
|
||||||
|
try:
|
||||||
|
while True:
|
||||||
|
data = await reader.read(4096)
|
||||||
|
if not data:
|
||||||
|
break
|
||||||
|
session.feed_mirror(data)
|
||||||
|
except (asyncio.IncompleteReadError, ConnectionResetError, BrokenPipeError):
|
||||||
|
pass
|
||||||
|
|
||||||
# ── Main ──────────────────────────────────────────────────────────────────────
|
|
||||||
|
# ── Main ───────────────────────────────────────────────────────────────────────
|
||||||
|
|
||||||
async def main(args: argparse.Namespace) -> None:
|
async def main(args: argparse.Namespace) -> None:
|
||||||
capture_dir = Path(__file__).parent / "captures"
|
capture_dir = Path(__file__).parent / "captures"
|
||||||
@@ -322,6 +498,8 @@ async def main(args: argparse.Namespace) -> None:
|
|||||||
|
|
||||||
upstream_host: Optional[str] = None
|
upstream_host: Optional[str] = None
|
||||||
upstream_port: Optional[int] = None
|
upstream_port: Optional[int] = None
|
||||||
|
mirror_host: Optional[str] = None
|
||||||
|
mirror_port: Optional[int] = None
|
||||||
|
|
||||||
if not args.standalone:
|
if not args.standalone:
|
||||||
if not args.upstream:
|
if not args.upstream:
|
||||||
@@ -329,12 +507,24 @@ async def main(args: argparse.Namespace) -> None:
|
|||||||
sys.exit(1)
|
sys.exit(1)
|
||||||
parts = args.upstream.rsplit(":", 1)
|
parts = args.upstream.rsplit(":", 1)
|
||||||
if len(parts) != 2:
|
if len(parts) != 2:
|
||||||
print("ERROR: --upstream must be HOST:PORT (e.g. 203.0.113.5:9034)")
|
print("ERROR: --upstream must be HOST:PORT (e.g. 203.0.113.5:12345)")
|
||||||
sys.exit(1)
|
sys.exit(1)
|
||||||
upstream_host = parts[0]
|
upstream_host = parts[0]
|
||||||
upstream_port = int(parts[1])
|
upstream_port = int(parts[1])
|
||||||
|
|
||||||
handler = BridgeHandler(capture_dir, upstream_host, upstream_port)
|
if args.mirror:
|
||||||
|
parts = args.mirror.rsplit(":", 1)
|
||||||
|
if len(parts) != 2:
|
||||||
|
print("ERROR: --mirror must be HOST:PORT (e.g. 192.168.1.50:12345)")
|
||||||
|
sys.exit(1)
|
||||||
|
mirror_host = parts[0]
|
||||||
|
mirror_port = int(parts[1])
|
||||||
|
|
||||||
|
handler = BridgeHandler(
|
||||||
|
capture_dir,
|
||||||
|
upstream_host, upstream_port,
|
||||||
|
mirror_host, mirror_port,
|
||||||
|
)
|
||||||
|
|
||||||
server = await asyncio.start_server(
|
server = await asyncio.start_server(
|
||||||
handler.handle,
|
handler.handle,
|
||||||
@@ -342,19 +532,38 @@ async def main(args: argparse.Namespace) -> None:
|
|||||||
port=args.port,
|
port=args.port,
|
||||||
)
|
)
|
||||||
|
|
||||||
mode = f"bridge → {upstream_host}:{upstream_port}" if upstream_host else "standalone capture"
|
# ── Startup banner ────────────────────────────────────────────────────────
|
||||||
|
if args.standalone:
|
||||||
|
mode = "STANDALONE capture (no forwarding)"
|
||||||
|
elif mirror_host:
|
||||||
|
mode = f"SPLITTER primary={upstream_host}:{upstream_port} mirror={mirror_host}:{mirror_port}"
|
||||||
|
else:
|
||||||
|
mode = f"BRIDGE → {upstream_host}:{upstream_port}"
|
||||||
|
|
||||||
addrs = ", ".join(str(s.getsockname()) for s in server.sockets)
|
addrs = ", ".join(str(s.getsockname()) for s in server.sockets)
|
||||||
print(f"\n{'='*60}")
|
print(f"\n{'='*70}")
|
||||||
print(f" ACH bridge listening on {addrs}")
|
print(f" ACH bridge/splitter listening on {addrs}")
|
||||||
print(f" Mode: {mode}")
|
print(f" Mode: {mode}")
|
||||||
print(f" Captures: {capture_dir}/ach_<timestamp>/")
|
print(f" Captures: {capture_dir}/ach_<timestamp>/")
|
||||||
print(f"{'='*60}")
|
print(f"{'='*70}")
|
||||||
print(f"\n Point your unit's ACEmanager call-home destination to:")
|
|
||||||
print(f" <this machine's LAN IP>:{args.port}")
|
if upstream_host and not mirror_host:
|
||||||
if upstream_host:
|
print(f"\n DISCOVERY PHASE")
|
||||||
print(f"\n All traffic will be forwarded to {upstream_host}:{upstream_port}")
|
print(f" Point your TEST unit's ACEmanager call-home destination to:")
|
||||||
print(f" Your live data feed is uninterrupted.")
|
print(f" <this machine's LAN IP> : {args.port}")
|
||||||
print(f"\n Waiting for inbound connection... (Ctrl-C to stop)\n")
|
print(f" All traffic will be forwarded to {upstream_host}:{upstream_port}")
|
||||||
|
elif mirror_host:
|
||||||
|
print(f"\n SPLITTER MODE — PRODUCTION SAFE")
|
||||||
|
print(f" Units connect as normal. Every byte is forwarded to:")
|
||||||
|
print(f" PRIMARY (authoritative): {upstream_host}:{upstream_port}")
|
||||||
|
print(f" MIRROR (your server): {mirror_host}:{mirror_port}")
|
||||||
|
print(f" Only PRIMARY responses reach the device.")
|
||||||
|
print(f" Mirror failures are logged and do not affect the device.")
|
||||||
|
else:
|
||||||
|
print(f"\n STANDALONE MODE — capture only, nothing forwarded")
|
||||||
|
print(f" Point a unit at <this machine's LAN IP> : {args.port}")
|
||||||
|
|
||||||
|
print(f"\n Waiting for inbound connections... (Ctrl-C to stop)\n")
|
||||||
|
|
||||||
async with server:
|
async with server:
|
||||||
await server.serve_forever()
|
await server.serve_forever()
|
||||||
@@ -362,24 +571,41 @@ async def main(args: argparse.Namespace) -> None:
|
|||||||
|
|
||||||
def parse_args() -> argparse.Namespace:
|
def parse_args() -> argparse.Namespace:
|
||||||
p = argparse.ArgumentParser(
|
p = argparse.ArgumentParser(
|
||||||
description="Transparent TCP bridge for capturing MiniMate Plus call-home traffic."
|
description=(
|
||||||
|
"Transparent TCP bridge / splitter for Instantel MiniMate Plus "
|
||||||
|
"call-home (ACH) traffic."
|
||||||
|
),
|
||||||
|
formatter_class=argparse.RawDescriptionHelpFormatter,
|
||||||
|
epilog=__doc__,
|
||||||
)
|
)
|
||||||
p.add_argument(
|
p.add_argument(
|
||||||
"--upstream", "-u",
|
"--upstream", "-u",
|
||||||
metavar="HOST:PORT",
|
metavar="HOST:PORT",
|
||||||
help="Upstream ACH server to forward to (e.g. 203.0.113.5:9034). "
|
help=(
|
||||||
"Omit with --standalone for capture-only mode.",
|
"Primary upstream ACH server to forward to "
|
||||||
|
"(e.g. 203.0.113.5:12345). "
|
||||||
|
"Omit with --standalone for capture-only mode."
|
||||||
|
),
|
||||||
|
)
|
||||||
|
p.add_argument(
|
||||||
|
"--mirror", "-m",
|
||||||
|
metavar="HOST:PORT",
|
||||||
|
help=(
|
||||||
|
"Mirror / secondary server to receive a copy of all device bytes "
|
||||||
|
"(splitter mode). Mirror responses are logged but NOT forwarded "
|
||||||
|
"to the device. Mirror failures are non-fatal."
|
||||||
|
),
|
||||||
)
|
)
|
||||||
p.add_argument(
|
p.add_argument(
|
||||||
"--port", "-p",
|
"--port", "-p",
|
||||||
type=int,
|
type=int,
|
||||||
default=9034,
|
default=12345,
|
||||||
help="Local port to listen on (default: 9034).",
|
help="Local port to listen on (default: 12345).",
|
||||||
)
|
)
|
||||||
p.add_argument(
|
p.add_argument(
|
||||||
"--standalone", "-s",
|
"--standalone", "-s",
|
||||||
action="store_true",
|
action="store_true",
|
||||||
help="Capture-only mode: accept connection but do not forward to upstream.",
|
help="Capture-only mode: accept connection, do not forward anywhere.",
|
||||||
)
|
)
|
||||||
p.add_argument(
|
p.add_argument(
|
||||||
"--verbose", "-v",
|
"--verbose", "-v",
|
||||||
|
|||||||
Reference in New Issue
Block a user