Files
slmm/SLM-stress-test/nl43_stress_test.py
2026-02-10 07:07:34 +00:00

1484 lines
60 KiB
Python

#!/usr/bin/env python3
"""
NL-43 TCP Wedge Stress Test Tool
Standalone diagnostic tool for determining what causes the NL-43 sound level
meter's TCP port to become "wedged" (connection refused). Communicates directly
with the device over raw TCP sockets - zero SLMM dependencies.
Usage:
python nl43_stress_test.py --host 192.168.1.100 --port 2255 --phase 1
python nl43_stress_test.py --host 192.168.1.100 --phase all --dry-run
Test Phases:
1 - Baseline Connection Count: How many commands before it dies?
2 - Rate Variation: Does spacing between commands matter?
3 - Command Variety: Do certain commands wedge faster?
4 - Connection Duration: Does holding connections open matter?
5 - Sustained Soak: Simulate real SLMM polling over hours
"""
import asyncio
import argparse
import csv
import os
import shutil
import signal
import subprocess
import sys
import time
from datetime import datetime
from pathlib import Path
from statistics import mean, median, stdev
# ─── Constants ───────────────────────────────────────────────────────────────
DEFAULT_PORT = 2255
DEFAULT_TIMEOUT = 5.0
CRLF = b"\r\n"
SUB_BYTE = b"\x1a"
# NL-43 result codes
RESULT_OK = "R+0000"
RESULT_CMD_ERROR = "R+0001"
RESULT_PARAM_ERROR = "R+0002"
RESULT_SPEC_ERROR = "R+0003"
RESULT_STATUS_ERROR = "R+0004"
# ─── Global state ────────────────────────────────────────────────────────────
_abort_requested = False
def _signal_handler(sig, frame):
    """SIGINT handler: raise the module-wide abort flag and notify the operator.

    Only sets ``_abort_requested``; the test loops in this file read that flag
    between commands and stop on their own, so nothing is interrupted here.

    Args:
        sig: Signal number supplied by the ``signal`` module (unused).
        frame: Interrupted stack frame supplied by the ``signal`` module (unused).
    """
    global _abort_requested
    _abort_requested = True
    notice = "\n\n!!! Ctrl+C detected - finishing current command and writing summary...\n"
    print(notice)
signal.signal(signal.SIGINT, _signal_handler)
# ─── Packet Capture ──────────────────────────────────────────────────────────
class PacketCapture:
    """
    Manages a tcpdump subprocess that captures all traffic to/from the NL-43.
    Saves a .pcap file in the run directory for Wireshark analysis.

    tcpdump is attempted through ``sudo -n`` first and then directly. The
    running process (if any) is kept in ``self.process`` and ``self.available``
    is set to True once a capture process has been started.
    """

    def __init__(self, host: str, port: int, pcap_path: Path):
        """Remember the capture target and output path; nothing is started yet."""
        self.host = host
        self.port = port
        self.pcap_path = pcap_path
        self.process: subprocess.Popen | None = None
        self.available = False

    def _terminate_process(self) -> None:
        """Send SIGTERM to the capture process so tcpdump exits and prints stats.

        A capture launched through ``sudo`` is owned by root, so a plain
        ``terminate()`` from an unprivileged script is rejected with
        PermissionError. In that case the signal is delivered with
        ``sudo -n kill`` instead (sudo relays SIGTERM to the command it runs).
        Any failure of the fallback is left to the caller's error handling.
        """
        proc = self.process
        if proc is None:
            return
        try:
            proc.terminate()
        except PermissionError:
            subprocess.run(
                ["sudo", "-n", "kill", "-TERM", str(proc.pid)],
                stdout=subprocess.DEVNULL,
                stderr=subprocess.DEVNULL,
                timeout=5,
                check=False,
            )

    def _try_start_tcpdump(self, cmd: list[str], label: str) -> bool:
        """Attempt to start tcpdump with the given command. Returns True if successful.

        Args:
            cmd: Full argv to execute (optionally prefixed with ``sudo -n``).
            label: Short tag used in the ``[PCAP]`` console messages.
        """
        try:
            # select is only needed here, so it is imported locally (the module
            # does not import it at the top of the file).
            import select

            self.process = subprocess.Popen(
                cmd,
                stdout=subprocess.PIPE,
                stderr=subprocess.PIPE,
            )
            # Give it a moment to start or fail
            time.sleep(1.0)
            if self.process.poll() is not None:
                # Process exited immediately
                stderr = self.process.stderr.read().decode(errors="ignore")
                print(f"[PCAP] {label} tcpdump exited immediately: {stderr.strip()}")
                self.process = None
                return False

            # Process is running - check stderr for warnings (non-blocking read)
            # tcpdump writes "listening on ..." to stderr on success
            ready, _, _ = select.select([self.process.stderr], [], [], 0.5)
            if ready:
                # Read available stderr without blocking
                stderr_bytes = os.read(self.process.stderr.fileno(), 4096)
                stderr_text = stderr_bytes.decode(errors="ignore")
                lowered = stderr_text.lower()
                if "listening on" in lowered:
                    print(f"[PCAP] {label} tcpdump started: {stderr_text.strip()}")
                    self.available = True
                    return True
                elif "permission" in lowered or "operation not permitted" in lowered:
                    print(f"[PCAP] {label} tcpdump permission denied: {stderr_text.strip()}")
                    self._terminate_process()
                    self.process.wait(timeout=3)
                    self.process = None
                    return False
                else:
                    # Some other output - might be OK, might not
                    print(f"[PCAP] {label} tcpdump stderr: {stderr_text.strip()}")

            # Process is still running - assume OK
            self.available = True
            return True
        except FileNotFoundError:
            print(f"[PCAP] {label} tcpdump binary not found")
            self.process = None
            return False
        except Exception as e:
            print(f"[PCAP] {label} failed to start: {e}")
            if self.process:
                # Best-effort cleanup of a half-started process.
                try:
                    self.process.kill()
                    self.process.wait(timeout=3)
                except Exception:
                    pass
            self.process = None
            return False

    def _announce_capture(self) -> None:
        """Print the destination file and filter of the capture that just started."""
        print(f"[PCAP] ✓ Capturing to: {self.pcap_path}")
        print(f"[PCAP] Filter: host {self.host} and tcp port {self.port}")

    def start(self) -> bool:
        """Start tcpdump capture. Returns True if started successfully."""
        if not shutil.which("tcpdump"):
            print("[PCAP] tcpdump not found in PATH - packet capture disabled")
            print("[PCAP] Install with: sudo apt install tcpdump")
            return False

        # Capture all TCP traffic to/from the device on the specified port
        # -i any: all interfaces
        # -nn: no name resolution
        # -s 0: full packets, no truncation
        # -w: write raw pcap to file
        # -U: packet-buffered output (flush after each packet)
        cmd = [
            "tcpdump",
            "-i", "any",
            "-nn",
            "-s", "0",
            "-U",
            "-w", str(self.pcap_path),
            f"host {self.host} and tcp port {self.port}",
        ]

        # Try with sudo first (tcpdump almost always needs root for raw capture)
        print("[PCAP] Starting packet capture (requires sudo for raw interface access)...")
        if self._try_start_tcpdump(["sudo", "-n"] + cmd, "sudo"):
            self._announce_capture()
            return True

        # Fallback: try without sudo (works if user has CAP_NET_RAW capability)
        print("[PCAP] sudo failed, trying without sudo...")
        if self._try_start_tcpdump(cmd, "non-sudo"):
            self._announce_capture()
            return True

        print("[PCAP] ✗ Could not start packet capture")
        print("[PCAP] To enable, run the stress test with sudo:")
        print(f"[PCAP] sudo python3 nl43_stress_test.py --host {self.host} --port {self.port} ...")
        print("[PCAP] Or run tcpdump separately in another terminal:")
        print(f"[PCAP] sudo tcpdump -i any -nn -s 0 -U -w capture.pcap host {self.host} and tcp port {self.port}")
        return False

    def stop(self) -> dict:
        """Stop tcpdump and return capture stats.

        Returns:
            dict with ``pcap_file`` (str), ``packets_captured`` (int parsed from
            tcpdump's exit summary, 0 if unavailable) and ``file_size_bytes``.
        """
        stats = {
            "pcap_file": str(self.pcap_path),
            "packets_captured": 0,
            "file_size_bytes": 0,
        }
        if not self.process:
            return stats

        try:
            # SIGTERM for graceful shutdown (tcpdump prints stats on exit)
            self._terminate_process()
            try:
                _, stderr = self.process.communicate(timeout=5)
                stderr_text = stderr.decode(errors="ignore")
                for line in stderr_text.split("\n"):
                    line = line.strip()
                    if not line:
                        continue
                    if "packets captured" in line:
                        try:
                            stats["packets_captured"] = int(line.split()[0])
                        except (ValueError, IndexError):
                            pass
                    # Print all tcpdump stats lines
                    if any(k in line for k in ["captured", "received", "dropped"]):
                        print(f"[PCAP] {line}")
            except subprocess.TimeoutExpired:
                self.process.kill()
                self.process.wait()
        except Exception as e:
            print(f"[PCAP] Error stopping tcpdump: {e}")

        # Report file size
        if self.pcap_path.exists():
            stats["file_size_bytes"] = self.pcap_path.stat().st_size
            size_mb = stats["file_size_bytes"] / (1024 * 1024)
            print(f"[PCAP] Saved: {self.pcap_path} ({size_mb:.2f} MB)")
            print(f"[PCAP] Open with: wireshark {self.pcap_path}")
            print(f"[PCAP] or: tcpdump -nn -r {self.pcap_path}")
        else:
            print(f"[PCAP] Warning: pcap file not found at {self.pcap_path}")

        self.process = None
        return stats
