Files
seismo-relay/bridges/ach_mitm.py
T
claude 3d9db8b662 feat: add ach_mitm.py — transparent TCP MITM proxy for ACH session capture
Listens for inbound unit connections, connects upstream to a real Blastware
ACH server, and forwards bytes bidirectionally while saving both directions to
raw_bw_<ts>.bin and raw_s3_<ts>.bin in the existing capture format.

Used to capture the 4-11-26 Blastware ACH session that confirmed the erase-all
protocol (SUBs 0xA3/0x1C/0x06/0xA2) and the event deletion wire sequence.

Usage:
  python bridges/ach_mitm.py --bw-host 127.0.0.1 --bw-port 9999 --listen-port 9998
  Point the unit's call-home destination at this machine:9998.
  Point this proxy's --bw-host/port at the upstream Blastware ACH server.

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
2026-04-16 21:14:58 +00:00

178 lines
6.0 KiB
Python

#!/usr/bin/env python3
"""
ach_mitm.py — TCP man-in-the-middle proxy for capturing Blastware ACH sessions.
The unit calls home to THIS proxy instead of directly to Blastware. The proxy
forwards every byte in both directions to the real Blastware ACH server and saves
the traffic to separate raw capture files that the Analyzer can load directly.
Setup
-----
1. Start Blastware's ACH server on the BW PC as normal (it listens on its port).
2. Run this proxy on any machine the unit can reach:
python bridges/ach_mitm.py --bw-host 192.168.1.50 --bw-port 9999
3. Point the unit's ACEmanager call-home destination to THIS machine's IP and
the --listen-port (default 9999).
4. Trigger a call-home (or wait for the unit to call in).
5. The proxy transparently forwards everything and saves two files per session:
ach_mitm_<ts>/raw_bw_<ts>.bin -- bytes Blastware sent to unit (BW TX)
ach_mitm_<ts>/raw_s3_<ts>.bin -- bytes unit sent to Blastware (S3 TX)
Both files load directly in the Analyzer (File > Open Capture).
Each session ends cleanly when either side drops the connection; the proxy
itself keeps listening for further call-ins until stopped with Ctrl-C.
Use case: capturing Blastware operations we haven't reverse-engineered yet,
e.g. event deletion, factory reset, firmware update.
"""
from __future__ import annotations
import argparse
import datetime
import logging
import socket
import sys
import threading
from pathlib import Path
log = logging.getLogger("ach_mitm")
def _pipe(src: socket.socket, dst: socket.socket, label: str, outfile) -> None:
"""Forward bytes from src to dst, writing everything to outfile."""
try:
while True:
data = src.recv(4096)
if not data:
break
dst.sendall(data)
outfile.write(data)
outfile.flush()
log.debug("%s %d bytes", label, len(data))
except OSError:
pass
finally:
log.info("%s pipe closed", label)
# Signal the other direction to stop by shutting down our end.
try:
dst.shutdown(socket.SHUT_WR)
except OSError:
pass
def handle(unit_sock: socket.socket, peer: str, bw_host: str, bw_port: int,
           output_dir: Path) -> None:
    """Proxy one accepted unit connection to Blastware and capture the traffic.

    Connects upstream to ``bw_host:bw_port``, creates a session directory
    under ``output_dir`` and runs one ``_pipe`` thread per direction, writing
    ``raw_bw_<ts>.bin`` (bytes read from Blastware) and ``raw_s3_<ts>.bin``
    (bytes read from the unit).  Returns once both pipe threads have finished.

    Args:
        unit_sock: Connected socket for the unit; always closed on return.
        peer: Printable ``host:port`` of the unit, used for logging only.
        bw_host: Host name or IPv4 address of the upstream server.
        bw_port: TCP port of the upstream server.
        output_dir: Directory under which the session directory is created.

    Returns:
        None.  A failed upstream connect is logged and the function returns
        without creating any capture files.
    """
    ts = datetime.datetime.now().strftime("%Y%m%d_%H%M%S")
    log.info("Session %s unit=%s forwarding to %s:%d", ts, peer, bw_host, bw_port)

    # Both sockets are context-managed so they are closed on every exit path,
    # including a failed connect or an error while setting up the capture.
    with unit_sock, socket.socket(socket.AF_INET, socket.SOCK_STREAM) as bw_sock:
        # Connect upstream to Blastware.
        try:
            bw_sock.connect((bw_host, bw_port))
        except OSError as exc:
            log.error("Cannot reach Blastware at %s:%d: %s", bw_host, bw_port, exc)
            return
        log.info("Connected to Blastware at %s:%d", bw_host, bw_port)

        # The timestamp has one-second resolution, so two sessions starting in
        # the same second would otherwise share a directory and truncate each
        # other's files.  mkdir(exist_ok=False) is the atomic claim; on a clash
        # a numeric suffix is appended to the directory name only.
        session_dir = output_dir / f"ach_mitm_{ts}"
        clash = 0
        while True:
            try:
                session_dir.mkdir(parents=True, exist_ok=False)
                break
            except FileExistsError:
                clash += 1
                session_dir = output_dir / f"ach_mitm_{ts}_{clash}"

        bw_path = session_dir / f"raw_bw_{ts}.bin"  # Blastware -> unit (BW TX)
        s3_path = session_dir / f"raw_s3_{ts}.bin"  # unit -> Blastware (S3 TX)

        with open(bw_path, "wb") as bw_fh, open(s3_path, "wb") as s3_fh:
            # Two threads: one per direction.
            t_bw = threading.Thread(
                target=_pipe, args=(bw_sock, unit_sock, "BW->unit", bw_fh), daemon=True
            )
            t_s3 = threading.Thread(
                target=_pipe, args=(unit_sock, bw_sock, "unit->BW", s3_fh), daemon=True
            )
            t_bw.start()
            t_s3.start()
            t_bw.join()
            t_s3.join()

        # Measure after the files are closed so any buffered data is counted.
        bw_bytes = bw_path.stat().st_size
        s3_bytes = s3_path.stat().st_size
        log.info(
            "Session %s done BW->unit: %d bytes unit->BW: %d bytes -> %s",
            ts, bw_bytes, s3_bytes, session_dir,
        )
