feat: defer session dir creation and add --allow-ip allowlist

- Session directory and log file are now created ONLY after startup() succeeds.
  Internet scanners and dropped connections no longer litter the output folder.
  Raw bytes are buffered in memory until startup succeeds, then flushed to disk.

- Add --allow-ip IP flag (repeatable) to allowlist specific source IPs.
  Connections from un-listed IPs are rejected immediately (socket closed, no log).
  If no --allow-ip flags are given, all IPs are still accepted (original behavior).
  Usage: --allow-ip 63.43.212.232 --allow-ip 152.1.2.3

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
This commit is contained in:
2026-04-10 01:17:30 -04:00
committed by serversdown
parent 41a14ca468
commit cba8b1b401
+71 -33
View File
@@ -129,67 +129,78 @@ class AchSession:
def run(self) -> None: def run(self) -> None:
ts = datetime.datetime.now().strftime("%Y%m%d_%H%M%S") ts = datetime.datetime.now().strftime("%Y%m%d_%H%M%S")
session_dir = self.output_dir / f"ach_inbound_{ts}"
session_dir.mkdir(parents=True, exist_ok=True)
log_path = session_dir / f"session_{ts}.log"
raw_path = session_dir / f"raw_rx_{ts}.bin"
# Wire up a file handler so every protocol log line goes to the session log
fh = logging.FileHandler(log_path, encoding="utf-8")
fh.setFormatter(logging.Formatter("%(asctime)s %(levelname)-7s %(name)s %(message)s"))
root_logger = logging.getLogger()
root_logger.addHandler(fh)
# Session dir and file handler are created lazily — only after startup
# succeeds. This prevents internet scanners and dropped connections from
# littering the output directory with empty session folders.
try: try:
self._run_inner(session_dir, raw_path, ts) self._run_inner(ts)
except Exception as exc: except Exception as exc:
log.error("Session failed: %s", exc, exc_info=True) log.error("Session failed (%s): %s", self.peer, exc, exc_info=True)
finally: finally:
root_logger.removeHandler(fh)
fh.close()
try: try:
self.sock.close() self.sock.close()
except Exception: except Exception:
pass pass
def _run_inner(self, session_dir: Path, raw_path: Path, ts: str) -> None: def _run_inner(self, ts: str) -> None:
log.info("="*60)
log.info("Inbound connection from %s", self.peer)
log.info("Session dir: %s", session_dir)
transport = SocketTransport(self.sock, peer=self.peer) transport = SocketTransport(self.sock, peer=self.peer)
# Tap the transport: save every raw byte received from the device. # Collect raw bytes in memory until startup succeeds, then flush to disk.
raw_fh = open(raw_path, "wb") raw_buf: list[bytes] = []
_orig_read = transport.read _orig_read = transport.read
def tapped_read(n: int) -> bytes: def tapped_read(n: int) -> bytes:
data = _orig_read(n) data = _orig_read(n)
if data: if data:
raw_fh.write(data) raw_buf.append(data)
raw_fh.flush()
return data return data
transport.read = tapped_read # type: ignore[method-assign] transport.read = tapped_read # type: ignore[method-assign]
serial: Optional[str] = None serial: Optional[str] = None
try: # ── Step 1: startup handshake ─────────────────────────────────────────
client = MiniMateClient(transport=transport, timeout=self.timeout) # Do this BEFORE creating the session directory so that scanner probes
client.open() # and dropped connections leave no trace on disk.
# ── Step 1: startup handshake ─────────────────────────────────────
log.info("Step 1/3: startup handshake (POLL / SUB 5B)")
try: try:
from minimateplus.protocol import MiniMateProtocol from minimateplus.protocol import MiniMateProtocol
client = MiniMateClient(transport=transport, timeout=self.timeout)
client.open()
proto = MiniMateProtocol(transport, recv_timeout=self.timeout) proto = MiniMateProtocol(transport, recv_timeout=self.timeout)
proto.startup() proto.startup()
log.info(" [OK] Startup OK -- pull protocol confirmed")
except Exception as exc: except Exception as exc:
log.error(" [FAIL] Startup failed: %s", exc) log.warning("Startup failed from %s: %s -- ignoring", self.peer, exc)
return return # no session dir created
# Startup succeeded — this is a real unit. Create session dir now.
session_dir = self.output_dir / f"ach_inbound_{ts}"
session_dir.mkdir(parents=True, exist_ok=True)
log_path = session_dir / f"session_{ts}.log"
raw_path = session_dir / f"raw_rx_{ts}.bin"
# Flush buffered raw bytes to file and switch to direct file writes.
raw_fh = open(raw_path, "wb")
for chunk in raw_buf:
raw_fh.write(chunk)
raw_buf.clear()
def tapped_read_file(n: int) -> bytes:
data = _orig_read(n)
if data:
raw_fh.write(data)
raw_fh.flush()
return data
transport.read = tapped_read_file # type: ignore[method-assign]
# Wire up file handler now that the session dir exists.
fh = logging.FileHandler(log_path, encoding="utf-8")
fh.setFormatter(logging.Formatter("%(asctime)s %(levelname)-7s %(name)s %(message)s"))
root_logger = logging.getLogger()
root_logger.addHandler(fh)
try:
# ── Step 2: device info ─────────────────────────────────────────── # ── Step 2: device info ───────────────────────────────────────────
device_info = None device_info = None
if not self.events_only: if not self.events_only:
@@ -292,6 +303,8 @@ class AchSession:
finally: finally:
raw_fh.close() raw_fh.close()
client.close() # closes transport / socket cleanly client.close() # closes transport / socket cleanly
root_logger.removeHandler(fh)
fh.close()
log.info("Session complete -> %s", session_dir) log.info("Session complete -> %s", session_dir)
log.info("="*60) log.info("="*60)
@@ -376,11 +389,24 @@ def serve(args: argparse.Namespace) -> None:
print(f" Remote Port: {args.port}") print(f" Remote Port: {args.port}")
print(f"\n Waiting for inbound connections... (Ctrl-C to stop)\n") print(f"\n Waiting for inbound connections... (Ctrl-C to stop)\n")
allow_ips = set(args.allow_ips)
if allow_ips:
print(f" Allowlist: {', '.join(sorted(allow_ips))}")
else:
print(" Allowlist: NONE -- accepting all IPs (add --allow-ip to restrict)")
try: try:
while True: while True:
try: try:
client_sock, addr = server_sock.accept() client_sock, addr = server_sock.accept()
peer_ip = addr[0]
peer = f"{addr[0]}:{addr[1]}" peer = f"{addr[0]}:{addr[1]}"
if allow_ips and peer_ip not in allow_ips:
log.info("Rejected connection from %s (not in allowlist)", peer)
client_sock.close()
continue
log.info("Accepted connection from %s", peer) log.info("Accepted connection from %s", peer)
session = AchSession( session = AchSession(
sock=client_sock, sock=client_sock,
@@ -441,6 +467,18 @@ def parse_args() -> argparse.Namespace:
"Useful if a unit has many old events stored — prevents a very long first run." "Useful if a unit has many old events stored — prevents a very long first run."
), ),
) )
p.add_argument(
"--allow-ip",
metavar="IP",
action="append",
dest="allow_ips",
default=[],
help=(
"Only accept connections from this IP address (repeat for multiple). "
"Example: --allow-ip 63.43.212.232 "
"If not specified, all IPs are accepted (not recommended for public servers)."
),
)
p.add_argument( p.add_argument(
"--verbose", "-v", "--verbose", "-v",
action="store_true", action="store_true",