# ─── Logger ──────────────────────────────────────────────────────────────────
class StressTestLogger:
    """Dual-output logger: console + file, with CSV data writers.

    Creates four files inside ``run_dir``: ``full_log.txt``, ``summary.txt``,
    ``wire_log.txt`` and ``timing_data.csv``. All of them are written as UTF-8
    so that non-ASCII characters in messages never depend on the locale.
    """

    # Bytes that get a symbolic rendering in the wire log's ASCII line.
    _CONTROL_BYTE_NAMES = {
        0x0d: "\\r",
        0x0a: "\\n",
        0x1a: "\\x1a[SUB]",
    }

    def __init__(self, run_dir: Path):
        """Create ``run_dir`` if needed and open all output files for writing."""
        self.run_dir = run_dir
        self.run_dir.mkdir(parents=True, exist_ok=True)
        # Explicit UTF-8: the default would be the locale encoding, which
        # cannot represent every character this tool logs on all platforms.
        self.log_file = open(run_dir / "full_log.txt", "w", buffering=1, encoding="utf-8")
        self.summary_file = open(run_dir / "summary.txt", "w", buffering=1, encoding="utf-8")
        # Raw wire log - every byte sent/received in hex + ascii
        self.wire_log = open(run_dir / "wire_log.txt", "w", buffering=1, encoding="utf-8")
        # CSV for timing data
        self.timing_csv_path = run_dir / "timing_data.csv"
        self.timing_csv = open(self.timing_csv_path, "w", newline="", encoding="utf-8")
        self.timing_writer = csv.writer(self.timing_csv)
        self.timing_writer.writerow([
            "phase", "cmd_num", "command", "connect_ms", "response_ms",
            "total_ms", "success", "error_type", "result_code", "data_length",
            "local_port", "timestamp"
        ])

    @staticmethod
    def _timestamp() -> str:
        """Return the current local time as an ISO-like string with millisecond precision."""
        return datetime.now().strftime("%Y-%m-%dT%H:%M:%S.%f")[:-3]

    @classmethod
    def _render_byte(cls, b: int) -> str:
        """Render one byte for the wire log: named control char, printable ASCII, or \\xNN."""
        named = cls._CONTROL_BYTE_NAMES.get(b)
        if named is not None:
            return named
        if 32 <= b < 127:
            return chr(b)
        return f"\\x{b:02x}"

    def log(self, message: str, also_print: bool = True):
        """Write a timestamped line to the full log and, optionally, to the console."""
        line = f"[{self._timestamp()}] {message}"
        self.log_file.write(line + "\n")
        if also_print:
            print(line)

    def wire(self, direction: str, raw_bytes: bytes, label: str = ""):
        """Log raw wire data - every byte in hex and ascii representation."""
        ts = self._timestamp()
        hex_str = raw_bytes.hex(" ")
        # Show control characters explicitly
        readable = "".join(self._render_byte(b) for b in raw_bytes)
        self.wire_log.write(f"[{ts}] {label} {direction} ({len(raw_bytes)} bytes)\n")
        self.wire_log.write(f" HEX: {hex_str}\n")
        self.wire_log.write(f" ASCII: {readable}\n")
        self.wire_log.flush()

    def summary(self, message: str):
        """Write an un-timestamped line to the summary file, the full log and the console."""
        self.summary_file.write(message + "\n")
        self.log_file.write(message + "\n")
        print(message)

    def record_timing(self, phase: str, cmd_num: int, command: str,
                      connect_ms: float, response_ms: float, total_ms: float,
                      success: bool, error_type: str = "", result_code: str = "",
                      data_length: int = 0, local_port: int = 0):
        """Append one row to ``timing_data.csv`` and flush it to disk.

        Millisecond values are written with one decimal place; the final
        column is the time the row was recorded.
        """
        self.timing_writer.writerow([
            phase, cmd_num, command, f"{connect_ms:.1f}", f"{response_ms:.1f}",
            f"{total_ms:.1f}", success, error_type, result_code, data_length,
            local_port,
            self._timestamp()
        ])
        self.timing_csv.flush()

    def close(self):
        """Close all four output files."""
        for handle in (self.log_file, self.summary_file, self.timing_csv, self.wire_log):
            handle.close()
