74233d7e31
Adds a production-safe headphone-splitter mode: - Device bytes tee'd to both --upstream (primary/prod) and --mirror (new server) - Only primary server responses are returned to the device - Mirror connect/write failures are non-fatal and logged; prod is unaffected - New raw_mirror_<ts>.bin capture file alongside raw_client/raw_server Three modes: standalone (capture only), bridge (one upstream), splitter (two). Default listen port changed to 12345 to match project ACH setup. Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
628 lines
24 KiB
Python
628 lines
24 KiB
Python
#!/usr/bin/env python3
|
|
"""
|
|
ach_bridge.py — Transparent TCP bridge / splitter for Instantel MiniMate Plus
|
|
call-home (ACH) traffic.
|
|
|
|
Modes
|
|
-----
|
|
standalone Accept connection, capture frames, do NOT forward anywhere.
|
|
Good for initial discovery with a test unit.
|
|
|
|
bridge Forward to one upstream server while capturing.
|
|
Use this for the initial discovery phase with your test server.
|
|
|
|
splitter Forward to the PRIMARY upstream (production ACH server) AND
|
|
mirror a copy to a SECONDARY server simultaneously.
|
|
The device never knows — it talks to the primary the whole time.
|
|
If the mirror fails, the primary connection is unaffected.
|
|
|
|
Think of it like a headphone splitter: one input, two outputs.
|
|
Primary → authoritative responses back to device.
|
|
Mirror → gets all device bytes, its responses are discarded.
|
|
|
|
Usage
|
|
-----
|
|
# Standalone capture (test/discovery — no forwarding)
|
|
python bridges/ach_bridge.py --standalone [--port 12345]
|
|
|
|
# Bridge mode (forward to one server, e.g. your test server)
|
|
python bridges/ach_bridge.py --upstream HOST:PORT [--port 12345]
|
|
|
|
# Splitter mode (production: forward to prod + mirror to your server)
|
|
python bridges/ach_bridge.py --upstream PROD_HOST:PORT --mirror MY_HOST:PORT [--port 12345]
|
|
|
|
Setup for discovery (test server, don't touch prod)
|
|
----------------------------------------------------
|
|
1. Stand up your test ACH server, note its IP and port (e.g. 192.168.1.50:12345).
|
|
2. Take ONE test unit. In ACEmanager → Call Home, point it at:
|
|
<this machine's LAN IP> : <--port>
|
|
3. Run: python bridges/ach_bridge.py --upstream TEST_SERVER:12345 --port 12345
|
|
4. Trigger the unit. Raw frames are saved to bridges/captures/ach_<ts>/.
|
|
5. Revert the unit's ACEmanager setting when done.
|
|
|
|
Setup for production splitter (when you're ready)
|
|
-------------------------------------------------
|
|
This does NOT touch the units. Instead you re-route traffic at the network
|
|
layer so that call-home packets arrive at a machine running this script first.
|
|
Typical approach: update the DNS entry / host record your prod ACH server is
|
|
registered under to point at this machine. The units keep their existing
|
|
ACEmanager settings.
|
|
|
|
python bridges/ach_bridge.py \\
|
|
--upstream PROD_ACH_HOST:12345 \\
|
|
--mirror MY_NEW_SERVER:12345 \\
|
|
--port 12345
|
|
|
|
Output (each connection gets its own timestamped sub-directory)
|
|
------
|
|
bridges/captures/ach_<ts>/
|
|
raw_client_<ts>.bin — raw bytes from the device (S3 side)
|
|
raw_server_<ts>.bin — raw bytes from the primary upstream (BW side)
|
|
raw_mirror_<ts>.bin — raw bytes from the mirror upstream (splitter mode only)
|
|
session_<ts>.log — human-readable frame parse log
|
|
session_<ts>.jsonl — JSON-lines frame log
|
|
|
|
raw_client / raw_server are byte-for-byte compatible with parse_capture.py.
|
|
"""
|
|
|
|
from __future__ import annotations
|
|
|
|
import argparse
|
|
import asyncio
|
|
import datetime
|
|
import json
|
|
import logging
|
|
import os
|
|
import sys
|
|
from pathlib import Path
|
|
from typing import List, Optional
|
|
|
|
# Add project root to path
|
|
sys.path.insert(0, str(Path(__file__).parent.parent))
|
|
|
|
from minimateplus.framing import S3FrameParser, S3Frame
|
|
|
|
log = logging.getLogger("ach_bridge")
|
|
|
|
|
|
# ── Frame label helpers ──────────────────────────────────────────────────────
|
|
|
|
_KNOWN_RSP_SUBS = {
|
|
0xA4: "POLL_RSP",
|
|
0xA5: "BULK_WAVEFORM_RSP",
|
|
0xE0: "ADVANCE_EVENT_RSP",
|
|
0xE1: "EVENT_INDEX_FIRST_RSP",
|
|
0xE3: "MONITOR_STATUS_RSP",
|
|
0xEA: "SERIAL_NUM_RSP",
|
|
0xF3: "WAVEFORM_RECORD_RSP",
|
|
0xF5: "WAVEFORM_HEADER_RSP",
|
|
0xF7: "EVENT_INDEX_RSP",
|
|
0xF9: "UNK_06_RSP",
|
|
0xFE: "DEVICE_INFO_RSP",
|
|
# Write acks
|
|
0x97: "EVT_IDX_WRITE_ACK",
|
|
0x8C: "CONFIRM_B_ACK",
|
|
0x8E: "COMPLIANCE_WRITE_ACK",
|
|
0x8D: "CONFIRM_A_ACK",
|
|
0x7D: "TRIGGER_WRITE_ACK",
|
|
0x7C: "TRIGGER_CONFIRM_ACK",
|
|
0x96: "WAVEFORM_WRITE_ACK",
|
|
0x8B: "CONFIRM_C_ACK",
|
|
0x69: "START_MONITOR_ACK",
|
|
0x68: "STOP_MONITOR_ACK",
|
|
}
|
|
|
|
_KNOWN_REQ_SUBS = {
|
|
0x5B: "POLL",
|
|
0x5A: "BULK_WAVEFORM",
|
|
0x1F: "ADVANCE_EVENT",
|
|
0x1E: "EVENT_INDEX_FIRST",
|
|
0x1C: "MONITOR_STATUS",
|
|
0x15: "SERIAL_NUM",
|
|
0x0C: "WAVEFORM_RECORD",
|
|
0x0A: "WAVEFORM_HEADER",
|
|
0x08: "EVENT_INDEX",
|
|
0x06: "UNK_06",
|
|
0x01: "DEVICE_INFO",
|
|
# Write commands
|
|
0x68: "EVT_IDX_WRITE",
|
|
0x73: "CONFIRM_B",
|
|
0x71: "COMPLIANCE_WRITE",
|
|
0x72: "CONFIRM_A",
|
|
0x82: "TRIGGER_WRITE",
|
|
0x83: "TRIGGER_CONFIRM",
|
|
0x69: "WAVEFORM_WRITE",
|
|
0x74: "CONFIRM_C",
|
|
0x96: "START_MONITOR",
|
|
0x97: "STOP_MONITOR",
|
|
}
|
|
|
|
|
|
def _label_s3_frame(frame: S3Frame) -> str:
|
|
name = _KNOWN_RSP_SUBS.get(frame.sub, f"UNK_0x{frame.sub:02X}")
|
|
chk = "✓" if frame.checksum_valid else "✗CHK"
|
|
return (
|
|
f"S3→ SUB=0x{frame.sub:02X} ({name}) "
|
|
f"page=0x{frame.page_key:04X} data={len(frame.data)}B {chk}"
|
|
)
|
|
|
|
|
|
def _label_bw_frame(data: bytes, prefix: str = " →BW") -> str:
|
|
"""Best-effort label for a raw BW request frame (wire bytes)."""
|
|
# Wire layout: 41 02 10 10 00 sub ...
|
|
if len(data) < 6:
|
|
return f"{prefix} (short {len(data)}B)"
|
|
sub = data[5]
|
|
name = _KNOWN_REQ_SUBS.get(sub, f"UNK_0x{sub:02X}")
|
|
return f"{prefix} SUB=0x{sub:02X} ({name}) {len(data)}B"
|
|
|
|
|
|
# ── Per-session capture writer ─────────────────────────────────────────────────
|
|
|
|
class CaptureSession:
|
|
"""Writes raw bytes + parsed log for one TCP connection."""
|
|
|
|
def __init__(self, capture_dir: Path, peer: str, *, has_mirror: bool = False):
|
|
ts = datetime.datetime.now().strftime("%Y%m%d_%H%M%S")
|
|
self.dir = capture_dir / f"ach_{ts}"
|
|
self.dir.mkdir(parents=True, exist_ok=True)
|
|
self.peer = peer
|
|
|
|
self._raw_client = open(self.dir / f"raw_client_{ts}.bin", "wb")
|
|
self._raw_server = open(self.dir / f"raw_server_{ts}.bin", "wb")
|
|
self._raw_mirror = (
|
|
open(self.dir / f"raw_mirror_{ts}.bin", "wb") if has_mirror else None
|
|
)
|
|
self._log_fh = open(self.dir / f"session_{ts}.log", "w")
|
|
self._jsonl_fh = open(self.dir / f"session_{ts}.jsonl", "w")
|
|
|
|
self._s3_parser = S3FrameParser()
|
|
self._frame_count = 0
|
|
self._byte_count_client = 0
|
|
self._byte_count_server = 0
|
|
self._byte_count_mirror = 0
|
|
|
|
self._log(
|
|
f"# ACH capture — peer={peer} "
|
|
f"mirror={'yes' if has_mirror else 'no'} "
|
|
f"started={datetime.datetime.now().isoformat()}"
|
|
)
|
|
self._log(f"# Output dir: {self.dir}")
|
|
log.info("Capture session opened: %s (peer=%s)", self.dir, peer)
|
|
|
|
# ── public API ────────────────────────────────────────────────────────────
|
|
|
|
def feed_client(self, data: bytes) -> None:
|
|
"""Bytes FROM the device (S3 response frames)."""
|
|
self._raw_client.write(data)
|
|
self._raw_client.flush()
|
|
self._byte_count_client += len(data)
|
|
|
|
for byte in data:
|
|
frame = self._s3_parser.feed(bytes([byte]))
|
|
if frame:
|
|
frames = frame if isinstance(frame, list) else [frame]
|
|
for f in frames:
|
|
self._frame_count += 1
|
|
label = _label_s3_frame(f)
|
|
self._log(f"[{self._frame_count:04d}] {label}")
|
|
self._log(
|
|
f" hex: {f.data[:64].hex()}"
|
|
+ (" ..." if len(f.data) > 64 else "")
|
|
)
|
|
self._emit_json("s3", f)
|
|
|
|
def feed_server(self, data: bytes) -> None:
|
|
"""Bytes FROM the primary upstream server (BW request frames)."""
|
|
self._raw_server.write(data)
|
|
self._raw_server.flush()
|
|
self._byte_count_server += len(data)
|
|
label = _label_bw_frame(data, prefix=" →BW[primary]")
|
|
self._log(f" {label}")
|
|
|
|
def feed_mirror(self, data: bytes) -> None:
|
|
"""Bytes FROM the mirror server (logged, not forwarded to device)."""
|
|
if self._raw_mirror:
|
|
self._raw_mirror.write(data)
|
|
self._raw_mirror.flush()
|
|
self._byte_count_mirror += len(data)
|
|
label = _label_bw_frame(data, prefix=" →BW[mirror] ")
|
|
self._log(f" {label} [MIRROR — not sent to device]")
|
|
|
|
def close(self, reason: str = "connection closed") -> None:
|
|
self._log(f"# Session ended: {reason}")
|
|
self._log(
|
|
f"# Totals — client={self._byte_count_client}B "
|
|
f"server={self._byte_count_server}B "
|
|
f"mirror={self._byte_count_mirror}B "
|
|
f"s3_frames={self._frame_count}"
|
|
)
|
|
handles = [self._raw_client, self._raw_server, self._log_fh, self._jsonl_fh]
|
|
if self._raw_mirror:
|
|
handles.append(self._raw_mirror)
|
|
for fh in handles:
|
|
try:
|
|
fh.close()
|
|
except Exception:
|
|
pass
|
|
log.info(
|
|
"Session closed (%s): %dB client, %dB server, %dB mirror, %d S3 frames → %s",
|
|
reason,
|
|
self._byte_count_client, self._byte_count_server,
|
|
self._byte_count_mirror, self._frame_count,
|
|
self.dir,
|
|
)
|
|
|
|
# ── internals ─────────────────────────────────────────────────────────────
|
|
|
|
def _log(self, msg: str) -> None:
|
|
print(msg, file=self._log_fh, flush=True)
|
|
print(msg)
|
|
|
|
def _emit_json(self, direction: str, frame: S3Frame) -> None:
|
|
record = {
|
|
"dir": direction,
|
|
"sub": frame.sub,
|
|
"page_key": frame.page_key,
|
|
"data_len": len(frame.data),
|
|
"data_hex": frame.data.hex(),
|
|
"checksum_valid": frame.checksum_valid,
|
|
}
|
|
print(json.dumps(record), file=self._jsonl_fh, flush=True)
|
|
|
|
|
|
# ── Bridge / splitter connection handler ──────────────────────────────────────
|
|
|
|
class BridgeHandler:
|
|
"""
|
|
Handles inbound device connections.
|
|
|
|
Modes (determined by which upstreams are configured):
|
|
standalone — no upstream_host / no mirror_host
|
|
bridge — upstream_host set, no mirror_host
|
|
splitter — upstream_host AND mirror_host both set
|
|
"""
|
|
|
|
def __init__(
|
|
self,
|
|
capture_dir: Path,
|
|
upstream_host: Optional[str],
|
|
upstream_port: Optional[int],
|
|
mirror_host: Optional[str] = None,
|
|
mirror_port: Optional[int] = None,
|
|
):
|
|
self.capture_dir = capture_dir
|
|
self.upstream_host = upstream_host
|
|
self.upstream_port = upstream_port
|
|
self.mirror_host = mirror_host
|
|
self.mirror_port = mirror_port
|
|
|
|
async def handle(
|
|
self,
|
|
client_reader: asyncio.StreamReader,
|
|
client_writer: asyncio.StreamWriter,
|
|
) -> None:
|
|
peer = client_writer.get_extra_info("peername", ("?", 0))
|
|
peer_str = f"{peer[0]}:{peer[1]}"
|
|
log.info("Inbound connection from %s", peer_str)
|
|
|
|
has_mirror = bool(self.mirror_host)
|
|
session = CaptureSession(self.capture_dir, peer_str, has_mirror=has_mirror)
|
|
|
|
if not self.upstream_host:
|
|
# ── Standalone mode ──────────────────────────────────────────────
|
|
log.info("Standalone mode — recording inbound traffic only")
|
|
try:
|
|
while True:
|
|
data = await client_reader.read(4096)
|
|
if not data:
|
|
break
|
|
session.feed_client(data)
|
|
except asyncio.CancelledError:
|
|
pass
|
|
except Exception as exc:
|
|
log.warning("Standalone read error: %s", exc)
|
|
finally:
|
|
session.close("standalone capture ended")
|
|
try:
|
|
client_writer.close()
|
|
await client_writer.wait_closed()
|
|
except Exception:
|
|
pass
|
|
return
|
|
|
|
# ── Bridge / splitter mode ───────────────────────────────────────────
|
|
# Connect to primary upstream (required)
|
|
try:
|
|
up_reader, up_writer = await asyncio.open_connection(
|
|
self.upstream_host, self.upstream_port
|
|
)
|
|
log.info("Connected to primary %s:%s", self.upstream_host, self.upstream_port)
|
|
except Exception as exc:
|
|
log.error("Failed to connect to primary upstream: %s", exc)
|
|
session.close(f"primary connect failed: {exc}")
|
|
client_writer.close()
|
|
return
|
|
|
|
# Connect to mirror upstream (optional — failure is non-fatal)
|
|
mir_reader: Optional[asyncio.StreamReader] = None
|
|
mir_writer: Optional[asyncio.StreamWriter] = None
|
|
if self.mirror_host:
|
|
try:
|
|
mir_reader, mir_writer = await asyncio.open_connection(
|
|
self.mirror_host, self.mirror_port
|
|
)
|
|
log.info("Connected to mirror %s:%s", self.mirror_host, self.mirror_port)
|
|
except Exception as exc:
|
|
log.warning(
|
|
"Mirror connect failed — continuing without mirror: %s", exc
|
|
)
|
|
session._log(f"# WARNING: mirror connect failed: {exc}")
|
|
|
|
# Build relay tasks
|
|
#
|
|
# ┌──────────┐ device bytes ┌─────────────┐
|
|
# │ Device │ ─────────────► │ PRIMARY │ responses ──► device
|
|
# └──────────┘ └─────────────┘
|
|
# │
|
|
# │ device bytes (copy)
|
|
# ▼
|
|
# ┌─────────────┐
|
|
# │ MIRROR │ responses discarded (logged only)
|
|
# └─────────────┘
|
|
#
|
|
tasks = [
|
|
asyncio.create_task(
|
|
self._relay_device(client_reader, up_writer, mir_writer, session),
|
|
name="device→upstreams",
|
|
),
|
|
asyncio.create_task(
|
|
self._relay_simple(up_reader, client_writer, session, "server"),
|
|
name="primary→device",
|
|
),
|
|
]
|
|
if mir_reader is not None:
|
|
tasks.append(asyncio.create_task(
|
|
self._relay_drain(mir_reader, session),
|
|
name="mirror→drain",
|
|
))
|
|
|
|
try:
|
|
# Wait for the device-to-upstreams relay to exit first (device
|
|
# disconnected or primary dropped). Then cancel the rest.
|
|
done, pending = await asyncio.wait(
|
|
tasks,
|
|
return_when=asyncio.FIRST_COMPLETED,
|
|
)
|
|
for t in pending:
|
|
t.cancel()
|
|
try:
|
|
await t
|
|
except (asyncio.CancelledError, Exception):
|
|
pass
|
|
except Exception as exc:
|
|
log.warning("Bridge relay error: %s", exc)
|
|
finally:
|
|
session.close("relay ended")
|
|
for writer in filter(None, [client_writer, up_writer, mir_writer]):
|
|
try:
|
|
writer.close()
|
|
await writer.wait_closed()
|
|
except Exception:
|
|
pass
|
|
|
|
# ── Relay helpers ─────────────────────────────────────────────────────────
|
|
|
|
async def _relay_device(
|
|
self,
|
|
reader: asyncio.StreamReader,
|
|
primary_writer: asyncio.StreamWriter,
|
|
mirror_writer: Optional[asyncio.StreamWriter],
|
|
session: CaptureSession,
|
|
) -> None:
|
|
"""
|
|
Read bytes from the device, write to the primary server, and also
|
|
write a copy to the mirror server (if connected). Mirror write
|
|
failures are non-fatal — we log and continue.
|
|
"""
|
|
try:
|
|
while True:
|
|
data = await reader.read(4096)
|
|
if not data:
|
|
break
|
|
session.feed_client(data)
|
|
|
|
# Primary write — failure IS fatal (lose primary = lose prod)
|
|
primary_writer.write(data)
|
|
await primary_writer.drain()
|
|
|
|
# Mirror write — failure is non-fatal
|
|
if mirror_writer is not None:
|
|
try:
|
|
mirror_writer.write(data)
|
|
await mirror_writer.drain()
|
|
except Exception as exc:
|
|
log.warning("Mirror write failed (non-fatal): %s", exc)
|
|
session._log(f"# WARNING: mirror write failed: {exc}")
|
|
mirror_writer = None # stop trying
|
|
|
|
except (asyncio.IncompleteReadError, ConnectionResetError, BrokenPipeError):
|
|
pass
|
|
|
|
async def _relay_simple(
|
|
self,
|
|
reader: asyncio.StreamReader,
|
|
writer: asyncio.StreamWriter,
|
|
session: CaptureSession,
|
|
direction: str,
|
|
) -> None:
|
|
"""Standard single-pipe relay (primary→device or vice-versa)."""
|
|
try:
|
|
while True:
|
|
data = await reader.read(4096)
|
|
if not data:
|
|
break
|
|
if direction == "server":
|
|
session.feed_server(data)
|
|
else:
|
|
session.feed_client(data)
|
|
writer.write(data)
|
|
await writer.drain()
|
|
except (asyncio.IncompleteReadError, ConnectionResetError, BrokenPipeError):
|
|
pass
|
|
|
|
async def _relay_drain(
|
|
self,
|
|
reader: asyncio.StreamReader,
|
|
session: CaptureSession,
|
|
) -> None:
|
|
"""
|
|
Read mirror server responses, log them to session, do NOT forward to
|
|
device. The device only ever sees primary server responses.
|
|
"""
|
|
try:
|
|
while True:
|
|
data = await reader.read(4096)
|
|
if not data:
|
|
break
|
|
session.feed_mirror(data)
|
|
except (asyncio.IncompleteReadError, ConnectionResetError, BrokenPipeError):
|
|
pass
|
|
|
|
|
|
# ── Main ───────────────────────────────────────────────────────────────────────
|
|
|
|
async def main(args: argparse.Namespace) -> None:
|
|
capture_dir = Path(__file__).parent / "captures"
|
|
capture_dir.mkdir(parents=True, exist_ok=True)
|
|
|
|
upstream_host: Optional[str] = None
|
|
upstream_port: Optional[int] = None
|
|
mirror_host: Optional[str] = None
|
|
mirror_port: Optional[int] = None
|
|
|
|
if not args.standalone:
|
|
if not args.upstream:
|
|
print("ERROR: --upstream HOST:PORT is required unless --standalone is set.")
|
|
sys.exit(1)
|
|
parts = args.upstream.rsplit(":", 1)
|
|
if len(parts) != 2:
|
|
print("ERROR: --upstream must be HOST:PORT (e.g. 203.0.113.5:12345)")
|
|
sys.exit(1)
|
|
upstream_host = parts[0]
|
|
upstream_port = int(parts[1])
|
|
|
|
if args.mirror:
|
|
parts = args.mirror.rsplit(":", 1)
|
|
if len(parts) != 2:
|
|
print("ERROR: --mirror must be HOST:PORT (e.g. 192.168.1.50:12345)")
|
|
sys.exit(1)
|
|
mirror_host = parts[0]
|
|
mirror_port = int(parts[1])
|
|
|
|
handler = BridgeHandler(
|
|
capture_dir,
|
|
upstream_host, upstream_port,
|
|
mirror_host, mirror_port,
|
|
)
|
|
|
|
server = await asyncio.start_server(
|
|
handler.handle,
|
|
host="0.0.0.0",
|
|
port=args.port,
|
|
)
|
|
|
|
# ── Startup banner ────────────────────────────────────────────────────────
|
|
if args.standalone:
|
|
mode = "STANDALONE capture (no forwarding)"
|
|
elif mirror_host:
|
|
mode = f"SPLITTER primary={upstream_host}:{upstream_port} mirror={mirror_host}:{mirror_port}"
|
|
else:
|
|
mode = f"BRIDGE → {upstream_host}:{upstream_port}"
|
|
|
|
addrs = ", ".join(str(s.getsockname()) for s in server.sockets)
|
|
print(f"\n{'='*70}")
|
|
print(f" ACH bridge/splitter listening on {addrs}")
|
|
print(f" Mode: {mode}")
|
|
print(f" Captures: {capture_dir}/ach_<timestamp>/")
|
|
print(f"{'='*70}")
|
|
|
|
if upstream_host and not mirror_host:
|
|
print(f"\n DISCOVERY PHASE")
|
|
print(f" Point your TEST unit's ACEmanager call-home destination to:")
|
|
print(f" <this machine's LAN IP> : {args.port}")
|
|
print(f" All traffic will be forwarded to {upstream_host}:{upstream_port}")
|
|
elif mirror_host:
|
|
print(f"\n SPLITTER MODE — PRODUCTION SAFE")
|
|
print(f" Units connect as normal. Every byte is forwarded to:")
|
|
print(f" PRIMARY (authoritative): {upstream_host}:{upstream_port}")
|
|
print(f" MIRROR (your server): {mirror_host}:{mirror_port}")
|
|
print(f" Only PRIMARY responses reach the device.")
|
|
print(f" Mirror failures are logged and do not affect the device.")
|
|
else:
|
|
print(f"\n STANDALONE MODE — capture only, nothing forwarded")
|
|
print(f" Point a unit at <this machine's LAN IP> : {args.port}")
|
|
|
|
print(f"\n Waiting for inbound connections... (Ctrl-C to stop)\n")
|
|
|
|
async with server:
|
|
await server.serve_forever()
|
|
|
|
|
|
def parse_args() -> argparse.Namespace:
|
|
p = argparse.ArgumentParser(
|
|
description=(
|
|
"Transparent TCP bridge / splitter for Instantel MiniMate Plus "
|
|
"call-home (ACH) traffic."
|
|
),
|
|
formatter_class=argparse.RawDescriptionHelpFormatter,
|
|
epilog=__doc__,
|
|
)
|
|
p.add_argument(
|
|
"--upstream", "-u",
|
|
metavar="HOST:PORT",
|
|
help=(
|
|
"Primary upstream ACH server to forward to "
|
|
"(e.g. 203.0.113.5:12345). "
|
|
"Omit with --standalone for capture-only mode."
|
|
),
|
|
)
|
|
p.add_argument(
|
|
"--mirror", "-m",
|
|
metavar="HOST:PORT",
|
|
help=(
|
|
"Mirror / secondary server to receive a copy of all device bytes "
|
|
"(splitter mode). Mirror responses are logged but NOT forwarded "
|
|
"to the device. Mirror failures are non-fatal."
|
|
),
|
|
)
|
|
p.add_argument(
|
|
"--port", "-p",
|
|
type=int,
|
|
default=12345,
|
|
help="Local port to listen on (default: 12345).",
|
|
)
|
|
p.add_argument(
|
|
"--standalone", "-s",
|
|
action="store_true",
|
|
help="Capture-only mode: accept connection, do not forward anywhere.",
|
|
)
|
|
p.add_argument(
|
|
"--verbose", "-v",
|
|
action="store_true",
|
|
help="Enable debug logging.",
|
|
)
|
|
return p.parse_args()
|
|
|
|
|
|
if __name__ == "__main__":
|
|
args = parse_args()
|
|
logging.basicConfig(
|
|
level=logging.DEBUG if args.verbose else logging.INFO,
|
|
format="%(asctime)s %(levelname)-7s %(name)s %(message)s",
|
|
)
|
|
try:
|
|
asyncio.run(main(args))
|
|
except KeyboardInterrupt:
|
|
print("\nStopped.")
|