From 7070b948a809137c2811e30021fffc1a2ee80c2b Mon Sep 17 00:00:00 2001 From: serversdwn Date: Tue, 10 Feb 2026 07:07:34 +0000 Subject: [PATCH] add: stress test script for diagnosing TCP connection issues. chore: clean up .gitignore --- .gitignore | 1 + SLM-stress-test/nl43_stress_test.py | 1483 +++++++++++++++++++++++++++ 2 files changed, 1484 insertions(+) create mode 100644 SLM-stress-test/nl43_stress_test.py diff --git a/.gitignore b/.gitignore index a6a7a27..ce62098 100644 --- a/.gitignore +++ b/.gitignore @@ -1,5 +1,6 @@ /manuals/ /data/ +/SLM-stress-test/stress_test_logs/ # Python cache __pycache__/ diff --git a/SLM-stress-test/nl43_stress_test.py b/SLM-stress-test/nl43_stress_test.py new file mode 100644 index 0000000..64715f2 --- /dev/null +++ b/SLM-stress-test/nl43_stress_test.py @@ -0,0 +1,1483 @@ +#!/usr/bin/env python3 +""" +NL-43 TCP Wedge Stress Test Tool + +Standalone diagnostic tool for determining what causes the NL-43 sound level +meter's TCP port to become "wedged" (connection refused). Communicates directly +with the device over raw TCP sockets - zero SLMM dependencies. + +Usage: + python nl43_stress_test.py --host 192.168.1.100 --port 2255 --phase 1 + python nl43_stress_test.py --host 192.168.1.100 --phase all --dry-run + +Test Phases: + 1 - Baseline Connection Count: How many commands before it dies? + 2 - Rate Variation: Does spacing between commands matter? + 3 - Command Variety: Do certain commands wedge faster? + 4 - Connection Duration: Does holding connections open matter? + 5 - Sustained Soak: Simulate real SLMM polling over hours +""" + +import asyncio +import argparse +import csv +import os +import shutil +import signal +import subprocess +import sys +import time +from datetime import datetime +from pathlib import Path +from statistics import mean, median, stdev + + +# ─── Constants ─────────────────────────────────────────────────────────────── + +DEFAULT_PORT = 2255 +DEFAULT_TIMEOUT = 5.0 +CRLF = b"\r\n" +SUB_BYTE = b"\x1a" + +# NL-43 result codes +RESULT_OK = "R+0000" +RESULT_CMD_ERROR = "R+0001" +RESULT_PARAM_ERROR = "R+0002" +RESULT_SPEC_ERROR = "R+0003" +RESULT_STATUS_ERROR = "R+0004" + + +# ─── Global state ──────────────────────────────────────────────────────────── + +_abort_requested = False + + +def _signal_handler(sig, frame): + global _abort_requested + _abort_requested = True + print("\n\n!!! Ctrl+C detected - finishing current command and writing summary...\n") + + +signal.signal(signal.SIGINT, _signal_handler) + + +# ─── Packet Capture ────────────────────────────────────────────────────────── + +class PacketCapture: + """ + Manages a tcpdump subprocess that captures all traffic to/from the NL-43. + Saves a .pcap file in the run directory for Wireshark analysis. + """ + + def __init__(self, host: str, port: int, pcap_path: Path): + self.host = host + self.port = port + self.pcap_path = pcap_path + self.process: subprocess.Popen | None = None + self.available = False + + def _try_start_tcpdump(self, cmd: list[str], label: str) -> bool: + """Attempt to start tcpdump with the given command. Returns True if successful.""" + try: + self.process = subprocess.Popen( + cmd, + stdout=subprocess.PIPE, + stderr=subprocess.PIPE, + ) + # Give it a moment to start or fail + time.sleep(1.0) + + if self.process.poll() is not None: + # Process exited immediately + stderr = self.process.stderr.read().decode(errors="ignore") + print(f"[PCAP] {label} tcpdump exited immediately: {stderr.strip()}") + self.process = None + return False + + # Process is running - check stderr for warnings (non-blocking read) + # tcpdump writes "listening on ..." to stderr on success + import select + ready, _, _ = select.select([self.process.stderr], [], [], 0.5) + if ready: + # Read available stderr without blocking + import os + stderr_bytes = os.read(self.process.stderr.fileno(), 4096) + stderr_text = stderr_bytes.decode(errors="ignore") + if "listening on" in stderr_text.lower(): + print(f"[PCAP] {label} tcpdump started: {stderr_text.strip()}") + self.available = True + return True + elif "permission" in stderr_text.lower() or "operation not permitted" in stderr_text.lower(): + print(f"[PCAP] {label} tcpdump permission denied: {stderr_text.strip()}") + self.process.terminate() + self.process.wait(timeout=3) + self.process = None + return False + else: + # Some other output - might be OK, might not + print(f"[PCAP] {label} tcpdump stderr: {stderr_text.strip()}") + + # Process is still running - assume OK + self.available = True + return True + + except FileNotFoundError: + print(f"[PCAP] {label} tcpdump binary not found") + self.process = None + return False + except Exception as e: + print(f"[PCAP] {label} failed to start: {e}") + if self.process: + try: + self.process.kill() + self.process.wait(timeout=3) + except Exception: + pass + self.process = None + return False + + def start(self) -> bool: + """Start tcpdump capture. Returns True if started successfully.""" + if not shutil.which("tcpdump"): + print("[PCAP] tcpdump not found in PATH - packet capture disabled") + print("[PCAP] Install with: sudo apt install tcpdump") + return False + + # Capture all TCP traffic to/from the device on the specified port + # -i any: all interfaces + # -nn: no name resolution + # -s 0: full packets, no truncation + # -w: write raw pcap to file + # -U: packet-buffered output (flush after each packet) + cmd = [ + "tcpdump", + "-i", "any", + "-nn", + "-s", "0", + "-U", + "-w", str(self.pcap_path), + f"host {self.host} and tcp port {self.port}", + ] + + # Try with sudo first (tcpdump almost always needs root for raw capture) + print("[PCAP] Starting packet capture (requires sudo for raw interface access)...") + if self._try_start_tcpdump(["sudo", "-n"] + cmd, "sudo"): + print(f"[PCAP] ✓ Capturing to: {self.pcap_path}") + print(f"[PCAP] Filter: host {self.host} and tcp port {self.port}") + return True + + # Fallback: try without sudo (works if user has CAP_NET_RAW capability) + print("[PCAP] sudo failed, trying without sudo...") + if self._try_start_tcpdump(cmd, "non-sudo"): + print(f"[PCAP] ✓ Capturing to: {self.pcap_path}") + print(f"[PCAP] Filter: host {self.host} and tcp port {self.port}") + return True + + print("[PCAP] ✗ Could not start packet capture") + print("[PCAP] To enable, run the stress test with sudo:") + print(f"[PCAP] sudo python3 nl43_stress_test.py --host {self.host} --port {self.port} ...") + print("[PCAP] Or run tcpdump separately in another terminal:") + print(f"[PCAP] sudo tcpdump -i any -nn -s 0 -U -w capture.pcap host {self.host} and tcp port {self.port}") + return False + + def stop(self) -> dict: + """Stop tcpdump and return capture stats.""" + stats = { + "pcap_file": str(self.pcap_path), + "packets_captured": 0, + "file_size_bytes": 0, + } + + if not self.process: + return stats + + try: + # SIGTERM for graceful shutdown (tcpdump prints stats on exit) + self.process.terminate() + try: + _, stderr = self.process.communicate(timeout=5) + stderr_text = stderr.decode(errors="ignore") + for line in stderr_text.split("\n"): + line = line.strip() + if not line: + continue + if "packets captured" in line: + try: + stats["packets_captured"] = int(line.split()[0]) + except (ValueError, IndexError): + pass + # Print all tcpdump stats lines + if any(k in line for k in ["captured", "received", "dropped"]): + print(f"[PCAP] {line}") + except subprocess.TimeoutExpired: + self.process.kill() + self.process.wait() + except Exception as e: + print(f"[PCAP] Error stopping tcpdump: {e}") + + # Report file size + if self.pcap_path.exists(): + stats["file_size_bytes"] = self.pcap_path.stat().st_size + size_mb = stats["file_size_bytes"] / (1024 * 1024) + print(f"[PCAP] Saved: {self.pcap_path} ({size_mb:.2f} MB)") + print(f"[PCAP] Open with: wireshark {self.pcap_path}") + print(f"[PCAP] or: tcpdump -nn -r {self.pcap_path}") + else: + print(f"[PCAP] Warning: pcap file not found at {self.pcap_path}") + + self.process = None + return stats + + +# ─── Logger ────────────────────────────────────────────────────────────────── + +class StressTestLogger: + """Dual-output logger: console + file, with CSV data writers.""" + + def __init__(self, run_dir: Path): + self.run_dir = run_dir + self.run_dir.mkdir(parents=True, exist_ok=True) + + self.log_file = open(run_dir / "full_log.txt", "w", buffering=1) + self.summary_file = open(run_dir / "summary.txt", "w", buffering=1) + + # Raw wire log - every byte sent/received in hex + ascii + self.wire_log = open(run_dir / "wire_log.txt", "w", buffering=1) + + # CSV for timing data + self.timing_csv_path = run_dir / "timing_data.csv" + self.timing_csv = open(self.timing_csv_path, "w", newline="") + self.timing_writer = csv.writer(self.timing_csv) + self.timing_writer.writerow([ + "phase", "cmd_num", "command", "connect_ms", "response_ms", + "total_ms", "success", "error_type", "result_code", "data_length", + "local_port", "timestamp" + ]) + + def log(self, message: str, also_print: bool = True): + ts = datetime.now().strftime("%Y-%m-%dT%H:%M:%S.%f")[:-3] + line = f"[{ts}] {message}" + self.log_file.write(line + "\n") + if also_print: + print(line) + + def wire(self, direction: str, raw_bytes: bytes, label: str = ""): + """Log raw wire data - every byte in hex and ascii representation.""" + ts = datetime.now().strftime("%Y-%m-%dT%H:%M:%S.%f")[:-3] + hex_str = raw_bytes.hex(" ") + ascii_str = raw_bytes.decode("ascii", errors="replace") + # Show control characters explicitly + readable = "" + for b in raw_bytes: + if b == 0x0d: + readable += "\\r" + elif b == 0x0a: + readable += "\\n" + elif b == 0x1a: + readable += "\\x1a[SUB]" + elif b == 0x24: + readable += "$" + elif 32 <= b < 127: + readable += chr(b) + else: + readable += f"\\x{b:02x}" + self.wire_log.write(f"[{ts}] {label} {direction} ({len(raw_bytes)} bytes)\n") + self.wire_log.write(f" HEX: {hex_str}\n") + self.wire_log.write(f" ASCII: {readable}\n") + self.wire_log.flush() + + def summary(self, message: str): + self.summary_file.write(message + "\n") + self.log_file.write(message + "\n") + print(message) + + def record_timing(self, phase: str, cmd_num: int, command: str, + connect_ms: float, response_ms: float, total_ms: float, + success: bool, error_type: str = "", result_code: str = "", + data_length: int = 0, local_port: int = 0): + self.timing_writer.writerow([ + phase, cmd_num, command, f"{connect_ms:.1f}", f"{response_ms:.1f}", + f"{total_ms:.1f}", success, error_type, result_code, data_length, + local_port, + datetime.now().strftime("%Y-%m-%dT%H:%M:%S.%f")[:-3] + ]) + self.timing_csv.flush() + + def close(self): + self.log_file.close() + self.summary_file.close() + self.timing_csv.close() + self.wire_log.close() + + +# ─── Raw TCP Communication ────────────────────────────────────────────────── + +async def send_command(host: str, port: int, command: str, timeout: float, + is_query: bool = True, hold_seconds: float = 0, + logger: "StressTestLogger | None" = None, + label: str = "") -> dict: + """ + Send a single command to the NL-43 over a fresh TCP connection. + + Returns dict with timing and result info: + connect_ms, response_ms, total_ms, success, result_code, data, + data_length, raw_result_bytes, raw_data_bytes, local_port, + error, error_type + """ + result = { + "connect_ms": 0.0, + "response_ms": 0.0, + "total_ms": 0.0, + "success": False, + "result_code": None, + "data": None, + "data_length": 0, + "raw_result_bytes": None, + "raw_data_bytes": None, + "local_port": 0, + "error": None, + "error_type": None, + } + + t_start = time.monotonic() + reader = None + writer = None + + try: + # Connect + t_conn_start = time.monotonic() + reader, writer = await asyncio.wait_for( + asyncio.open_connection(host, port), + timeout=timeout + ) + result["connect_ms"] = (time.monotonic() - t_conn_start) * 1000 + + # Grab local socket info (source IP:port) + sockname = writer.get_extra_info("sockname") + if sockname: + result["local_port"] = sockname[1] + + if logger: + logger.log(f"{label} CONNECT {host}:{port} from :{result['local_port']} " + f"in {result['connect_ms']:.1f}ms", also_print=False) + + # Send command + cmd_bytes = command.encode("ascii") + if not cmd_bytes.endswith(CRLF): + cmd_bytes += CRLF + writer.write(cmd_bytes) + await writer.drain() + + if logger: + logger.wire("SEND>>>", cmd_bytes, label) + + # Read result code (first line) + t_resp_start = time.monotonic() + first_line_raw = await asyncio.wait_for( + reader.readuntil(b"\n"), + timeout=timeout + ) + result["raw_result_bytes"] = first_line_raw + + if logger: + logger.wire("<< 0: + if logger: + logger.log(f"{label} HOLDING connection open for {hold_seconds}s...", + also_print=False) + await asyncio.sleep(hold_seconds) + + result["success"] = result_code == RESULT_OK + + if result_code != RESULT_OK: + result["error"] = f"Device returned {result_code}" + result["error_type"] = "DeviceError" + + except ConnectionRefusedError: + result["error"] = "Connection refused" + result["error_type"] = "ConnectionRefused" + if logger: + logger.log(f"{label} CONNECTION REFUSED by {host}:{port}", also_print=False) + except asyncio.TimeoutError: + result["error"] = f"Timeout after {timeout}s" + result["error_type"] = "Timeout" + if logger: + logger.log(f"{label} TIMEOUT after {timeout}s connecting to {host}:{port}", + also_print=False) + except ConnectionResetError as e: + result["error"] = f"Connection reset: {e}" + result["error_type"] = "ConnectionReset" + if logger: + logger.log(f"{label} CONNECTION RESET by {host}:{port}: {e}", also_print=False) + except OSError as e: + result["error"] = f"{type(e).__name__}: {e}" + result["error_type"] = type(e).__name__ + if logger: + logger.log(f"{label} OS ERROR: {type(e).__name__}: {e}", also_print=False) + except Exception as e: + result["error"] = f"{type(e).__name__}: {e}" + result["error_type"] = type(e).__name__ + if logger: + logger.log(f"{label} UNEXPECTED ERROR: {type(e).__name__}: {e}", also_print=False) + finally: + if writer: + try: + writer.close() + await writer.wait_closed() + except Exception: + pass + result["total_ms"] = (time.monotonic() - t_start) * 1000 + + if logger: + logger.log(f"{label} CLOSE total={result['total_ms']:.1f}ms " + f"local_port={result['local_port']}", also_print=False) + + return result + + +async def stream_drd(host: str, port: int, duration_seconds: float, + timeout: float, + logger: "StressTestLogger | None" = None) -> dict: + """ + Open a DRD? streaming connection and hold it for the specified duration. + + Returns dict with timing and streaming stats. + """ + result = { + "connect_ms": 0.0, + "total_ms": 0.0, + "success": False, + "lines_received": 0, + "error": None, + "error_type": None, + "clean_shutdown": False, + } + + t_start = time.monotonic() + writer = None + + try: + t_conn_start = time.monotonic() + reader, writer = await asyncio.wait_for( + asyncio.open_connection(host, port), + timeout=timeout + ) + result["connect_ms"] = (time.monotonic() - t_conn_start) * 1000 + + # Grab local port + sockname = writer.get_extra_info("sockname") + local_port = sockname[1] if sockname else 0 + result["local_port"] = local_port + + if logger: + logger.log(f"DRD CONNECT :{local_port} → {host}:{port} " + f"in {result['connect_ms']:.1f}ms", also_print=False) + + # Send DRD? + cmd_bytes = b"DRD?\r\n" + writer.write(cmd_bytes) + await writer.drain() + + if logger: + logger.wire("SEND>>>", cmd_bytes, "DRD") + + # Read result code + first_line = await asyncio.wait_for( + reader.readuntil(b"\n"), + timeout=timeout + ) + + if logger: + logger.wire("<<>>", SUB_BYTE, "DRD shutdown") + except Exception: + pass + + result["success"] = True + + except ConnectionRefusedError: + result["error"] = "Connection refused" + result["error_type"] = "ConnectionRefused" + except asyncio.TimeoutError: + result["error"] = f"Connect timeout after {timeout}s" + result["error_type"] = "Timeout" + except Exception as e: + result["error"] = str(e) + result["error_type"] = type(e).__name__ + finally: + if writer: + try: + writer.close() + await writer.wait_closed() + except Exception: + pass + result["total_ms"] = (time.monotonic() - t_start) * 1000 + + return result + + +# ─── Health Check ──────────────────────────────────────────────────────────── + +async def health_check(host: str, port: int, timeout: float, + logger: StressTestLogger, label: str = "") -> bool: + """ + Quick connectivity test. Returns True if device responds to DOD?. + """ + prefix = f"HEALTH [{label}]" if label else "HEALTH" + logger.log(f"{prefix} Checking device at {host}:{port}...") + + r = await send_command(host, port, "DOD?", timeout, is_query=True, + logger=logger, label=prefix) + + if r["success"]: + logger.log(f"{prefix} OK - connect={r['connect_ms']:.0f}ms " + f"response={r['response_ms']:.0f}ms " + f"local_port={r['local_port']} " + f"data_len={r['data_length']}") + return True + else: + logger.log(f"{prefix} FAIL - {r['error_type']}: {r['error']}") + return False + + +async def wait_for_recovery(host: str, port: int, timeout: float, + logger: StressTestLogger, max_wait: int = 300, + label: str = "") -> dict: + """ + Wait for the device to recover after a wedge. + Tries every 10 seconds for up to max_wait seconds. + + Returns dict with recovery info. + """ + prefix = f"RECOVERY [{label}]" if label else "RECOVERY" + logger.log(f"{prefix} Device unresponsive. Monitoring for recovery (max {max_wait}s)...") + + t_start = time.monotonic() + attempts = 0 + + while (time.monotonic() - t_start) < max_wait and not _abort_requested: + attempts += 1 + elapsed = time.monotonic() - t_start + logger.log(f"{prefix} Attempt #{attempts} at {elapsed:.0f}s...") + + r = await send_command(host, port, "DOD?", timeout, is_query=True, + logger=logger, label=f"{prefix} attempt#{attempts}") + + if r["success"]: + recovery_time = time.monotonic() - t_start + logger.log(f"{prefix} *** DEVICE RECOVERED after {recovery_time:.1f}s ({attempts} attempts) ***") + return { + "recovered": True, + "recovery_seconds": recovery_time, + "attempts": attempts, + } + + logger.log(f"{prefix} Still down: {r['error_type']}: {r['error']}") + await asyncio.sleep(10) + + total_time = time.monotonic() - t_start + logger.log(f"{prefix} Device did NOT recover within {total_time:.0f}s ({attempts} attempts)") + return { + "recovered": False, + "recovery_seconds": total_time, + "attempts": attempts, + } + + +# ─── Phase Summaries ──────────────────────────────────────────────────────── + +def compute_phase_summary(phase_name: str, timings: list, logger: StressTestLogger, + extra_info: str = ""): + """Compute and log summary statistics for a phase.""" + total = len(timings) + successes = [t for t in timings if t["success"]] + failures = [t for t in timings if not t["success"]] + + logger.summary(f"\n{'=' * 60}") + logger.summary(f"=== {phase_name} SUMMARY ===") + logger.summary(f"{'=' * 60}") + logger.summary(f"Total commands sent: {total}") + logger.summary(f"Total successful: {len(successes)}") + logger.summary(f"Total failed: {len(failures)}") + + if failures: + error_types = {} + for f in failures: + et = f.get("error_type", "Unknown") + error_types[et] = error_types.get(et, 0) + 1 + for et, count in error_types.items(): + logger.summary(f" {et}: {count}") + + if successes: + connect_times = [t["connect_ms"] for t in successes] + response_times = [t["response_ms"] for t in successes] + total_times = [t["total_ms"] for t in successes] + + logger.summary(f"\nConnect time (ms): avg={mean(connect_times):.1f} " + f"med={median(connect_times):.1f} " + f"min={min(connect_times):.1f} max={max(connect_times):.1f}") + logger.summary(f"Response time (ms): avg={mean(response_times):.1f} " + f"med={median(response_times):.1f} " + f"min={min(response_times):.1f} max={max(response_times):.1f}") + logger.summary(f"Total time (ms): avg={mean(total_times):.1f} " + f"med={median(total_times):.1f} " + f"min={min(total_times):.1f} max={max(total_times):.1f}") + + if len(connect_times) > 1: + logger.summary(f"Connect stdev (ms): {stdev(connect_times):.1f}") + logger.summary(f"Response stdev (ms): {stdev(response_times):.1f}") + + # Trend analysis - compare first 20% vs last 20% + if len(response_times) >= 10: + bucket = max(1, len(response_times) // 5) + first_bucket = response_times[:bucket] + last_bucket = response_times[-bucket:] + first_avg = mean(first_bucket) + last_avg = mean(last_bucket) + change_pct = ((last_avg - first_avg) / first_avg) * 100 if first_avg > 0 else 0 + + if change_pct > 20: + trend = "INCREASING" + elif change_pct < -20: + trend = "DECREASING" + else: + trend = "STABLE" + + logger.summary(f"\nResponse time trend: {trend} " + f"(first {bucket}: {first_avg:.1f}ms → last {bucket}: {last_avg:.1f}ms, " + f"{change_pct:+.1f}%)") + + if failures: + # Time/command number of first failure + first_fail = failures[0] + fail_idx = timings.index(first_fail) + elapsed_at_fail = sum(t["total_ms"] for t in timings[:fail_idx + 1]) / 1000 + logger.summary(f"\nFirst failure at: command #{fail_idx + 1} " + f"after {elapsed_at_fail:.1f}s ({elapsed_at_fail / 60:.1f} min)") + logger.summary(f"Failure type: {first_fail['error_type']}: {first_fail['error']}") + + if extra_info: + logger.summary(f"\n{extra_info}") + + logger.summary(f"{'=' * 60}\n") + + +# ─── Test Phases ───────────────────────────────────────────────────────────── + +async def phase_1_baseline(host: str, port: int, timeout: float, + max_commands: int, logger: StressTestLogger) -> list: + """ + Phase 1: Baseline Connection Count + How many DOD? commands at 1-second spacing before the device wedges? + """ + logger.summary(f"\n{'#' * 60}") + logger.summary("### PHASE 1: Baseline Connection Count ###") + logger.summary(f"### Sending up to {max_commands} DOD? commands at 1.0s spacing ###") + logger.summary(f"{'#' * 60}\n") + + timings = [] + wedge_detected = False + phase_start = time.monotonic() + + for i in range(max_commands): + if _abort_requested: + logger.log("PHASE1 Abort requested by user") + break + + cmd_num = i + 1 + elapsed_total = time.monotonic() - phase_start + + cmd_label = f"PHASE1 CMD#{cmd_num:04d}" + r = await send_command(host, port, "DOD?", timeout, is_query=True, + logger=logger, label=cmd_label) + + # Log every command with FULL data + if r["success"]: + logger.log(f"PHASE1 CMD#{cmd_num:04d} OK " + f"conn={r['connect_ms']:.0f}ms resp={r['response_ms']:.0f}ms " + f"total={r['total_ms']:.0f}ms elapsed={elapsed_total:.1f}s " + f"port=:{r['local_port']} code={r['result_code']} " + f"data_len={r['data_length']}\n" + f" FULL DATA: {r['data']}") + else: + logger.log(f"PHASE1 CMD#{cmd_num:04d} FAIL " + f"{r['error_type']}: {r['error']} " + f"conn={r['connect_ms']:.0f}ms total={r['total_ms']:.0f}ms " + f"elapsed={elapsed_total:.1f}s port=:{r['local_port']}") + + logger.record_timing("phase1", cmd_num, "DOD?", + r["connect_ms"], r["response_ms"], r["total_ms"], + r["success"], r.get("error_type", ""), + r.get("result_code", ""), r.get("data_length", 0), + r.get("local_port", 0)) + + timings.append(r) + + # Check for wedge + if r["error_type"] == "ConnectionRefused": + logger.log(f"PHASE1 *** WEDGE DETECTED at command #{cmd_num} " + f"after {elapsed_total:.1f}s ({elapsed_total / 60:.1f} min) ***") + wedge_detected = True + break + + # Wait 1 second before next command + if cmd_num < max_commands: + await asyncio.sleep(1.0) + + extra = "" + if not wedge_detected and not _abort_requested: + extra = f"Device survived all {max_commands} commands without wedging." + elif wedge_detected: + recovery = await wait_for_recovery(host, port, timeout, logger, label="PHASE1") + if recovery["recovered"]: + extra = f"Device recovered after {recovery['recovery_seconds']:.1f}s" + else: + extra = "Device did NOT recover within monitoring window" + + compute_phase_summary("PHASE 1: Baseline Connection Count", timings, logger, extra) + return timings + + +async def phase_2_rate_variation(host: str, port: int, timeout: float, + commands_per_round: int, pause_between: int, + logger: StressTestLogger) -> list: + """ + Phase 2: Rate Variation + Does the spacing between commands affect when/if the device wedges? + """ + rates = [1.0, 2.0, 5.0, 10.0, 30.0] + + logger.summary(f"\n{'#' * 60}") + logger.summary("### PHASE 2: Rate Variation ###") + logger.summary(f"### {commands_per_round} commands per round at rates: {rates}s ###") + logger.summary(f"{'#' * 60}\n") + + all_timings = [] + + for rate in rates: + if _abort_requested: + break + + # Health check before each round + logger.log(f"PHASE2 Checking device health before {rate}s rate round...") + healthy = await health_check(host, port, timeout, logger, label=f"PHASE2-{rate}s") + + if not healthy: + logger.log(f"PHASE2 Device unhealthy before {rate}s round - attempting recovery") + recovery = await wait_for_recovery(host, port, timeout, logger, + label=f"PHASE2-{rate}s") + if not recovery["recovered"]: + logger.log(f"PHASE2 Device not recovered - skipping remaining rates") + break + + logger.log(f"\nPHASE2 === Starting {rate}s rate round ({commands_per_round} commands) ===") + round_timings = [] + wedge_in_round = False + + for i in range(commands_per_round): + if _abort_requested: + break + + cmd_num = i + 1 + + cmd_label = f"PHASE2 [{rate}s] CMD#{cmd_num:03d}" + r = await send_command(host, port, "DOD?", timeout, is_query=True, + logger=logger, label=cmd_label) + + status = "OK" if r["success"] else f"FAIL:{r['error_type']}" + logger.log(f"PHASE2 [{rate}s] CMD#{cmd_num:03d} {status} " + f"conn={r['connect_ms']:.0f}ms resp={r['response_ms']:.0f}ms " + f"total={r['total_ms']:.0f}ms port=:{r['local_port']} " + f"data_len={r['data_length']}\n" + f" FULL DATA: {r['data']}") + + logger.record_timing(f"phase2_{rate}s", cmd_num, "DOD?", + r["connect_ms"], r["response_ms"], r["total_ms"], + r["success"], r.get("error_type", ""), + r.get("result_code", ""), r.get("data_length", 0), + r.get("local_port", 0)) + + round_timings.append(r) + + if r["error_type"] == "ConnectionRefused": + logger.log(f"PHASE2 *** WEDGE at {rate}s rate, command #{cmd_num} ***") + wedge_in_round = True + break + + if cmd_num < commands_per_round: + await asyncio.sleep(rate) + + compute_phase_summary(f"PHASE 2: Rate {rate}s ({commands_per_round} commands)", + round_timings, logger) + all_timings.extend(round_timings) + + if wedge_in_round: + recovery = await wait_for_recovery(host, port, timeout, logger, + label=f"PHASE2-{rate}s-recovery") + + # Pause between rounds + if rate != rates[-1] and not _abort_requested: + logger.log(f"PHASE2 Pausing {pause_between}s between rounds...") + for _ in range(pause_between): + if _abort_requested: + break + await asyncio.sleep(1) + + return all_timings + + +async def phase_3_command_variety(host: str, port: int, timeout: float, + commands_per_type: int, pause_between: int, + logger: StressTestLogger) -> list: + """ + Phase 3: Command Variety + Do certain commands cause the device to wedge faster? + """ + test_commands = [ + ("DOD?", True), + ("Measure?", True), + ("Battery Level?", True), + ("Clock?", True), + ("Sleep Mode?", True), + ] + + logger.summary(f"\n{'#' * 60}") + logger.summary("### PHASE 3: Command Variety ###") + logger.summary(f"### {commands_per_type} commands of each type at 1.0s spacing ###") + logger.summary(f"{'#' * 60}\n") + + all_timings = [] + + for cmd, is_query in test_commands: + if _abort_requested: + break + + # Health check + healthy = await health_check(host, port, timeout, logger, label=f"PHASE3-{cmd}") + if not healthy: + recovery = await wait_for_recovery(host, port, timeout, logger, + label=f"PHASE3-{cmd}") + if not recovery["recovered"]: + logger.log(f"PHASE3 Device not recovered - skipping remaining commands") + break + + logger.log(f"\nPHASE3 === Testing command: {cmd} ({commands_per_type} times) ===") + round_timings = [] + wedge_in_round = False + + for i in range(commands_per_type): + if _abort_requested: + break + + cmd_num = i + 1 + cmd_label = f"PHASE3 [{cmd}] CMD#{cmd_num:03d}" + r = await send_command(host, port, cmd, timeout, is_query=is_query, + logger=logger, label=cmd_label) + + status = "OK" if r["success"] else f"FAIL:{r['error_type']}" + logger.log(f"PHASE3 [{cmd}] CMD#{cmd_num:03d} {status} " + f"conn={r['connect_ms']:.0f}ms resp={r['response_ms']:.0f}ms " + f"port=:{r['local_port']} data_len={r['data_length']}\n" + f" FULL DATA: {r['data']}") + + logger.record_timing("phase3", cmd_num, cmd, + r["connect_ms"], r["response_ms"], r["total_ms"], + r["success"], r.get("error_type", ""), + r.get("result_code", ""), r.get("data_length", 0), + r.get("local_port", 0)) + + round_timings.append(r) + + if r["error_type"] == "ConnectionRefused": + logger.log(f"PHASE3 *** WEDGE on {cmd}, command #{cmd_num} ***") + wedge_in_round = True + break + + if cmd_num < commands_per_type: + await asyncio.sleep(1.0) + + compute_phase_summary(f"PHASE 3: Command '{cmd}' ({commands_per_type} cmds)", + round_timings, logger) + all_timings.extend(round_timings) + + if wedge_in_round: + recovery = await wait_for_recovery(host, port, timeout, logger, + label=f"PHASE3-{cmd}-recovery") + + # Pause between command types + if cmd != test_commands[-1][0] and not _abort_requested: + logger.log(f"PHASE3 Pausing {pause_between}s between command types...") + for _ in range(pause_between): + if _abort_requested: + break + await asyncio.sleep(1) + + return all_timings + + +async def phase_4_connection_duration(host: str, port: int, timeout: float, + pause_between: int, + logger: StressTestLogger) -> list: + """ + Phase 4: Connection Duration + Does holding connections open longer or shorter affect stability? + """ + logger.summary(f"\n{'#' * 60}") + logger.summary("### PHASE 4: Connection Duration ###") + logger.summary("### Testing various connection hold times ###") + logger.summary(f"{'#' * 60}\n") + + all_timings = [] + + # Round A: Hold connection open 5s after command + rounds = [ + ("A", "Hold 5s after response", 50, 5.0, False), + ("B", "Close immediately", 50, 0.0, False), + ("C", "Hold 30s (no command)", 10, 30.0, True), + ] + + for round_id, desc, count, hold_time, hold_only in rounds: + if _abort_requested: + break + + healthy = await health_check(host, port, timeout, logger, + label=f"PHASE4-Round{round_id}") + if not healthy: + recovery = await wait_for_recovery(host, port, timeout, logger, + label=f"PHASE4-Round{round_id}") + if not recovery["recovered"]: + logger.log(f"PHASE4 Device not recovered - skipping remaining rounds") + break + + logger.log(f"\nPHASE4 === Round {round_id}: {desc} ({count} connections) ===") + round_timings = [] + wedge_in_round = False + + for i in range(count): + if _abort_requested: + break + + cmd_num = i + 1 + + cmd_label = f"PHASE4 [Round{round_id}] #{cmd_num:03d}" + + if hold_only: + # Just open connection, hold, close (no command) + r = {"connect_ms": 0, "response_ms": 0, "total_ms": 0, + "success": False, "error": None, "error_type": None, + "local_port": 0, "data": None, "data_length": 0, + "result_code": ""} + t_start = time.monotonic() + writer = None + try: + t_conn = time.monotonic() + reader, writer = await asyncio.wait_for( + asyncio.open_connection(host, port), timeout=timeout + ) + r["connect_ms"] = (time.monotonic() - t_conn) * 1000 + sockname = writer.get_extra_info("sockname") + if sockname: + r["local_port"] = sockname[1] + + logger.log(f"{cmd_label} CONNECT :{r['local_port']} → {host}:{port} " + f"in {r['connect_ms']:.1f}ms, holding {hold_time}s (no command)...", + also_print=False) + + # Hold without sending + await asyncio.sleep(hold_time) + r["success"] = True + + except ConnectionRefusedError: + r["error"] = "Connection refused" + r["error_type"] = "ConnectionRefused" + logger.log(f"{cmd_label} CONNECTION REFUSED", also_print=False) + except asyncio.TimeoutError: + r["error"] = "Timeout" + r["error_type"] = "Timeout" + except ConnectionResetError as e: + r["error"] = f"Connection reset: {e}" + r["error_type"] = "ConnectionReset" + except Exception as e: + r["error"] = str(e) + r["error_type"] = type(e).__name__ + finally: + if writer: + try: + writer.close() + await writer.wait_closed() + except Exception: + pass + r["total_ms"] = (time.monotonic() - t_start) * 1000 + else: + r = await send_command(host, port, "DOD?", timeout, + is_query=True, hold_seconds=hold_time, + logger=logger, label=cmd_label) + + status = "OK" if r["success"] else f"FAIL:{r.get('error_type', '?')}" + cmd_desc = f"DOD?(hold={hold_time}s)" if not hold_only else f"connect_only(hold={hold_time}s)" + logger.log(f"PHASE4 [Round{round_id}] #{cmd_num:03d} {status} " + f"conn={r['connect_ms']:.0f}ms total={r['total_ms']:.0f}ms " + f"port=:{r.get('local_port', 0)}") + + logger.record_timing(f"phase4_{round_id}", cmd_num, cmd_desc, + r["connect_ms"], r.get("response_ms", 0), + r["total_ms"], r["success"], + r.get("error_type", ""), + r.get("result_code", ""), r.get("data_length", 0), + r.get("local_port", 0)) + + round_timings.append(r) + + if r.get("error_type") == "ConnectionRefused": + logger.log(f"PHASE4 *** WEDGE in Round {round_id}, connection #{cmd_num} ***") + wedge_in_round = True + break + + # 1s spacing between connections (plus any hold time already elapsed) + remaining_wait = max(0, 1.0 - hold_time) + if remaining_wait > 0 and cmd_num < count: + await asyncio.sleep(remaining_wait) + + compute_phase_summary(f"PHASE 4 Round {round_id}: {desc}", + round_timings, logger) + all_timings.extend(round_timings) + + if wedge_in_round: + await wait_for_recovery(host, port, timeout, logger, + label=f"PHASE4-Round{round_id}-recovery") + + if round_id != rounds[-1][0] and not _abort_requested: + logger.log(f"PHASE4 Pausing {pause_between}s between rounds...") + for _ in range(pause_between): + if _abort_requested: + break + await asyncio.sleep(1) + + # Round D: DRD? stream + if not _abort_requested: + healthy = await health_check(host, port, timeout, logger, label="PHASE4-RoundD") + if healthy: + logger.log(f"\nPHASE4 === Round D: DRD? stream for 60 seconds ===") + + stream_result = await stream_drd(host, port, 60.0, timeout, logger=logger) + + if stream_result["success"]: + logger.log(f"PHASE4 [RoundD] DRD stream OK - " + f"{stream_result['lines_received']} lines in " + f"{stream_result['total_ms']:.0f}ms, " + f"clean_shutdown={stream_result['clean_shutdown']}") + else: + logger.log(f"PHASE4 [RoundD] DRD stream FAIL - " + f"{stream_result['error_type']}: {stream_result['error']}") + + # Health check after stream + logger.log("PHASE4 Checking health after DRD stream...") + await asyncio.sleep(2) # Brief pause + post_stream = await health_check(host, port, timeout, logger, + label="PHASE4-post-DRD") + if not post_stream: + logger.log("PHASE4 *** Device unhealthy after DRD stream ***") + await wait_for_recovery(host, port, timeout, logger, + label="PHASE4-RoundD-recovery") + + logger.summary(f"\nPHASE 4 Round D: DRD Stream (60s)") + logger.summary(f" Lines received: {stream_result['lines_received']}") + logger.summary(f" Clean shutdown: {stream_result['clean_shutdown']}") + logger.summary(f" Duration: {stream_result['total_ms']:.0f}ms") + logger.summary(f" Post-stream health: {'OK' if post_stream else 'FAILED'}") + + return all_timings + + +async def phase_5_soak(host: str, port: int, timeout: float, + duration_minutes: int, logger: StressTestLogger) -> list: + """ + Phase 5: Sustained Soak + Simulate real SLMM polling: DOD? every 60s, with extras every 10th poll. + """ + logger.summary(f"\n{'#' * 60}") + logger.summary("### PHASE 5: Sustained Soak ###") + logger.summary(f"### Simulating SLMM polling for {duration_minutes} minutes ###") + logger.summary(f"### DOD? every 60s, +Battery/Measure every 10th poll ###") + logger.summary(f"{'#' * 60}\n") + + timings = [] + poll_count = 0 + phase_start = time.monotonic() + end_time = phase_start + (duration_minutes * 60) + wedge_detected = False + + while time.monotonic() < end_time and not _abort_requested: + poll_count += 1 + elapsed_min = (time.monotonic() - phase_start) / 60 + + # Primary poll: DOD? + poll_label = f"PHASE5 POLL#{poll_count:04d}" + r = await send_command(host, port, "DOD?", timeout, is_query=True, + logger=logger, label=poll_label) + + status = "OK" if r["success"] else f"FAIL:{r['error_type']}" + logger.log(f"PHASE5 POLL#{poll_count:04d} DOD? {status} " + f"conn={r['connect_ms']:.0f}ms resp={r['response_ms']:.0f}ms " + f"elapsed={elapsed_min:.1f}min port=:{r['local_port']} " + f"data_len={r['data_length']}\n" + f" FULL DATA: {r['data']}") + + logger.record_timing("phase5", poll_count, "DOD?", + r["connect_ms"], r["response_ms"], r["total_ms"], + r["success"], r.get("error_type", ""), + r.get("result_code", ""), r.get("data_length", 0), + r.get("local_port", 0)) + timings.append(r) + + if r["error_type"] == "ConnectionRefused": + logger.log(f"PHASE5 *** WEDGE DETECTED at poll #{poll_count} " + f"after {elapsed_min:.1f} minutes ***") + wedge_detected = True + break + + # Every 10th poll, also check battery and measurement state + if poll_count % 10 == 0 and r["success"]: + await asyncio.sleep(1.0) + + for extra_cmd in ["Battery Level?", "Measure?"]: + if _abort_requested: + break + + extra_label = f"PHASE5 POLL#{poll_count:04d} {extra_cmd}" + r2 = await send_command(host, port, extra_cmd, timeout, is_query=True, + logger=logger, label=extra_label) + s2 = "OK" if r2["success"] else f"FAIL:{r2['error_type']}" + logger.log(f"PHASE5 POLL#{poll_count:04d} {extra_cmd} {s2} " + f"conn={r2['connect_ms']:.0f}ms resp={r2['response_ms']:.0f}ms " + f"port=:{r2['local_port']} data_len={r2['data_length']}\n" + f" FULL DATA: {r2['data']}") + + logger.record_timing("phase5_extra", poll_count, extra_cmd, + r2["connect_ms"], r2["response_ms"], r2["total_ms"], + r2["success"], r2.get("error_type", ""), + r2.get("result_code", ""), r2.get("data_length", 0), + r2.get("local_port", 0)) + timings.append(r2) + + if r2["error_type"] == "ConnectionRefused": + logger.log(f"PHASE5 *** WEDGE on extra command {extra_cmd} ***") + wedge_detected = True + break + + await asyncio.sleep(1.0) + + if wedge_detected: + break + + # Wait 60 seconds before next poll + for _ in range(60): + if _abort_requested or time.monotonic() >= end_time: + break + await asyncio.sleep(1) + + extra = "" + if wedge_detected: + recovery = await wait_for_recovery(host, port, timeout, logger, label="PHASE5") + if recovery["recovered"]: + extra = f"Device recovered after {recovery['recovery_seconds']:.1f}s" + else: + extra = "Device did NOT recover within monitoring window" + elif not _abort_requested: + elapsed_min = (time.monotonic() - phase_start) / 60 + extra = f"Device survived full {elapsed_min:.1f} minute soak without wedging" + + compute_phase_summary("PHASE 5: Sustained Soak", timings, logger, extra) + return timings + + +# ─── Dry Run ───────────────────────────────────────────────────────────────── + +def dry_run(args): + """Show what would be tested without connecting to any device.""" + print(f"\n{'=' * 60}") + print("DRY RUN - No connections will be made") + print(f"{'=' * 60}") + print(f"\nTarget: {args.host}:{args.port}") + print(f"Timeout: {args.timeout}s") + print(f"Phase(s): {args.phase}") + print(f"Log directory: {args.log_dir}") + print(f"Recovery pause: {args.pause}s") + + phases = args.phase if args.phase != "all" else "1,2,3,4,5" + phase_list = [p.strip() for p in phases.split(",")] + + total_commands = 0 + total_time_est = 0 + + for p in phase_list: + print(f"\n--- Phase {p} ---") + + if p == "1": + n = args.max_commands + t = n * 1 # 1s spacing + print(f" Baseline: up to {n} DOD? commands at 1.0s spacing") + print(f" Estimated time: {t}s ({t / 60:.1f} min)") + total_commands += n + total_time_est += t + + elif p == "2": + rates = [1.0, 2.0, 5.0, 10.0, 30.0] + n = args.rate_rounds + for rate in rates: + t = n * rate + print(f" Rate {rate}s: {n} DOD? commands ({t:.0f}s per round)") + pause_time = 4 * args.pause # 4 pauses between 5 rounds + total_commands += n * len(rates) + total_time_est += sum(n * r for r in rates) + pause_time + print(f" + {args.pause}s pause between rounds") + + elif p == "3": + cmds = ["DOD?", "Measure?", "Battery Level?", "Clock?", "Sleep Mode?"] + n = 100 + print(f" {n} commands each of: {', '.join(cmds)}") + print(f" Estimated: {n * len(cmds)}s + {(len(cmds) - 1) * args.pause}s pauses") + total_commands += n * len(cmds) + total_time_est += n * len(cmds) + (len(cmds) - 1) * args.pause + + elif p == "4": + print(" Round A: 50x DOD? with 5s hold = ~250s") + print(" Round B: 50x DOD? close immediately = ~50s") + print(" Round C: 10x connect+30s hold (no command) = ~300s") + print(" Round D: DRD? stream for 60s") + total_commands += 110 + total_time_est += 250 + 50 + 300 + 60 + 3 * args.pause + + elif p == "5": + dur = args.soak_duration + polls = dur # 1 poll per minute + extras = (polls // 10) * 2 # 2 extra commands every 10th poll + print(f" Soak for {dur} minutes") + print(f" ~{polls} DOD? polls + ~{extras} extra commands") + total_commands += polls + extras + total_time_est += dur * 60 + + print(f"\n{'=' * 60}") + print(f"Total estimated commands: ~{total_commands}") + print(f"Total estimated time: ~{total_time_est}s ({total_time_est / 60:.0f} min, " + f"{total_time_est / 3600:.1f} hr)") + print(f"{'=' * 60}\n") + + +# ─── Main ──────────────────────────────────────────────────────────────────── + +async def run(args): + """Main test runner.""" + # Create run directory + timestamp = datetime.now().strftime("%Y%m%d_%H%M%S") + run_dir = Path(args.log_dir) / f"run_{timestamp}" + logger = StressTestLogger(run_dir) + + logger.summary(f"NL-43 TCP Stress Test") + logger.summary(f"Started: {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}") + logger.summary(f"Target: {args.host}:{args.port}") + logger.summary(f"Timeout: {args.timeout}s") + logger.summary(f"Phase(s): {args.phase}") + + # Start packet capture + pcap = None + if not args.no_pcap: + pcap_path = run_dir / "capture.pcap" + pcap = PacketCapture(args.host, args.port, pcap_path) + pcap_started = pcap.start() + if pcap_started: + logger.summary(f"Packet capture: {pcap_path}") + else: + logger.summary("Packet capture: DISABLED (tcpdump unavailable or no permissions)") + pcap = None + + # Initial health check + logger.log("\n=== Initial Health Check ===") + healthy = await health_check(args.host, args.port, args.timeout, logger, label="INITIAL") + if not healthy: + logger.log("WARNING: Device is not responding at test start!") + logger.log("Waiting for device to become available...") + recovery = await wait_for_recovery(args.host, args.port, args.timeout, + logger, label="INITIAL") + if not recovery["recovered"]: + logger.summary("ABORTED: Device unreachable at test start and did not recover") + if pcap: + pcap.stop() + logger.close() + return + + phases = args.phase if args.phase != "all" else "1,2,3,4,5" + phase_list = [p.strip() for p in phases.split(",")] + + for phase_num in phase_list: + if _abort_requested: + break + + # Recovery pause between phases + if phase_num != phase_list[0]: + logger.log(f"\n--- Recovery pause ({args.pause}s) before Phase {phase_num} ---") + for _ in range(args.pause): + if _abort_requested: + break + await asyncio.sleep(1) + + healthy = await health_check(args.host, args.port, args.timeout, logger, + label=f"pre-phase{phase_num}") + if not healthy: + logger.log(f"Device unhealthy before Phase {phase_num} - waiting for recovery") + recovery = await wait_for_recovery(args.host, args.port, args.timeout, + logger, label=f"pre-phase{phase_num}") + if not recovery["recovered"]: + logger.log(f"Device not recovered - skipping Phase {phase_num} and remaining") + break + + if phase_num == "1": + await phase_1_baseline(args.host, args.port, args.timeout, + args.max_commands, logger) + elif phase_num == "2": + await phase_2_rate_variation(args.host, args.port, args.timeout, + args.rate_rounds, args.pause, logger) + elif phase_num == "3": + await phase_3_command_variety(args.host, args.port, args.timeout, + 100, args.pause, logger) + elif phase_num == "4": + await phase_4_connection_duration(args.host, args.port, args.timeout, + args.pause, logger) + elif phase_num == "5": + await phase_5_soak(args.host, args.port, args.timeout, + args.soak_duration, logger) + + # Stop packet capture + if pcap: + logger.summary("\nStopping packet capture...") + pcap_stats = pcap.stop() + logger.summary(f"Packets captured: {pcap_stats['packets_captured']}") + logger.summary(f"PCAP file: {pcap_stats['pcap_file']}") + logger.summary(f"PCAP size: {pcap_stats['file_size_bytes']} bytes") + + # Final summary + logger.summary(f"\nFinished: {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}") + logger.summary(f"Logs saved to: {run_dir}") + logger.close() + + print(f"\nResults saved to: {run_dir}") + + +def main(): + parser = argparse.ArgumentParser( + description="NL-43 TCP Wedge Stress Test Tool", + formatter_class=argparse.RawDescriptionHelpFormatter, + epilog=""" +Test Phases: + 1 Baseline Connection Count - DOD? at 1s until wedge or max + 2 Rate Variation - DOD? at 1s, 2s, 5s, 10s, 30s spacing + 3 Command Variety - Different commands to see if type matters + 4 Connection Duration - Hold connections open various durations + 5 Sustained Soak - Simulate real SLMM polling for hours + +Examples: + %(prog)s --host 192.168.1.100 --phase 1 --max-commands 500 + %(prog)s --host 192.168.1.100 --phase 5 --soak-duration 240 + %(prog)s --host 192.168.1.100 --phase all --dry-run + %(prog)s --host 10.0.0.50 --phase 1,2 --timeout 10 + """ + ) + + parser.add_argument("--host", required=True, help="NL-43 IP address") + parser.add_argument("--port", type=int, default=DEFAULT_PORT, + help=f"TCP port (default: {DEFAULT_PORT})") + parser.add_argument("--phase", default="all", + help="Phase(s) to run: 1,2,3,4,5 or 'all' (default: all)") + parser.add_argument("--max-commands", type=int, default=2000, + help="Phase 1: max commands before stopping (default: 2000)") + parser.add_argument("--rate-rounds", type=int, default=50, + help="Phase 2: commands per rate round (default: 50)") + parser.add_argument("--soak-duration", type=int, default=120, + help="Phase 5: soak duration in minutes (default: 120)") + parser.add_argument("--log-dir", default="./stress_test_logs", + help="Log output directory (default: ./stress_test_logs)") + parser.add_argument("--pause", type=int, default=60, + help="Recovery pause between phases in seconds (default: 60)") + parser.add_argument("--timeout", type=float, default=DEFAULT_TIMEOUT, + help=f"TCP timeout in seconds (default: {DEFAULT_TIMEOUT})") + parser.add_argument("--dry-run", action="store_true", + help="Show test plan without connecting") + parser.add_argument("--no-pcap", action="store_true", + help="Disable built-in tcpdump packet capture") + + args = parser.parse_args() + + if args.dry_run: + dry_run(args) + return + + print(f"\nNL-43 TCP Stress Test Tool") + print(f"Target: {args.host}:{args.port}") + print(f"Phase(s): {args.phase}") + print(f"Press Ctrl+C at any time to stop safely\n") + + asyncio.run(run(args)) + + +if __name__ == "__main__": + main()