# ─── Raw TCP Communication ──────────────────────────────────────────────────
async def send_command(host: str, port: int, command: str, timeout: float,
                       is_query: bool = True, hold_seconds: float = 0,
                       logger: "StressTestLogger | None" = None,
                       label: str = "") -> dict:
    """
    Send a single command to the NL-43 over a fresh TCP connection.

    Args:
        host, port: Device address.
        command: Command text; CRLF is appended when missing.
        timeout: Seconds allowed for the connect and for each line read.
        is_query: When True and the result code is OK, a second (data) line is read.
        hold_seconds: Extra time to keep the socket open after the response.
        logger: Optional logger for connection events and raw wire bytes.
        label: Prefix used in log lines.

    Returns dict with timing and result info:
    connect_ms, response_ms, total_ms, success, result_code, data,
    data_length, raw_result_bytes, raw_data_bytes, local_port,
    error, error_type

    No exception escapes: every failure is reported through ``error`` /
    ``error_type`` and the connection is always closed before returning.
    """
    result = {
        "connect_ms": 0.0,
        "response_ms": 0.0,
        "total_ms": 0.0,
        "success": False,
        "result_code": None,
        "data": None,
        "data_length": 0,
        "raw_result_bytes": None,
        "raw_data_bytes": None,
        "local_port": 0,
        "error": None,
        "error_type": None,
    }
    t_start = time.monotonic()
    writer = None
    # Tracks which wait is in progress so a timeout is logged against the
    # right step (connect vs. reading the reply).
    stage = "connecting to"
    try:
        # Connect
        t_conn_start = time.monotonic()
        reader, writer = await asyncio.wait_for(
            asyncio.open_connection(host, port),
            timeout=timeout
        )
        result["connect_ms"] = (time.monotonic() - t_conn_start) * 1000

        # Grab local socket info (source IP:port)
        sockname = writer.get_extra_info("sockname")
        if sockname:
            result["local_port"] = sockname[1]
        if logger:
            logger.log(f"{label} CONNECT {host}:{port} from :{result['local_port']} "
                       f"in {result['connect_ms']:.1f}ms", also_print=False)

        # Send command
        cmd_bytes = command.encode("ascii")
        if not cmd_bytes.endswith(CRLF):
            cmd_bytes += CRLF
        writer.write(cmd_bytes)
        await writer.drain()
        if logger:
            logger.wire("SEND>>>", cmd_bytes, label)

        # Read result code (first line)
        stage = "waiting for result code from"
        t_resp_start = time.monotonic()
        first_line_raw = await asyncio.wait_for(
            reader.readuntil(b"\n"),
            timeout=timeout
        )
        result["raw_result_bytes"] = first_line_raw
        if logger:
            logger.wire("<<<RECV", first_line_raw, f"{label} result_code")
        result_code = first_line_raw.decode("ascii", errors="ignore").strip()
        # Strip leading $ if present
        if result_code.startswith("$"):
            result_code = result_code[1:].strip()
        result["result_code"] = result_code

        # Read data line for queries
        if result_code == RESULT_OK and is_query:
            stage = "waiting for data line from"
            data_line_raw = await asyncio.wait_for(
                reader.readuntil(b"\n"),
                timeout=timeout
            )
            result["raw_data_bytes"] = data_line_raw
            result["data"] = data_line_raw.decode("ascii", errors="ignore").strip()
            result["data_length"] = len(result["data"])
            if logger:
                logger.wire("<<<RECV", data_line_raw, f"{label} data ({result['data_length']} chars)")
        result["response_ms"] = (time.monotonic() - t_resp_start) * 1000

        # Optionally hold connection open
        if hold_seconds > 0:
            if logger:
                logger.log(f"{label} HOLDING connection open for {hold_seconds}s...",
                           also_print=False)
            await asyncio.sleep(hold_seconds)

        result["success"] = result_code == RESULT_OK
        if result_code != RESULT_OK:
            result["error"] = f"Device returned {result_code}"
            result["error_type"] = "DeviceError"
    except ConnectionRefusedError:
        result["error"] = "Connection refused"
        result["error_type"] = "ConnectionRefused"
        if logger:
            logger.log(f"{label} CONNECTION REFUSED by {host}:{port}", also_print=False)
    except asyncio.TimeoutError:
        result["error"] = f"Timeout after {timeout}s"
        result["error_type"] = "Timeout"
        if logger:
            logger.log(f"{label} TIMEOUT after {timeout}s {stage} {host}:{port}",
                       also_print=False)
    except ConnectionResetError as e:
        result["error"] = f"Connection reset: {e}"
        result["error_type"] = "ConnectionReset"
        if logger:
            logger.log(f"{label} CONNECTION RESET by {host}:{port}: {e}", also_print=False)
    except OSError as e:
        result["error"] = f"{type(e).__name__}: {e}"
        result["error_type"] = type(e).__name__
        if logger:
            logger.log(f"{label} OS ERROR: {type(e).__name__}: {e}", also_print=False)
    except Exception as e:
        result["error"] = f"{type(e).__name__}: {e}"
        result["error_type"] = type(e).__name__
        if logger:
            logger.log(f"{label} UNEXPECTED ERROR: {type(e).__name__}: {e}", also_print=False)
    finally:
        if writer:
            # Best-effort close; a failure here must not mask the result.
            try:
                writer.close()
                await writer.wait_closed()
            except Exception:
                pass
        result["total_ms"] = (time.monotonic() - t_start) * 1000
        if logger:
            logger.log(f"{label} CLOSE total={result['total_ms']:.1f}ms "
                       f"local_port={result['local_port']}", also_print=False)
    return result
async def stream_drd(host: str, port: int, duration_seconds: float,
                     timeout: float,
                     logger: "StressTestLogger | None" = None) -> dict:
    """
    Open a DRD? streaming connection and hold it for the specified duration.

    Args:
        host, port: Device address.
        duration_seconds: How long to keep reading stream lines.
        timeout: Seconds allowed for the connect and for the result-code read.
        logger: Optional logger for connection events and raw wire bytes.

    Returns dict with timing and streaming stats:
    connect_ms, total_ms, success, lines_received, local_port, error,
    error_type, clean_shutdown. ``success`` is True only when the stream ran
    without an error; a stream that stalls (no line for 30s) is reported as a
    failure with ``error_type`` "StreamTimeout".
    """
    result = {
        "connect_ms": 0.0,
        "total_ms": 0.0,
        "success": False,
        "lines_received": 0,
        # Present on every path so callers can read it after a failed connect.
        "local_port": 0,
        "error": None,
        "error_type": None,
        "clean_shutdown": False,
    }
    t_start = time.monotonic()
    writer = None
    try:
        t_conn_start = time.monotonic()
        reader, writer = await asyncio.wait_for(
            asyncio.open_connection(host, port),
            timeout=timeout
        )
        result["connect_ms"] = (time.monotonic() - t_conn_start) * 1000

        # Grab local port
        sockname = writer.get_extra_info("sockname")
        local_port = sockname[1] if sockname else 0
        result["local_port"] = local_port
        if logger:
            logger.log(f"DRD CONNECT {host}:{port} from :{local_port} "
                       f"in {result['connect_ms']:.1f}ms", also_print=False)

        # Send DRD?
        cmd_bytes = b"DRD?\r\n"
        writer.write(cmd_bytes)
        await writer.drain()
        if logger:
            logger.wire("SEND>>>", cmd_bytes, "DRD")

        # Read result code
        first_line = await asyncio.wait_for(
            reader.readuntil(b"\n"),
            timeout=timeout
        )
        if logger:
            logger.wire("<<<RECV", first_line, "DRD result_code")
        code = first_line.decode("ascii", errors="ignore").strip()
        if code.startswith("$"):
            code = code[1:].strip()
        if code != RESULT_OK:
            result["error"] = f"DRD? returned {code}"
            result["error_type"] = "DeviceError"
            return result

        # Stream for the specified duration
        stream_end = time.monotonic() + duration_seconds
        while time.monotonic() < stream_end and not _abort_requested:
            try:
                line = await asyncio.wait_for(
                    reader.readuntil(b"\n"),
                    timeout=30.0
                )
                result["lines_received"] += 1
                # Wire-log the first 5 lines, then every 10th, to keep the log manageable
                if logger and (result["lines_received"] <= 5 or result["lines_received"] % 10 == 0):
                    logger.wire("<<<RECV", line,
                                f"DRD stream line#{result['lines_received']}")
            except asyncio.TimeoutError:
                result["error"] = "Stream timeout (no data for 30s)"
                result["error_type"] = "StreamTimeout"
                break

        # Clean shutdown with SUB byte
        try:
            writer.write(SUB_BYTE)
            await writer.drain()
            result["clean_shutdown"] = True
            if logger:
                logger.wire("SEND>>>", SUB_BYTE, "DRD shutdown")
        except Exception:
            # Shutdown is best-effort; clean_shutdown stays False.
            pass

        # A stalled stream has already recorded an error above and must not be
        # reported as a success.
        result["success"] = result["error"] is None
    except ConnectionRefusedError:
        result["error"] = "Connection refused"
        result["error_type"] = "ConnectionRefused"
    except asyncio.TimeoutError:
        result["error"] = f"Connect timeout after {timeout}s"
        result["error_type"] = "Timeout"
    except Exception as e:
        result["error"] = str(e)
        result["error_type"] = type(e).__name__
    finally:
        if writer:
            try:
                writer.close()
                await writer.wait_closed()
            except Exception:
                pass
        result["total_ms"] = (time.monotonic() - t_start) * 1000
    return result
# ─── Health Check ────────────────────────────────────────────────────────────
async def health_check(host: str, port: int, timeout: float,
                       logger: StressTestLogger, label: str = "") -> bool:
    """
    Probe the device with a single DOD? query.

    Logs the outcome (timings on success, error details on failure) and
    returns True only when the query succeeded.
    """
    prefix = "HEALTH"
    if label:
        prefix = f"HEALTH [{label}]"
    logger.log(f"{prefix} Checking device at {host}:{port}...")

    probe = await send_command(host, port, "DOD?", timeout, is_query=True,
                               logger=logger, label=prefix)

    # Failure path first; the success report follows.
    if not probe["success"]:
        logger.log(f"{prefix} FAIL - {probe['error_type']}: {probe['error']}")
        return False

    logger.log(f"{prefix} OK - connect={probe['connect_ms']:.0f}ms "
               f"response={probe['response_ms']:.0f}ms "
               f"local_port={probe['local_port']} "
               f"data_len={probe['data_length']}")
    return True
