From cba8b1b40129ff0f6aa1f9a1f51eff787de903d1 Mon Sep 17 00:00:00 2001 From: Brian Harrison Date: Fri, 10 Apr 2026 01:17:30 -0400 Subject: [PATCH] feat: defer session dir creation and add --allow-ip allowlist - Session directory and log file are now created ONLY after startup() succeeds. Internet scanners and dropped connections no longer litter the output folder. Raw bytes are buffered in memory until startup succeeds, then flushed to disk. - Add --allow-ip IP flag (repeatable) to allowlist specific source IPs. Connections from un-listed IPs are rejected immediately (socket closed, no log). If no --allow-ip flags are given, all IPs are still accepted (original behavior). Usage: --allow-ip 63.43.212.232 --allow-ip 152.1.2.3 Co-Authored-By: Claude Sonnet 4.6 --- bridges/ach_server.py | 106 ++++++++++++++++++++++++++++-------------- 1 file changed, 72 insertions(+), 34 deletions(-) diff --git a/bridges/ach_server.py b/bridges/ach_server.py index ccaff02..012db05 100644 --- a/bridges/ach_server.py +++ b/bridges/ach_server.py @@ -129,67 +129,78 @@ class AchSession: def run(self) -> None: ts = datetime.datetime.now().strftime("%Y%m%d_%H%M%S") - session_dir = self.output_dir / f"ach_inbound_{ts}" - session_dir.mkdir(parents=True, exist_ok=True) - - log_path = session_dir / f"session_{ts}.log" - raw_path = session_dir / f"raw_rx_{ts}.bin" - - # Wire up a file handler so every protocol log line goes to the session log - fh = logging.FileHandler(log_path, encoding="utf-8") - fh.setFormatter(logging.Formatter("%(asctime)s %(levelname)-7s %(name)s %(message)s")) - root_logger = logging.getLogger() - root_logger.addHandler(fh) + # Session dir and file handler are created lazily — only after startup + # succeeds. This prevents internet scanners and dropped connections from + # littering the output directory with empty session folders. try: - self._run_inner(session_dir, raw_path, ts) + self._run_inner(ts) except Exception as exc: - log.error("Session failed: %s", exc, exc_info=True) + log.error("Session failed (%s): %s", self.peer, exc, exc_info=True) finally: - root_logger.removeHandler(fh) - fh.close() try: self.sock.close() except Exception: pass - def _run_inner(self, session_dir: Path, raw_path: Path, ts: str) -> None: - log.info("="*60) - log.info("Inbound connection from %s", self.peer) - log.info("Session dir: %s", session_dir) - + def _run_inner(self, ts: str) -> None: transport = SocketTransport(self.sock, peer=self.peer) - # Tap the transport: save every raw byte received from the device. - raw_fh = open(raw_path, "wb") + # Collect raw bytes in memory until startup succeeds, then flush to disk. + raw_buf: list[bytes] = [] _orig_read = transport.read def tapped_read(n: int) -> bytes: data = _orig_read(n) if data: - raw_fh.write(data) - raw_fh.flush() + raw_buf.append(data) return data transport.read = tapped_read # type: ignore[method-assign] serial: Optional[str] = None + # ── Step 1: startup handshake ───────────────────────────────────────── + # Do this BEFORE creating the session directory so that scanner probes + # and dropped connections leave no trace on disk. try: + from minimateplus.protocol import MiniMateProtocol client = MiniMateClient(transport=transport, timeout=self.timeout) client.open() + proto = MiniMateProtocol(transport, recv_timeout=self.timeout) + proto.startup() + except Exception as exc: + log.warning("Startup failed from %s: %s -- ignoring", self.peer, exc) + return # no session dir created - # ── Step 1: startup handshake ───────────────────────────────────── - log.info("Step 1/3: startup handshake (POLL / SUB 5B)") - try: - from minimateplus.protocol import MiniMateProtocol - proto = MiniMateProtocol(transport, recv_timeout=self.timeout) - proto.startup() - log.info(" [OK] Startup OK -- pull protocol confirmed") - except Exception as exc: - log.error(" [FAIL] Startup failed: %s", exc) - return + # Startup succeeded — this is a real unit. Create session dir now. + session_dir = self.output_dir / f"ach_inbound_{ts}" + session_dir.mkdir(parents=True, exist_ok=True) + log_path = session_dir / f"session_{ts}.log" + raw_path = session_dir / f"raw_rx_{ts}.bin" + # Flush buffered raw bytes to file and switch to direct file writes. + raw_fh = open(raw_path, "wb") + for chunk in raw_buf: + raw_fh.write(chunk) + raw_buf.clear() + + def tapped_read_file(n: int) -> bytes: + data = _orig_read(n) + if data: + raw_fh.write(data) + raw_fh.flush() + return data + + transport.read = tapped_read_file # type: ignore[method-assign] + + # Wire up file handler now that the session dir exists. + fh = logging.FileHandler(log_path, encoding="utf-8") + fh.setFormatter(logging.Formatter("%(asctime)s %(levelname)-7s %(name)s %(message)s")) + root_logger = logging.getLogger() + root_logger.addHandler(fh) + + try: # ── Step 2: device info ─────────────────────────────────────────── device_info = None if not self.events_only: @@ -292,6 +303,8 @@ class AchSession: finally: raw_fh.close() client.close() # closes transport / socket cleanly + root_logger.removeHandler(fh) + fh.close() log.info("Session complete -> %s", session_dir) log.info("="*60) @@ -376,11 +389,24 @@ def serve(args: argparse.Namespace) -> None: print(f" Remote Port: {args.port}") print(f"\n Waiting for inbound connections... (Ctrl-C to stop)\n") + allow_ips = set(args.allow_ips) + if allow_ips: + print(f" Allowlist: {', '.join(sorted(allow_ips))}") + else: + print(" Allowlist: NONE -- accepting all IPs (add --allow-ip to restrict)") + try: while True: try: client_sock, addr = server_sock.accept() + peer_ip = addr[0] peer = f"{addr[0]}:{addr[1]}" + + if allow_ips and peer_ip not in allow_ips: + log.info("Rejected connection from %s (not in allowlist)", peer) + client_sock.close() + continue + log.info("Accepted connection from %s", peer) session = AchSession( sock=client_sock, @@ -441,6 +467,18 @@ def parse_args() -> argparse.Namespace: "Useful if a unit has many old events stored — prevents a very long first run." ), ) + p.add_argument( + "--allow-ip", + metavar="IP", + action="append", + dest="allow_ips", + default=[], + help=( + "Only accept connections from this IP address (repeat for multiple). " + "Example: --allow-ip 63.43.212.232 " + "If not specified, all IPs are accepted (not recommended for public servers)." + ), + ) p.add_argument( "--verbose", "-v", action="store_true",