diff --git a/bridges/ach_server.py b/bridges/ach_server.py index ccaff02..012db05 100644 --- a/bridges/ach_server.py +++ b/bridges/ach_server.py @@ -129,67 +129,78 @@ class AchSession: def run(self) -> None: ts = datetime.datetime.now().strftime("%Y%m%d_%H%M%S") - session_dir = self.output_dir / f"ach_inbound_{ts}" - session_dir.mkdir(parents=True, exist_ok=True) - - log_path = session_dir / f"session_{ts}.log" - raw_path = session_dir / f"raw_rx_{ts}.bin" - - # Wire up a file handler so every protocol log line goes to the session log - fh = logging.FileHandler(log_path, encoding="utf-8") - fh.setFormatter(logging.Formatter("%(asctime)s %(levelname)-7s %(name)s %(message)s")) - root_logger = logging.getLogger() - root_logger.addHandler(fh) + # Session dir and file handler are created lazily — only after startup + # succeeds. This prevents internet scanners and dropped connections from + # littering the output directory with empty session folders. try: - self._run_inner(session_dir, raw_path, ts) + self._run_inner(ts) except Exception as exc: - log.error("Session failed: %s", exc, exc_info=True) + log.error("Session failed (%s): %s", self.peer, exc, exc_info=True) finally: - root_logger.removeHandler(fh) - fh.close() try: self.sock.close() except Exception: pass - def _run_inner(self, session_dir: Path, raw_path: Path, ts: str) -> None: - log.info("="*60) - log.info("Inbound connection from %s", self.peer) - log.info("Session dir: %s", session_dir) - + def _run_inner(self, ts: str) -> None: transport = SocketTransport(self.sock, peer=self.peer) - # Tap the transport: save every raw byte received from the device. - raw_fh = open(raw_path, "wb") + # Collect raw bytes in memory until startup succeeds, then flush to disk. + raw_buf: list[bytes] = [] _orig_read = transport.read def tapped_read(n: int) -> bytes: data = _orig_read(n) if data: - raw_fh.write(data) - raw_fh.flush() + raw_buf.append(data) return data transport.read = tapped_read # type: ignore[method-assign] serial: Optional[str] = None + # ── Step 1: startup handshake ───────────────────────────────────────── + # Do this BEFORE creating the session directory so that scanner probes + # and dropped connections leave no trace on disk. try: + from minimateplus.protocol import MiniMateProtocol client = MiniMateClient(transport=transport, timeout=self.timeout) client.open() + proto = MiniMateProtocol(transport, recv_timeout=self.timeout) + proto.startup() + except Exception as exc: + log.warning("Startup failed from %s: %s -- ignoring", self.peer, exc) + return # no session dir created - # ── Step 1: startup handshake ───────────────────────────────────── - log.info("Step 1/3: startup handshake (POLL / SUB 5B)") - try: - from minimateplus.protocol import MiniMateProtocol - proto = MiniMateProtocol(transport, recv_timeout=self.timeout) - proto.startup() - log.info(" [OK] Startup OK -- pull protocol confirmed") - except Exception as exc: - log.error(" [FAIL] Startup failed: %s", exc) - return + # Startup succeeded — this is a real unit. Create session dir now. + session_dir = self.output_dir / f"ach_inbound_{ts}" + session_dir.mkdir(parents=True, exist_ok=True) + log_path = session_dir / f"session_{ts}.log" + raw_path = session_dir / f"raw_rx_{ts}.bin" + # Flush buffered raw bytes to file and switch to direct file writes. + raw_fh = open(raw_path, "wb") + for chunk in raw_buf: + raw_fh.write(chunk) + raw_buf.clear() + + def tapped_read_file(n: int) -> bytes: + data = _orig_read(n) + if data: + raw_fh.write(data) + raw_fh.flush() + return data + + transport.read = tapped_read_file # type: ignore[method-assign] + + # Wire up file handler now that the session dir exists. + fh = logging.FileHandler(log_path, encoding="utf-8") + fh.setFormatter(logging.Formatter("%(asctime)s %(levelname)-7s %(name)s %(message)s")) + root_logger = logging.getLogger() + root_logger.addHandler(fh) + + try: # ── Step 2: device info ─────────────────────────────────────────── device_info = None if not self.events_only: @@ -292,6 +303,8 @@ class AchSession: finally: raw_fh.close() client.close() # closes transport / socket cleanly + root_logger.removeHandler(fh) + fh.close() log.info("Session complete -> %s", session_dir) log.info("="*60) @@ -376,11 +389,24 @@ def serve(args: argparse.Namespace) -> None: print(f" Remote Port: {args.port}") print(f"\n Waiting for inbound connections... (Ctrl-C to stop)\n") + allow_ips = set(args.allow_ips) + if allow_ips: + print(f" Allowlist: {', '.join(sorted(allow_ips))}") + else: + print(" Allowlist: NONE -- accepting all IPs (add --allow-ip to restrict)") + try: while True: try: client_sock, addr = server_sock.accept() + peer_ip = addr[0] peer = f"{addr[0]}:{addr[1]}" + + if allow_ips and peer_ip not in allow_ips: + log.info("Rejected connection from %s (not in allowlist)", peer) + client_sock.close() + continue + log.info("Accepted connection from %s", peer) session = AchSession( sock=client_sock, @@ -441,6 +467,18 @@ def parse_args() -> argparse.Namespace: "Useful if a unit has many old events stored — prevents a very long first run." ), ) + p.add_argument( + "--allow-ip", + metavar="IP", + action="append", + dest="allow_ips", + default=[], + help=( + "Only accept connections from this IP address (repeat for multiple). " + "Example: --allow-ip 63.43.212.232 " + "If not specified, all IPs are accepted (not recommended for public servers)." + ), + ) p.add_argument( "--verbose", "-v", action="store_true",