async def wait_for_recovery(host: str, port: int, timeout: float,
                            logger: StressTestLogger, max_wait: int = 300,
                            label: str = "") -> dict:
    """
    Wait for the device to recover after a wedge.

    Sends a DOD? probe, then waits up to 10 seconds before the next one, for
    up to max_wait seconds. The wait between probes is taken in slices of at
    most one second so that a Ctrl+C abort is honoured promptly, and it never
    extends past the max_wait deadline.

    Returns dict with recovery info: ``recovered`` (bool),
    ``recovery_seconds`` (float) and ``attempts`` (int).
    """
    prefix = f"RECOVERY [{label}]" if label else "RECOVERY"
    logger.log(f"{prefix} Device unresponsive. Monitoring for recovery (max {max_wait}s)...")
    t_start = time.monotonic()
    deadline = t_start + max_wait
    attempts = 0
    while time.monotonic() < deadline and not _abort_requested:
        attempts += 1
        elapsed = time.monotonic() - t_start
        logger.log(f"{prefix} Attempt #{attempts} at {elapsed:.0f}s...")
        r = await send_command(host, port, "DOD?", timeout, is_query=True,
                               logger=logger, label=f"{prefix} attempt#{attempts}")
        if r["success"]:
            recovery_time = time.monotonic() - t_start
            logger.log(f"{prefix} *** DEVICE RECOVERED after {recovery_time:.1f}s ({attempts} attempts) ***")
            return {
                "recovered": True,
                "recovery_seconds": recovery_time,
                "attempts": attempts,
            }
        logger.log(f"{prefix} Still down: {r['error_type']}: {r['error']}")

        # Retry in 10s, or at the deadline if that comes first.
        retry_at = min(time.monotonic() + 10, deadline)
        while not _abort_requested:
            remaining = retry_at - time.monotonic()
            if remaining <= 0:
                break
            await asyncio.sleep(min(1.0, remaining))

    total_time = time.monotonic() - t_start
    logger.log(f"{prefix} Device did NOT recover within {total_time:.0f}s ({attempts} attempts)")
    return {
        "recovered": False,
        "recovery_seconds": total_time,
        "attempts": attempts,
    }