def serve(args: argparse.Namespace) -> None:
    """Accept unit connections and hand each one to ``handle`` on its own thread.

    Creates the capture root directory, binds a listening socket on
    ``0.0.0.0:args.listen_port``, prints a start-up banner and then loops
    until ``KeyboardInterrupt``.  The listening socket is closed on exit.

    Args:
        args: Parsed options; reads ``output``, ``listen_port``, ``bw_host``
            and ``bw_port``.
    """
    capture_root = Path(args.output)
    capture_root.mkdir(parents=True, exist_ok=True)

    listener = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    listener.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
    listener.bind(("0.0.0.0", args.listen_port))
    listener.listen(5)
    # accept() wakes up once a second instead of blocking indefinitely --
    # presumably so Ctrl-C is serviced promptly; confirm on the target OS.
    listener.settimeout(1.0)

    rule = "=" * 60
    banner = (
        f"\n{rule}",
        " ACH MITM proxy",
        f" Listening on 0.0.0.0:{args.listen_port}",
        f" Forwarding to {args.bw_host}:{args.bw_port}",
        f" Captures in {capture_root.resolve()}/ach_mitm_<ts>/",
        f"{rule}",
        f"\n Point the unit's ACEmanager call-home to this machine on port {args.listen_port}",
        " Ctrl-C to stop\n",
    )
    for line in banner:
        print(line)

    try:
        while True:
            try:
                conn, remote = listener.accept()
            except socket.timeout:
                # Nothing arrived within the poll interval; try again.
                continue
            peer = f"{remote[0]}:{remote[1]}"
            log.info("Accepted connection from %s", peer)
            # One daemon thread per session.
            worker = threading.Thread(
                target=handle,
                args=(conn, peer, args.bw_host, args.bw_port, capture_root),
                daemon=True,
            )
            worker.start()
    except KeyboardInterrupt:
        print("\nStopping.")
    finally:
        listener.close()
def main() -> None:
    """Parse command-line options, configure logging and start the proxy.

    Exits with argparse's usage error (status 2) when the upstream target is
    this machine's loopback address on the same port the proxy listens on:
    both ports default to 9999, and in that configuration the proxy would
    connect to itself, each accepted connection opening another.
    """
    ap = argparse.ArgumentParser(description=__doc__,
                                 formatter_class=argparse.RawDescriptionHelpFormatter)
    ap.add_argument("--bw-host", required=True,
                    help="IP or hostname of the Blastware ACH server")
    ap.add_argument("--bw-port", type=int, default=9999,
                    help="Port Blastware is listening on (default: 9999)")
    ap.add_argument("--listen-port", type=int, default=9999,
                    help="Port this proxy listens on (default: 9999)")
    ap.add_argument("--output", default="bridges/captures/mitm",
                    help="Directory for capture files")
    ap.add_argument("--log-level", default="INFO",
                    choices=["DEBUG", "INFO", "WARNING", "ERROR"])
    args = ap.parse_args()

    # Best-effort self-connect guard: only obvious loopback spellings are
    # detected; a hostname that resolves to this machine is not.
    upstream = args.bw_host.strip().lower()
    targets_self = (
        upstream in ("localhost", "0.0.0.0") or upstream.startswith("127.")
    )
    if targets_self and args.bw_port == args.listen_port:
        ap.error(
            f"--bw-host {args.bw_host} with --bw-port and --listen-port both "
            f"{args.listen_port} would make the proxy connect to itself; "
            "choose a different --listen-port"
        )

    logging.basicConfig(
        level=getattr(logging, args.log_level),
        format="%(asctime)s %(levelname)-7s %(name)s %(message)s",
        stream=sys.stdout,
    )
    serve(args)


if __name__ == "__main__":
    main()