# ─── Phase Summaries ────────────────────────────────────────────────────────
def compute_phase_summary(phase_name: str, timings: list, logger: StressTestLogger,
                          extra_info: str = ""):
    """Write a statistics report for one phase through ``logger.summary``.

    Reports, in order: command counts, failures grouped by error type,
    connect/response/total time statistics over the successful commands
    (with stdev when there is more than one), a first-20%-vs-last-20%
    response-time trend (10+ successes), the first failure, and finally
    ``extra_info`` when given.
    """
    rule = "=" * 60
    ok_rows = []
    bad_rows = []
    for entry in timings:
        (ok_rows if entry["success"] else bad_rows).append(entry)

    def describe(title, values):
        # One "avg/med/min/max" line for a series of millisecond values.
        return (f"{title}: avg={mean(values):.1f} "
                f"med={median(values):.1f} "
                f"min={min(values):.1f} max={max(values):.1f}")

    logger.summary(f"\n{rule}")
    logger.summary(f"=== {phase_name} SUMMARY ===")
    logger.summary(f"{rule}")
    logger.summary(f"Total commands sent: {len(timings)}")
    logger.summary(f"Total successful: {len(ok_rows)}")
    logger.summary(f"Total failed: {len(bad_rows)}")

    if bad_rows:
        # Tally failures per error type, keeping first-seen order.
        tally = {}
        for row in bad_rows:
            kind = row.get("error_type", "Unknown")
            if kind in tally:
                tally[kind] += 1
            else:
                tally[kind] = 1
        for kind in tally:
            logger.summary(f" {kind}: {tally[kind]}")

    if ok_rows:
        connect_ms = [row["connect_ms"] for row in ok_rows]
        response_ms = [row["response_ms"] for row in ok_rows]
        total_ms = [row["total_ms"] for row in ok_rows]
        logger.summary(describe("\nConnect time (ms)", connect_ms))
        logger.summary(describe("Response time (ms)", response_ms))
        logger.summary(describe("Total time (ms)", total_ms))
        if len(connect_ms) > 1:
            logger.summary(f"Connect stdev (ms): {stdev(connect_ms):.1f}")
            logger.summary(f"Response stdev (ms): {stdev(response_ms):.1f}")

        # Trend analysis - compare first 20% vs last 20%
        if len(response_ms) >= 10:
            bucket = max(1, len(response_ms) // 5)
            first_avg = mean(response_ms[:bucket])
            last_avg = mean(response_ms[-bucket:])
            if first_avg > 0:
                change_pct = ((last_avg - first_avg) / first_avg) * 100
            else:
                change_pct = 0
            trend = "STABLE"
            if change_pct > 20:
                trend = "INCREASING"
            elif change_pct < -20:
                trend = "DECREASING"
            logger.summary(f"\nResponse time trend: {trend} "
                           f"(first {bucket}: {first_avg:.1f}ms → last {bucket}: {last_avg:.1f}ms, "
                           f"{change_pct:+.1f}%)")

    if bad_rows:
        # Position of the first failure, and the summed command time up to
        # and including it (only total_ms values are added here).
        first_fail = bad_rows[0]
        fail_idx = next(i for i, row in enumerate(timings) if not row["success"])
        elapsed_at_fail = sum(row["total_ms"] for row in timings[:fail_idx + 1]) / 1000
        logger.summary(f"\nFirst failure at: command #{fail_idx + 1} "
                       f"after {elapsed_at_fail:.1f}s ({elapsed_at_fail / 60:.1f} min)")
        logger.summary(f"Failure type: {first_fail['error_type']}: {first_fail['error']}")

    if extra_info:
        logger.summary(f"\n{extra_info}")
    logger.summary(f"{rule}\n")
# ─── Test Phases ─────────────────────────────────────────────────────────────
async def phase_1_baseline(host: str, port: int, timeout: float,
                           max_commands: int, logger: StressTestLogger) -> list:
    """
    Phase 1: Baseline Connection Count

    Sends up to ``max_commands`` DOD? queries, one second apart, each on its
    own connection, and stops at the first "ConnectionRefused" result (treated
    as a wedge). After a wedge the device is monitored for recovery. Returns
    the list of per-command result dicts.
    """
    banner = "#" * 60
    logger.summary(f"\n{banner}")
    logger.summary("### PHASE 1: Baseline Connection Count ###")
    logger.summary(f"### Sending up to {max_commands} DOD? commands at 1.0s spacing ###")
    logger.summary(f"{banner}\n")

    results = []
    wedged = False
    started = time.monotonic()

    for cmd_num in range(1, max_commands + 1):
        if _abort_requested:
            logger.log("PHASE1 Abort requested by user")
            break

        # Elapsed time is sampled before the command is sent.
        elapsed_total = time.monotonic() - started
        cmd_label = f"PHASE1 CMD#{cmd_num:04d}"
        r = await send_command(host, port, "DOD?", timeout, is_query=True,
                               logger=logger, label=cmd_label)

        # Log every command with FULL data
        if not r["success"]:
            logger.log(f"{cmd_label} FAIL "
                       f"{r['error_type']}: {r['error']} "
                       f"conn={r['connect_ms']:.0f}ms total={r['total_ms']:.0f}ms "
                       f"elapsed={elapsed_total:.1f}s port=:{r['local_port']}")
        else:
            logger.log(f"{cmd_label} OK "
                       f"conn={r['connect_ms']:.0f}ms resp={r['response_ms']:.0f}ms "
                       f"total={r['total_ms']:.0f}ms elapsed={elapsed_total:.1f}s "
                       f"port=:{r['local_port']} code={r['result_code']} "
                       f"data_len={r['data_length']}\n"
                       f" FULL DATA: {r['data']}")

        logger.record_timing("phase1", cmd_num, "DOD?",
                             r["connect_ms"], r["response_ms"], r["total_ms"],
                             r["success"], r.get("error_type", ""),
                             r.get("result_code", ""), r.get("data_length", 0),
                             r.get("local_port", 0))
        results.append(r)

        # Check for wedge
        if r["error_type"] == "ConnectionRefused":
            logger.log(f"PHASE1 *** WEDGE DETECTED at command #{cmd_num} "
                       f"after {elapsed_total:.1f}s ({elapsed_total / 60:.1f} min) ***")
            wedged = True
            break

        # Wait 1 second before next command (not after the last one)
        if cmd_num < max_commands:
            await asyncio.sleep(1.0)

    extra = ""
    if wedged:
        recovery = await wait_for_recovery(host, port, timeout, logger, label="PHASE1")
        if recovery["recovered"]:
            extra = f"Device recovered after {recovery['recovery_seconds']:.1f}s"
        else:
            extra = "Device did NOT recover within monitoring window"
    elif not _abort_requested:
        extra = f"Device survived all {max_commands} commands without wedging."

    compute_phase_summary("PHASE 1: Baseline Connection Count", results, logger, extra)
    return results
async def phase_2_rate_variation(host: str, port: int, timeout: float,
                                 commands_per_round: int, pause_between: int,
                                 logger: StressTestLogger) -> list:
    """
    Phase 2: Rate Variation

    Runs one round of ``commands_per_round`` DOD? queries for each spacing in
    1, 2, 5, 10 and 30 seconds. Each round starts with a health check (with a
    recovery wait if it fails), ends at the first "ConnectionRefused" result,
    and is followed by its own summary. Rounds are separated by a pause of
    ``pause_between`` seconds. Returns all per-command result dicts.
    """
    rates = [1.0, 2.0, 5.0, 10.0, 30.0]
    banner = "#" * 60
    logger.summary(f"\n{banner}")
    logger.summary("### PHASE 2: Rate Variation ###")
    logger.summary(f"### {commands_per_round} commands per round at rates: {rates}s ###")
    logger.summary(f"{banner}\n")

    collected = []
    final_rate = rates[-1]

    for rate in rates:
        if _abort_requested:
            break

        # Health check before each round
        logger.log(f"PHASE2 Checking device health before {rate}s rate round...")
        if not await health_check(host, port, timeout, logger, label=f"PHASE2-{rate}s"):
            logger.log(f"PHASE2 Device unhealthy before {rate}s round - attempting recovery")
            outcome = await wait_for_recovery(host, port, timeout, logger,
                                              label=f"PHASE2-{rate}s")
            if not outcome["recovered"]:
                logger.log("PHASE2 Device not recovered - skipping remaining rates")
                break

        logger.log(f"\nPHASE2 === Starting {rate}s rate round ({commands_per_round} commands) ===")
        this_round = []
        wedged = False

        for cmd_num in range(1, commands_per_round + 1):
            if _abort_requested:
                break
            cmd_label = f"PHASE2 [{rate}s] CMD#{cmd_num:03d}"
            r = await send_command(host, port, "DOD?", timeout, is_query=True,
                                   logger=logger, label=cmd_label)
            if r["success"]:
                status = "OK"
            else:
                status = f"FAIL:{r['error_type']}"
            logger.log(f"{cmd_label} {status} "
                       f"conn={r['connect_ms']:.0f}ms resp={r['response_ms']:.0f}ms "
                       f"total={r['total_ms']:.0f}ms port=:{r['local_port']} "
                       f"data_len={r['data_length']}\n"
                       f" FULL DATA: {r['data']}")
            logger.record_timing(f"phase2_{rate}s", cmd_num, "DOD?",
                                 r["connect_ms"], r["response_ms"], r["total_ms"],
                                 r["success"], r.get("error_type", ""),
                                 r.get("result_code", ""), r.get("data_length", 0),
                                 r.get("local_port", 0))
            this_round.append(r)

            if r["error_type"] == "ConnectionRefused":
                logger.log(f"PHASE2 *** WEDGE at {rate}s rate, command #{cmd_num} ***")
                wedged = True
                break

            # Spacing under test; skipped after the round's last command.
            if cmd_num < commands_per_round:
                await asyncio.sleep(rate)

        compute_phase_summary(f"PHASE 2: Rate {rate}s ({commands_per_round} commands)",
                              this_round, logger)
        collected.extend(this_round)

        if wedged:
            await wait_for_recovery(host, port, timeout, logger,
                                    label=f"PHASE2-{rate}s-recovery")

        # Pause between rounds (one-second steps so an abort is noticed)
        if rate != final_rate and not _abort_requested:
            logger.log(f"PHASE2 Pausing {pause_between}s between rounds...")
            for _ in range(pause_between):
                if _abort_requested:
                    break
                await asyncio.sleep(1)

    return collected
async def phase_3_command_variety(host: str, port: int, timeout: float,
                                  commands_per_type: int, pause_between: int,
                                  logger: StressTestLogger) -> list:
    """
    Phase 3: Command Variety

    For each of five query commands, sends it ``commands_per_type`` times at
    one-second spacing. Each command type starts with a health check (with a
    recovery wait if it fails), ends at the first "ConnectionRefused" result,
    and gets its own summary. Command types are separated by a pause of
    ``pause_between`` seconds. Returns all per-command result dicts.
    """
    # (command text, is_query) pairs exercised in order.
    test_commands = [
        ("DOD?", True),
        ("Measure?", True),
        ("Battery Level?", True),
        ("Clock?", True),
        ("Sleep Mode?", True),
    ]
    banner = "#" * 60
    logger.summary(f"\n{banner}")
    logger.summary("### PHASE 3: Command Variety ###")
    logger.summary(f"### {commands_per_type} commands of each type at 1.0s spacing ###")
    logger.summary(f"{banner}\n")

    collected = []
    final_cmd = test_commands[-1][0]

    for cmd, is_query in test_commands:
        if _abort_requested:
            break

        # Health check
        if not await health_check(host, port, timeout, logger, label=f"PHASE3-{cmd}"):
            outcome = await wait_for_recovery(host, port, timeout, logger,
                                              label=f"PHASE3-{cmd}")
            if not outcome["recovered"]:
                logger.log("PHASE3 Device not recovered - skipping remaining commands")
                break

        logger.log(f"\nPHASE3 === Testing command: {cmd} ({commands_per_type} times) ===")
        this_round = []
        wedged = False

        for cmd_num in range(1, commands_per_type + 1):
            if _abort_requested:
                break
            cmd_label = f"PHASE3 [{cmd}] CMD#{cmd_num:03d}"
            r = await send_command(host, port, cmd, timeout, is_query=is_query,
                                   logger=logger, label=cmd_label)
            if r["success"]:
                status = "OK"
            else:
                status = f"FAIL:{r['error_type']}"
            logger.log(f"{cmd_label} {status} "
                       f"conn={r['connect_ms']:.0f}ms resp={r['response_ms']:.0f}ms "
                       f"port=:{r['local_port']} data_len={r['data_length']}\n"
                       f" FULL DATA: {r['data']}")
            logger.record_timing("phase3", cmd_num, cmd,
                                 r["connect_ms"], r["response_ms"], r["total_ms"],
                                 r["success"], r.get("error_type", ""),
                                 r.get("result_code", ""), r.get("data_length", 0),
                                 r.get("local_port", 0))
            this_round.append(r)

            if r["error_type"] == "ConnectionRefused":
                logger.log(f"PHASE3 *** WEDGE on {cmd}, command #{cmd_num} ***")
                wedged = True
                break

            if cmd_num < commands_per_type:
                await asyncio.sleep(1.0)

        compute_phase_summary(f"PHASE 3: Command '{cmd}' ({commands_per_type} cmds)",
                              this_round, logger)
        collected.extend(this_round)

        if wedged:
            await wait_for_recovery(host, port, timeout, logger,
                                    label=f"PHASE3-{cmd}-recovery")

        # Pause between command types (one-second steps so an abort is noticed)
        if cmd != final_cmd and not _abort_requested:
            logger.log(f"PHASE3 Pausing {pause_between}s between command types...")
            for _ in range(pause_between):
                if _abort_requested:
                    break
                await asyncio.sleep(1)

    return collected
async def _phase4_hold_connection(host: str, port: int, timeout: float,
                                  hold_time: float, logger: StressTestLogger,
                                  cmd_label: str) -> dict:
    """Open a connection, send nothing, keep it open for ``hold_time`` seconds, close it.

    Returns a result dict with the same keys the phase loop reads from
    ``send_command`` results (timings, success, error, error_type, local_port,
    data, data_length, result_code). No exception escapes.
    """
    r = {"connect_ms": 0, "response_ms": 0, "total_ms": 0,
         "success": False, "error": None, "error_type": None,
         "local_port": 0, "data": None, "data_length": 0,
         "result_code": ""}
    t_start = time.monotonic()
    writer = None
    try:
        t_conn = time.monotonic()
        _reader, writer = await asyncio.wait_for(
            asyncio.open_connection(host, port), timeout=timeout
        )
        r["connect_ms"] = (time.monotonic() - t_conn) * 1000
        sockname = writer.get_extra_info("sockname")
        if sockname:
            r["local_port"] = sockname[1]
        logger.log(f"{cmd_label} CONNECT {host}:{port} from :{r['local_port']} "
                   f"in {r['connect_ms']:.1f}ms, holding {hold_time}s (no command)...",
                   also_print=False)
        # Hold without sending
        await asyncio.sleep(hold_time)
        r["success"] = True
    except ConnectionRefusedError:
        r["error"] = "Connection refused"
        r["error_type"] = "ConnectionRefused"
        logger.log(f"{cmd_label} CONNECTION REFUSED", also_print=False)
    except asyncio.TimeoutError:
        r["error"] = "Timeout"
        r["error_type"] = "Timeout"
    except ConnectionResetError as e:
        r["error"] = f"Connection reset: {e}"
        r["error_type"] = "ConnectionReset"
    except Exception as e:
        r["error"] = str(e)
        r["error_type"] = type(e).__name__
    finally:
        if writer:
            # Best-effort close; a failure here must not mask the result.
            try:
                writer.close()
                await writer.wait_closed()
            except Exception:
                pass
        r["total_ms"] = (time.monotonic() - t_start) * 1000
    return r


async def phase_4_connection_duration(host: str, port: int, timeout: float,
                                      pause_between: int,
                                      logger: StressTestLogger) -> list:
    """
    Phase 4: Connection Duration
    Does holding connections open longer or shorter affect stability?

    Rounds A-C open a series of connections with different hold behaviour;
    each round starts with a health check, stops at the first
    "ConnectionRefused" result and gets its own summary. Round D then runs a
    60-second DRD? stream followed by a health check.

    Returns the per-connection result dicts of rounds A-C (the Round D stream
    result is only logged and summarised).
    """
    logger.summary(f"\n{'#' * 60}")
    logger.summary("### PHASE 4: Connection Duration ###")
    logger.summary("### Testing various connection hold times ###")
    logger.summary(f"{'#' * 60}\n")
    all_timings = []

    # (round id, description, connection count, hold seconds, hold-only?)
    # hold-only rounds open the socket and send no command at all.
    rounds = [
        ("A", "Hold 5s after response", 50, 5.0, False),
        ("B", "Close immediately", 50, 0.0, False),
        ("C", "Hold 30s (no command)", 10, 30.0, True),
    ]
    for round_id, desc, count, hold_time, hold_only in rounds:
        if _abort_requested:
            break
        healthy = await health_check(host, port, timeout, logger,
                                     label=f"PHASE4-Round{round_id}")
        if not healthy:
            recovery = await wait_for_recovery(host, port, timeout, logger,
                                               label=f"PHASE4-Round{round_id}")
            if not recovery["recovered"]:
                logger.log("PHASE4 Device not recovered - skipping remaining rounds")
                break

        logger.log(f"\nPHASE4 === Round {round_id}: {desc} ({count} connections) ===")
        round_timings = []
        wedge_in_round = False
        for i in range(count):
            if _abort_requested:
                break
            cmd_num = i + 1
            cmd_label = f"PHASE4 [Round{round_id}] #{cmd_num:03d}"
            if hold_only:
                # Just open connection, hold, close (no command)
                r = await _phase4_hold_connection(host, port, timeout, hold_time,
                                                  logger, cmd_label)
            else:
                r = await send_command(host, port, "DOD?", timeout,
                                       is_query=True, hold_seconds=hold_time,
                                       logger=logger, label=cmd_label)

            status = "OK" if r["success"] else f"FAIL:{r.get('error_type', '?')}"
            cmd_desc = f"DOD?(hold={hold_time}s)" if not hold_only else f"connect_only(hold={hold_time}s)"
            logger.log(f"PHASE4 [Round{round_id}] #{cmd_num:03d} {status} "
                       f"conn={r['connect_ms']:.0f}ms total={r['total_ms']:.0f}ms "
                       f"port=:{r.get('local_port', 0)}")
            logger.record_timing(f"phase4_{round_id}", cmd_num, cmd_desc,
                                 r["connect_ms"], r.get("response_ms", 0),
                                 r["total_ms"], r["success"],
                                 r.get("error_type", ""),
                                 r.get("result_code", ""), r.get("data_length", 0),
                                 r.get("local_port", 0))
            round_timings.append(r)

            if r.get("error_type") == "ConnectionRefused":
                logger.log(f"PHASE4 *** WEDGE in Round {round_id}, connection #{cmd_num} ***")
                wedge_in_round = True
                break

            # 1s spacing between connections (plus any hold time already elapsed)
            remaining_wait = max(0, 1.0 - hold_time)
            if remaining_wait > 0 and cmd_num < count:
                await asyncio.sleep(remaining_wait)

        compute_phase_summary(f"PHASE 4 Round {round_id}: {desc}",
                              round_timings, logger)
        all_timings.extend(round_timings)

        if wedge_in_round:
            await wait_for_recovery(host, port, timeout, logger,
                                    label=f"PHASE4-Round{round_id}-recovery")

        # Pause between rounds (one-second steps so an abort is noticed)
        if round_id != rounds[-1][0] and not _abort_requested:
            logger.log(f"PHASE4 Pausing {pause_between}s between rounds...")
            for _ in range(pause_between):
                if _abort_requested:
                    break
                await asyncio.sleep(1)

    # Round D: DRD? stream
    if not _abort_requested:
        healthy = await health_check(host, port, timeout, logger, label="PHASE4-RoundD")
        if healthy:
            logger.log("\nPHASE4 === Round D: DRD? stream for 60 seconds ===")
            stream_result = await stream_drd(host, port, 60.0, timeout, logger=logger)
            if stream_result["success"]:
                logger.log(f"PHASE4 [RoundD] DRD stream OK - "
                           f"{stream_result['lines_received']} lines in "
                           f"{stream_result['total_ms']:.0f}ms, "
                           f"clean_shutdown={stream_result['clean_shutdown']}")
            else:
                logger.log(f"PHASE4 [RoundD] DRD stream FAIL - "
                           f"{stream_result['error_type']}: {stream_result['error']}")

            # Health check after stream
            logger.log("PHASE4 Checking health after DRD stream...")
            await asyncio.sleep(2)  # Brief pause
            post_stream = await health_check(host, port, timeout, logger,
                                             label="PHASE4-post-DRD")
            if not post_stream:
                logger.log("PHASE4 *** Device unhealthy after DRD stream ***")
                await wait_for_recovery(host, port, timeout, logger,
                                        label="PHASE4-RoundD-recovery")

            logger.summary("\nPHASE 4 Round D: DRD Stream (60s)")
            logger.summary(f" Lines received: {stream_result['lines_received']}")
            logger.summary(f" Clean shutdown: {stream_result['clean_shutdown']}")
            logger.summary(f" Duration: {stream_result['total_ms']:.0f}ms")
            logger.summary(f" Post-stream health: {'OK' if post_stream else 'FAILED'}")
        else:
            # Make the skip visible instead of silently omitting the round.
            logger.log("PHASE4 Round D skipped - device failed the pre-stream health check")

    return all_timings
async def phase_5_soak(host: str, port: int, timeout: float,
                       duration_minutes: int, logger: StressTestLogger) -> list:
    """
    Phase 5: Sustained Soak

    Simulate real SLMM polling. Each cycle sends one DOD? query and then
    waits up to 60 seconds (in 1-second steps) before the next cycle. When
    the poll number is a multiple of 10 and the DOD? succeeded, the cycle
    also sends "Battery Level?" and "Measure?".

    The loop stops when ``duration_minutes`` have elapsed, when an abort was
    requested, or when any command fails with error type "ConnectionRefused"
    (logged as a wedge, after which recovery is awaited).

    Returns the list of all command results collected, primary polls and
    extra commands alike, in the order they were sent.
    """
    hashes = "#" * 60
    logger.summary(f"\n{hashes}")
    logger.summary("### PHASE 5: Sustained Soak ###")
    logger.summary(f"### Simulating SLMM polling for {duration_minutes} minutes ###")
    logger.summary("### DOD? every 60s, +Battery/Measure every 10th poll ###")
    logger.summary(f"{hashes}\n")

    results = []
    poll_no = 0
    wedged = False
    started_at = time.monotonic()
    deadline = started_at + (duration_minutes * 60)

    while time.monotonic() < deadline and not _abort_requested:
        poll_no += 1
        minutes_in = (time.monotonic() - started_at) / 60
        tag = f"PHASE5 POLL#{poll_no:04d}"

        # Primary poll: DOD?
        primary = await send_command(host, port, "DOD?", timeout, is_query=True,
                                     logger=logger, label=tag)
        if primary["success"]:
            outcome = "OK"
        else:
            outcome = f"FAIL:{primary['error_type']}"
        logger.log(f"{tag} DOD? {outcome} "
                   f"conn={primary['connect_ms']:.0f}ms resp={primary['response_ms']:.0f}ms "
                   f"elapsed={minutes_in:.1f}min port=:{primary['local_port']} "
                   f"data_len={primary['data_length']}\n"
                   f" FULL DATA: {primary['data']}")
        logger.record_timing("phase5", poll_no, "DOD?",
                             primary["connect_ms"], primary["response_ms"],
                             primary["total_ms"], primary["success"],
                             primary.get("error_type", ""),
                             primary.get("result_code", ""),
                             primary.get("data_length", 0),
                             primary.get("local_port", 0))
        results.append(primary)

        # A refused connection is the wedge signature: stop polling.
        if primary["error_type"] == "ConnectionRefused":
            logger.log(f"PHASE5 *** WEDGE DETECTED at poll #{poll_no} "
                       f"after {minutes_in:.1f} minutes ***")
            wedged = True
            break

        # Every 10th poll, also check battery and measurement state
        if poll_no % 10 == 0 and primary["success"]:
            await asyncio.sleep(1.0)
            for extra_cmd in ("Battery Level?", "Measure?"):
                if _abort_requested:
                    break
                followup = await send_command(host, port, extra_cmd, timeout,
                                              is_query=True, logger=logger,
                                              label=f"{tag} {extra_cmd}")
                if followup["success"]:
                    followup_outcome = "OK"
                else:
                    followup_outcome = f"FAIL:{followup['error_type']}"
                logger.log(f"{tag} {extra_cmd} {followup_outcome} "
                           f"conn={followup['connect_ms']:.0f}ms "
                           f"resp={followup['response_ms']:.0f}ms "
                           f"port=:{followup['local_port']} "
                           f"data_len={followup['data_length']}\n"
                           f" FULL DATA: {followup['data']}")
                logger.record_timing("phase5_extra", poll_no, extra_cmd,
                                     followup["connect_ms"], followup["response_ms"],
                                     followup["total_ms"], followup["success"],
                                     followup.get("error_type", ""),
                                     followup.get("result_code", ""),
                                     followup.get("data_length", 0),
                                     followup.get("local_port", 0))
                results.append(followup)
                if followup["error_type"] == "ConnectionRefused":
                    logger.log(f"PHASE5 *** WEDGE on extra command {extra_cmd} ***")
                    wedged = True
                    break
                await asyncio.sleep(1.0)

        if wedged:
            break

        # Wait 60 seconds before next poll, in 1s steps so an abort request
        # or the end of the soak window is noticed promptly.
        seconds_waited = 0
        while (seconds_waited < 60 and not _abort_requested
               and time.monotonic() < deadline):
            await asyncio.sleep(1)
            seconds_waited += 1

    if wedged:
        recovery = await wait_for_recovery(host, port, timeout, logger, label="PHASE5")
        if recovery["recovered"]:
            extra = f"Device recovered after {recovery['recovery_seconds']:.1f}s"
        else:
            extra = "Device did NOT recover within monitoring window"
    elif not _abort_requested:
        minutes_in = (time.monotonic() - started_at) / 60
        extra = f"Device survived full {minutes_in:.1f} minute soak without wedging"
    else:
        extra = ""

    compute_phase_summary("PHASE 5: Sustained Soak", results, logger, extra)
    return results
# ─── Dry Run ─────────────────────────────────────────────────────────────────
def dry_run(args):
    """
    Show what would be tested without connecting to any device.

    Prints the target settings, a plan for each phase named in
    ``args.phase`` ("all" expands to phases 1-5), and the estimated total
    command count and run time. A phase token that is not one of 1-5 is
    reported as unknown instead of producing an empty section.

    Args:
        args: Parsed command-line namespace. Reads host, port, timeout,
            phase, log_dir, pause, max_commands, rate_rounds and
            soak_duration.
    """
    print(f"\n{'=' * 60}")
    print("DRY RUN - No connections will be made")
    print(f"{'=' * 60}")
    print(f"\nTarget: {args.host}:{args.port}")
    print(f"Timeout: {args.timeout}s")
    print(f"Phase(s): {args.phase}")
    print(f"Log directory: {args.log_dir}")
    print(f"Recovery pause: {args.pause}s")

    phases = args.phase if args.phase != "all" else "1,2,3,4,5"
    phase_list = [p.strip() for p in phases.split(",")]
    total_commands = 0
    total_time_est = 0

    for p in phase_list:
        print(f"\n--- Phase {p} ---")
        if p == "1":
            n = args.max_commands
            t = n * 1  # 1s spacing
            print(f" Baseline: up to {n} DOD? commands at 1.0s spacing")
            print(f" Estimated time: {t}s ({t / 60:.1f} min)")
            total_commands += n
            total_time_est += t
        elif p == "2":
            rates = [1.0, 2.0, 5.0, 10.0, 30.0]
            n = args.rate_rounds
            for rate in rates:
                t = n * rate
                print(f" Rate {rate}s: {n} DOD? commands ({t:.0f}s per round)")
            # One recovery pause between each pair of consecutive rounds.
            pause_time = (len(rates) - 1) * args.pause
            total_commands += n * len(rates)
            total_time_est += sum(n * r for r in rates) + pause_time
            print(f" + {args.pause}s pause between rounds")
        elif p == "3":
            cmds = ["DOD?", "Measure?", "Battery Level?", "Clock?", "Sleep Mode?"]
            n = 100
            print(f" {n} commands each of: {', '.join(cmds)}")
            print(f" Estimated: {n * len(cmds)}s + {(len(cmds) - 1) * args.pause}s pauses")
            total_commands += n * len(cmds)
            total_time_est += n * len(cmds) + (len(cmds) - 1) * args.pause
        elif p == "4":
            print(" Round A: 50x DOD? with 5s hold = ~250s")
            print(" Round B: 50x DOD? close immediately = ~50s")
            print(" Round C: 10x connect+30s hold (no command) = ~300s")
            print(" Round D: DRD? stream for 60s")
            total_commands += 110
            total_time_est += 250 + 50 + 300 + 60 + 3 * args.pause
        elif p == "5":
            dur = args.soak_duration
            polls = dur  # 1 poll per minute
            extras = (polls // 10) * 2  # 2 extra commands every 10th poll
            print(f" Soak for {dur} minutes")
            print(f" ~{polls} DOD? polls + ~{extras} extra commands")
            total_commands += polls + extras
            total_time_est += dur * 60
        else:
            # Make typos such as "--phase 1,9" or a stray comma visible in
            # the plan; they contribute nothing to the totals.
            print(f" Unknown phase {p!r} - nothing will be run for it")

    print(f"\n{'=' * 60}")
    print(f"Total estimated commands: ~{total_commands}")
    # Phase 2 spacings are floats, so the running total may be a float;
    # format as whole seconds to avoid printing e.g. "2640.0s".
    print(f"Total estimated time: ~{total_time_est:.0f}s ({total_time_est / 60:.0f} min, "
          f"{total_time_est / 3600:.1f} hr)")
    print(f"{'=' * 60}\n")
# ─── Main ────────────────────────────────────────────────────────────────────
async def run(args):
    """
    Main test runner.

    Creates a timestamped run directory under ``args.log_dir``, optionally
    starts a packet capture, verifies the device responds, then runs each
    phase named in ``args.phase`` ("all" expands to phases 1-5) in order.
    Before every phase after the first one started, it waits ``args.pause``
    seconds and re-checks device health. If the device does not recover,
    the remaining phases are skipped.

    The packet capture is stopped and the logger closed on every exit path,
    including an exception raised by a phase, so no capture process or open
    log file is left behind.

    Args:
        args: Parsed command-line namespace. Reads host, port, timeout,
            phase, log_dir, pause, no_pcap, max_commands, rate_rounds and
            soak_duration.
    """
    # Create run directory
    timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
    run_dir = Path(args.log_dir) / f"run_{timestamp}"
    logger = StressTestLogger(run_dir)
    pcap = None  # only set once a capture has actually started

    try:
        logger.summary(f"NL-43 TCP Stress Test")
        logger.summary(f"Started: {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}")
        logger.summary(f"Target: {args.host}:{args.port}")
        logger.summary(f"Timeout: {args.timeout}s")
        logger.summary(f"Phase(s): {args.phase}")

        # Start packet capture
        if not args.no_pcap:
            pcap_path = run_dir / "capture.pcap"
            capture = PacketCapture(args.host, args.port, pcap_path)
            if capture.start():
                pcap = capture
                logger.summary(f"Packet capture: {pcap_path}")
            else:
                logger.summary("Packet capture: DISABLED (tcpdump unavailable or no permissions)")

        # Initial health check
        logger.log("\n=== Initial Health Check ===")
        healthy = await health_check(args.host, args.port, args.timeout, logger,
                                     label="INITIAL")
        if not healthy:
            logger.log("WARNING: Device is not responding at test start!")
            logger.log("Waiting for device to become available...")
            recovery = await wait_for_recovery(args.host, args.port, args.timeout,
                                               logger, label="INITIAL")
            if not recovery["recovered"]:
                logger.summary("ABORTED: Device unreachable at test start and did not recover")
                return  # cleanup happens in the finally block below

        phases = args.phase if args.phase != "all" else "1,2,3,4,5"
        phase_list = [p.strip() for p in phases.split(",")]
        known_phases = {"1", "2", "3", "4", "5"}
        phases_started = 0

        for phase_num in phase_list:
            if _abort_requested:
                break

            # Reject typos before spending a recovery pause on them.
            if phase_num not in known_phases:
                logger.log(f"WARNING: Unknown phase '{phase_num}' - skipping")
                continue

            # Recovery pause between phases. Counting started phases rather
            # than comparing against phase_list[0] keeps the pause in place
            # when a phase number is repeated (e.g. "1,2,1").
            if phases_started > 0:
                logger.log(f"\n--- Recovery pause ({args.pause}s) before Phase {phase_num} ---")
                for _ in range(args.pause):
                    if _abort_requested:
                        break
                    await asyncio.sleep(1)
                if _abort_requested:
                    # Ctrl+C during the pause: do not open new connections
                    # or start another phase.
                    break
                healthy = await health_check(args.host, args.port, args.timeout, logger,
                                             label=f"pre-phase{phase_num}")
                if not healthy:
                    logger.log(f"Device unhealthy before Phase {phase_num} - waiting for recovery")
                    recovery = await wait_for_recovery(args.host, args.port, args.timeout,
                                                       logger, label=f"pre-phase{phase_num}")
                    if not recovery["recovered"]:
                        logger.log(f"Device not recovered - skipping Phase {phase_num} and remaining")
                        break

            phases_started += 1
            if phase_num == "1":
                await phase_1_baseline(args.host, args.port, args.timeout,
                                       args.max_commands, logger)
            elif phase_num == "2":
                await phase_2_rate_variation(args.host, args.port, args.timeout,
                                             args.rate_rounds, args.pause, logger)
            elif phase_num == "3":
                await phase_3_command_variety(args.host, args.port, args.timeout,
                                              100, args.pause, logger)
            elif phase_num == "4":
                await phase_4_connection_duration(args.host, args.port, args.timeout,
                                                  args.pause, logger)
            elif phase_num == "5":
                await phase_5_soak(args.host, args.port, args.timeout,
                                   args.soak_duration, logger)

        # Stop packet capture
        if pcap:
            logger.summary("\nStopping packet capture...")
            # Clear the handle first so the finally block never stops the
            # same capture a second time.
            capture, pcap = pcap, None
            pcap_stats = capture.stop()
            logger.summary(f"Packets captured: {pcap_stats['packets_captured']}")
            logger.summary(f"PCAP file: {pcap_stats['pcap_file']}")
            logger.summary(f"PCAP size: {pcap_stats['file_size_bytes']} bytes")

        # Final summary
        logger.summary(f"\nFinished: {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}")
        logger.summary(f"Logs saved to: {run_dir}")
    finally:
        # Reached on normal completion, early abort, and exceptions alike.
        try:
            if pcap:
                pcap.stop()
        finally:
            logger.close()

    print(f"\nResults saved to: {run_dir}")
def main():
    """
    Command-line entry point.

    Parses the options, then either prints the dry-run plan (``--dry-run``)
    or prints a short banner and runs the stress test under asyncio.
    """
    usage_notes = """
Test Phases:
1 Baseline Connection Count - DOD? at 1s until wedge or max
2 Rate Variation - DOD? at 1s, 2s, 5s, 10s, 30s spacing
3 Command Variety - Different commands to see if type matters
4 Connection Duration - Hold connections open various durations
5 Sustained Soak - Simulate real SLMM polling for hours
Examples:
%(prog)s --host 192.168.1.100 --phase 1 --max-commands 500
%(prog)s --host 192.168.1.100 --phase 5 --soak-duration 240
%(prog)s --host 192.168.1.100 --phase all --dry-run
%(prog)s --host 10.0.0.50 --phase 1,2 --timeout 10
"""
    parser = argparse.ArgumentParser(
        description="NL-43 TCP Wedge Stress Test Tool",
        formatter_class=argparse.RawDescriptionHelpFormatter,
        epilog=usage_notes,
    )

    # (flag, add_argument keyword options), registered in this order so the
    # generated --help output lists them the same way.
    option_specs = (
        ("--host", {"required": True, "help": "NL-43 IP address"}),
        ("--port", {"type": int, "default": DEFAULT_PORT,
                    "help": f"TCP port (default: {DEFAULT_PORT})"}),
        ("--phase", {"default": "all",
                     "help": "Phase(s) to run: 1,2,3,4,5 or 'all' (default: all)"}),
        ("--max-commands", {"type": int, "default": 2000,
                            "help": "Phase 1: max commands before stopping (default: 2000)"}),
        ("--rate-rounds", {"type": int, "default": 50,
                           "help": "Phase 2: commands per rate round (default: 50)"}),
        ("--soak-duration", {"type": int, "default": 120,
                             "help": "Phase 5: soak duration in minutes (default: 120)"}),
        ("--log-dir", {"default": "./stress_test_logs",
                       "help": "Log output directory (default: ./stress_test_logs)"}),
        ("--pause", {"type": int, "default": 60,
                     "help": "Recovery pause between phases in seconds (default: 60)"}),
        ("--timeout", {"type": float, "default": DEFAULT_TIMEOUT,
                       "help": f"TCP timeout in seconds (default: {DEFAULT_TIMEOUT})"}),
        ("--dry-run", {"action": "store_true",
                       "help": "Show test plan without connecting"}),
        ("--no-pcap", {"action": "store_true",
                       "help": "Disable built-in tcpdump packet capture"}),
    )
    for flag, options in option_specs:
        parser.add_argument(flag, **options)

    args = parser.parse_args()

    if not args.dry_run:
        print("\nNL-43 TCP Stress Test Tool")
        print(f"Target: {args.host}:{args.port}")
        print(f"Phase(s): {args.phase}")
        print("Press Ctrl+C at any time to stop safely\n")
        asyncio.run(run(args))
    else:
        dry_run(args)


# Run the CLI only when executed as a script, not when imported.
if __name__ == "__main__":
    main()