#!/usr/bin/env python3 """ NL-43 TCP Wedge Stress Test Tool Standalone diagnostic tool for determining what causes the NL-43 sound level meter's TCP port to become "wedged" (connection refused). Communicates directly with the device over raw TCP sockets - zero SLMM dependencies. Usage: python nl43_stress_test.py --host 192.168.1.100 --port 2255 --phase 1 python nl43_stress_test.py --host 192.168.1.100 --phase all --dry-run Test Phases: 1 - Baseline Connection Count: How many commands before it dies? 2 - Rate Variation: Does spacing between commands matter? 3 - Command Variety: Do certain commands wedge faster? 4 - Connection Duration: Does holding connections open matter? 5 - Sustained Soak: Simulate real SLMM polling over hours """ import asyncio import argparse import csv import os import shutil import signal import subprocess import sys import time from datetime import datetime from pathlib import Path from statistics import mean, median, stdev # ─── Constants ─────────────────────────────────────────────────────────────── DEFAULT_PORT = 2255 DEFAULT_TIMEOUT = 5.0 CRLF = b"\r\n" SUB_BYTE = b"\x1a" # NL-43 result codes RESULT_OK = "R+0000" RESULT_CMD_ERROR = "R+0001" RESULT_PARAM_ERROR = "R+0002" RESULT_SPEC_ERROR = "R+0003" RESULT_STATUS_ERROR = "R+0004" # ─── Global state ──────────────────────────────────────────────────────────── _abort_requested = False def _signal_handler(sig, frame): global _abort_requested _abort_requested = True print("\n\n!!! Ctrl+C detected - finishing current command and writing summary...\n") signal.signal(signal.SIGINT, _signal_handler) # ─── Packet Capture ────────────────────────────────────────────────────────── class PacketCapture: """ Manages a tcpdump subprocess that captures all traffic to/from the NL-43. Saves a .pcap file in the run directory for Wireshark analysis. 
""" def __init__(self, host: str, port: int, pcap_path: Path): self.host = host self.port = port self.pcap_path = pcap_path self.process: subprocess.Popen | None = None self.available = False def _try_start_tcpdump(self, cmd: list[str], label: str) -> bool: """Attempt to start tcpdump with the given command. Returns True if successful.""" try: self.process = subprocess.Popen( cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE, ) # Give it a moment to start or fail time.sleep(1.0) if self.process.poll() is not None: # Process exited immediately stderr = self.process.stderr.read().decode(errors="ignore") print(f"[PCAP] {label} tcpdump exited immediately: {stderr.strip()}") self.process = None return False # Process is running - check stderr for warnings (non-blocking read) # tcpdump writes "listening on ..." to stderr on success import select ready, _, _ = select.select([self.process.stderr], [], [], 0.5) if ready: # Read available stderr without blocking import os stderr_bytes = os.read(self.process.stderr.fileno(), 4096) stderr_text = stderr_bytes.decode(errors="ignore") if "listening on" in stderr_text.lower(): print(f"[PCAP] {label} tcpdump started: {stderr_text.strip()}") self.available = True return True elif "permission" in stderr_text.lower() or "operation not permitted" in stderr_text.lower(): print(f"[PCAP] {label} tcpdump permission denied: {stderr_text.strip()}") self.process.terminate() self.process.wait(timeout=3) self.process = None return False else: # Some other output - might be OK, might not print(f"[PCAP] {label} tcpdump stderr: {stderr_text.strip()}") # Process is still running - assume OK self.available = True return True except FileNotFoundError: print(f"[PCAP] {label} tcpdump binary not found") self.process = None return False except Exception as e: print(f"[PCAP] {label} failed to start: {e}") if self.process: try: self.process.kill() self.process.wait(timeout=3) except Exception: pass self.process = None return False def start(self) -> 
bool: """Start tcpdump capture. Returns True if started successfully.""" if not shutil.which("tcpdump"): print("[PCAP] tcpdump not found in PATH - packet capture disabled") print("[PCAP] Install with: sudo apt install tcpdump") return False # Capture all TCP traffic to/from the device on the specified port # -i any: all interfaces # -nn: no name resolution # -s 0: full packets, no truncation # -w: write raw pcap to file # -U: packet-buffered output (flush after each packet) cmd = [ "tcpdump", "-i", "any", "-nn", "-s", "0", "-U", "-w", str(self.pcap_path), f"host {self.host} and tcp port {self.port}", ] # Try with sudo first (tcpdump almost always needs root for raw capture) print("[PCAP] Starting packet capture (requires sudo for raw interface access)...") if self._try_start_tcpdump(["sudo", "-n"] + cmd, "sudo"): print(f"[PCAP] ✓ Capturing to: {self.pcap_path}") print(f"[PCAP] Filter: host {self.host} and tcp port {self.port}") return True # Fallback: try without sudo (works if user has CAP_NET_RAW capability) print("[PCAP] sudo failed, trying without sudo...") if self._try_start_tcpdump(cmd, "non-sudo"): print(f"[PCAP] ✓ Capturing to: {self.pcap_path}") print(f"[PCAP] Filter: host {self.host} and tcp port {self.port}") return True print("[PCAP] ✗ Could not start packet capture") print("[PCAP] To enable, run the stress test with sudo:") print(f"[PCAP] sudo python3 nl43_stress_test.py --host {self.host} --port {self.port} ...") print("[PCAP] Or run tcpdump separately in another terminal:") print(f"[PCAP] sudo tcpdump -i any -nn -s 0 -U -w capture.pcap host {self.host} and tcp port {self.port}") return False def stop(self) -> dict: """Stop tcpdump and return capture stats.""" stats = { "pcap_file": str(self.pcap_path), "packets_captured": 0, "file_size_bytes": 0, } if not self.process: return stats try: # SIGTERM for graceful shutdown (tcpdump prints stats on exit) self.process.terminate() try: _, stderr = self.process.communicate(timeout=5) stderr_text = 
class StressTestLogger:
    """Dual-output logger: console + files, plus a CSV writer for timing data.

    Creates ``run_dir`` (including parents) and opens four files inside it:
    ``full_log.txt``, ``summary.txt``, ``wire_log.txt`` and ``timing_data.csv``.
    Every file is written as UTF-8 so that non-ASCII characters in log
    messages (e.g. arrows) never depend on the platform's default encoding.

    Call :meth:`close` when finished, or use the instance as a context manager.
    """

    # Header row of timing_data.csv; record_timing() writes values in this order.
    _TIMING_COLUMNS = (
        "phase", "cmd_num", "command", "connect_ms", "response_ms",
        "total_ms", "success", "error_type", "result_code",
        "data_length", "local_port", "timestamp",
    )

    # Bytes that get an explicit escape in the wire log's ASCII rendering.
    _CONTROL_NAMES = {0x0D: "\\r", 0x0A: "\\n", 0x1A: "\\x1a[SUB]"}

    def __init__(self, run_dir: Path):
        self.run_dir = run_dir
        self.run_dir.mkdir(parents=True, exist_ok=True)
        self.timing_csv_path = run_dir / "timing_data.csv"
        self._files = []
        try:
            # Line-buffered text logs so a crash loses at most one line.
            self.log_file = self._open(run_dir / "full_log.txt", buffering=1)
            self.summary_file = self._open(run_dir / "summary.txt", buffering=1)
            # Raw wire log - every byte sent/received in hex + ascii
            self.wire_log = self._open(run_dir / "wire_log.txt", buffering=1)
            # CSV for timing data (newline="" as required by the csv module)
            self.timing_csv = self._open(self.timing_csv_path, newline="")
        except OSError:
            # Do not leak the handles that were opened before the failure.
            self.close()
            raise
        self.timing_writer = csv.writer(self.timing_csv)
        self.timing_writer.writerow(self._TIMING_COLUMNS)

    def __enter__(self):
        return self

    def __exit__(self, exc_type, exc, tb):
        self.close()
        return False

    def _open(self, path: Path, **kwargs):
        """Open ``path`` for UTF-8 text writing and remember it for close()."""
        handle = open(path, "w", encoding="utf-8", **kwargs)
        self._files.append(handle)
        return handle

    @staticmethod
    def _timestamp() -> str:
        """Return the current local time as ISO-8601 with millisecond precision."""
        return datetime.now().strftime("%Y-%m-%dT%H:%M:%S.%f")[:-3]

    def log(self, message: str, also_print: bool = True):
        """Write a timestamped line to full_log.txt and optionally to stdout."""
        line = f"[{self._timestamp()}] {message}"
        self.log_file.write(line + "\n")
        if also_print:
            print(line)

    def wire(self, direction: str, raw_bytes: bytes, label: str = ""):
        """Log raw wire data - every byte in hex and ascii representation.

        CR, LF and SUB get named escapes, printable ASCII is shown as-is and
        every other byte is rendered as ``\\xNN``.
        """
        readable = "".join(
            self._CONTROL_NAMES.get(b)
            or (chr(b) if 32 <= b < 127 else f"\\x{b:02x}")
            for b in raw_bytes
        )
        self.wire_log.write(
            f"[{self._timestamp()}] {label} {direction} ({len(raw_bytes)} bytes)\n"
        )
        self.wire_log.write(f" HEX: {raw_bytes.hex(' ')}\n")
        self.wire_log.write(f" ASCII: {readable}\n")
        self.wire_log.flush()

    def summary(self, message: str):
        """Write ``message`` to summary.txt, full_log.txt and stdout (no timestamp)."""
        self.summary_file.write(message + "\n")
        self.log_file.write(message + "\n")
        print(message)

    def record_timing(self, phase: str, cmd_num: int, command: str,
                      connect_ms: float, response_ms: float, total_ms: float,
                      success: bool, error_type: str = "", result_code: str = "",
                      data_length: int = 0, local_port: int = 0):
        """Append one row to timing_data.csv and flush it to disk immediately.

        The three ``*_ms`` values are written with one decimal place; a
        timestamp column is appended automatically.
        """
        self.timing_writer.writerow([
            phase, cmd_num, command,
            f"{connect_ms:.1f}", f"{response_ms:.1f}", f"{total_ms:.1f}",
            success, error_type, result_code, data_length, local_port,
            self._timestamp(),
        ])
        self.timing_csv.flush()

    def close(self):
        """Close every open file; safe to call more than once.

        A failure on one handle is reported but does not prevent the
        remaining handles from being closed.
        """
        for handle in self._files:
            try:
                handle.close()
            except OSError as exc:
                print(f"[LOG] Error closing {handle.name}: {exc}")
Returns dict with timing and result info: connect_ms, response_ms, total_ms, success, result_code, data, data_length, raw_result_bytes, raw_data_bytes, local_port, error, error_type """ result = { "connect_ms": 0.0, "response_ms": 0.0, "total_ms": 0.0, "success": False, "result_code": None, "data": None, "data_length": 0, "raw_result_bytes": None, "raw_data_bytes": None, "local_port": 0, "error": None, "error_type": None, } t_start = time.monotonic() reader = None writer = None try: # Connect t_conn_start = time.monotonic() reader, writer = await asyncio.wait_for( asyncio.open_connection(host, port), timeout=timeout ) result["connect_ms"] = (time.monotonic() - t_conn_start) * 1000 # Grab local socket info (source IP:port) sockname = writer.get_extra_info("sockname") if sockname: result["local_port"] = sockname[1] if logger: logger.log(f"{label} CONNECT {host}:{port} from :{result['local_port']} " f"in {result['connect_ms']:.1f}ms", also_print=False) # Send command cmd_bytes = command.encode("ascii") if not cmd_bytes.endswith(CRLF): cmd_bytes += CRLF writer.write(cmd_bytes) await writer.drain() if logger: logger.wire("SEND>>>", cmd_bytes, label) # Read result code (first line) t_resp_start = time.monotonic() first_line_raw = await asyncio.wait_for( reader.readuntil(b"\n"), timeout=timeout ) result["raw_result_bytes"] = first_line_raw if logger: logger.wire("<< 0: if logger: logger.log(f"{label} HOLDING connection open for {hold_seconds}s...", also_print=False) await asyncio.sleep(hold_seconds) result["success"] = result_code == RESULT_OK if result_code != RESULT_OK: result["error"] = f"Device returned {result_code}" result["error_type"] = "DeviceError" except ConnectionRefusedError: result["error"] = "Connection refused" result["error_type"] = "ConnectionRefused" if logger: logger.log(f"{label} CONNECTION REFUSED by {host}:{port}", also_print=False) except asyncio.TimeoutError: result["error"] = f"Timeout after {timeout}s" result["error_type"] = "Timeout" if logger: 
logger.log(f"{label} TIMEOUT after {timeout}s connecting to {host}:{port}", also_print=False) except ConnectionResetError as e: result["error"] = f"Connection reset: {e}" result["error_type"] = "ConnectionReset" if logger: logger.log(f"{label} CONNECTION RESET by {host}:{port}: {e}", also_print=False) except OSError as e: result["error"] = f"{type(e).__name__}: {e}" result["error_type"] = type(e).__name__ if logger: logger.log(f"{label} OS ERROR: {type(e).__name__}: {e}", also_print=False) except Exception as e: result["error"] = f"{type(e).__name__}: {e}" result["error_type"] = type(e).__name__ if logger: logger.log(f"{label} UNEXPECTED ERROR: {type(e).__name__}: {e}", also_print=False) finally: if writer: try: writer.close() await writer.wait_closed() except Exception: pass result["total_ms"] = (time.monotonic() - t_start) * 1000 if logger: logger.log(f"{label} CLOSE total={result['total_ms']:.1f}ms " f"local_port={result['local_port']}", also_print=False) return result async def stream_drd(host: str, port: int, duration_seconds: float, timeout: float, logger: "StressTestLogger | None" = None) -> dict: """ Open a DRD? streaming connection and hold it for the specified duration. Returns dict with timing and streaming stats. """ result = { "connect_ms": 0.0, "total_ms": 0.0, "success": False, "lines_received": 0, "error": None, "error_type": None, "clean_shutdown": False, } t_start = time.monotonic() writer = None try: t_conn_start = time.monotonic() reader, writer = await asyncio.wait_for( asyncio.open_connection(host, port), timeout=timeout ) result["connect_ms"] = (time.monotonic() - t_conn_start) * 1000 # Grab local port sockname = writer.get_extra_info("sockname") local_port = sockname[1] if sockname else 0 result["local_port"] = local_port if logger: logger.log(f"DRD CONNECT :{local_port} → {host}:{port} " f"in {result['connect_ms']:.1f}ms", also_print=False) # Send DRD? 
async def health_check(host: str, port: int, timeout: float,
                       logger: StressTestLogger, label: str = "") -> bool:
    """Probe the device with a single ``DOD?`` command.

    Logs the attempt and its outcome through ``logger``. ``label``, when
    non-empty, is embedded in the log prefix as ``HEALTH [label]``.

    Returns True when ``send_command`` reports success, False otherwise.
    """
    tag = "HEALTH"
    if label:
        tag = f"HEALTH [{label}]"

    logger.log(f"{tag} Checking device at {host}:{port}...")
    reply = await send_command(host, port, "DOD?", timeout,
                               is_query=True, logger=logger, label=tag)

    # Guard clause: report the failure first, the success path follows.
    if not reply["success"]:
        logger.log(f"{tag} FAIL - {reply['error_type']}: {reply['error']}")
        return False

    ok_line = (
        f"{tag} OK - connect={reply['connect_ms']:.0f}ms "
        f"response={reply['response_ms']:.0f}ms "
        f"local_port={reply['local_port']} "
        f"data_len={reply['data_length']}"
    )
    logger.log(ok_line)
    return True
def compute_phase_summary(phase_name: str, timings: list, logger: "StressTestLogger",
                          extra_info: str = ""):
    """Compute and log summary statistics for a phase.

    Args:
        phase_name: Title used in the summary banner.
        timings: Result dicts in the order the commands were sent. Each must
            carry ``success``, ``connect_ms``, ``response_ms`` and
            ``total_ms``; failed entries are also read for ``error_type``
            and ``error``.
        logger: Object whose ``summary(str)`` method receives every line.
        extra_info: Optional free-text line appended before the closing banner.

    Timing statistics are computed over successful commands only. A failed
    entry whose ``error_type`` is missing or ``None`` is reported as
    ``Unknown``. An empty ``timings`` list yields a summary with zero counts.
    """
    # Function-scope import: Counter is needed only by this function.
    from collections import Counter

    successes = [t for t in timings if t["success"]]
    failures = [t for t in timings if not t["success"]]
    banner = "=" * 60

    logger.summary(f"\n{banner}")
    logger.summary(f"=== {phase_name} SUMMARY ===")
    logger.summary(f"{banner}")
    logger.summary(f"Total commands sent: {len(timings)}")
    logger.summary(f"Total successful: {len(successes)}")
    logger.summary(f"Total failed: {len(failures)}")

    if failures:
        # Counter keeps first-seen order, so error types are listed in the
        # order they first occurred.
        by_type = Counter(_error_label(t) for t in failures)
        for label, count in by_type.items():
            logger.summary(f" {label}: {count}")

    if successes:
        connect_times = [t["connect_ms"] for t in successes]
        response_times = [t["response_ms"] for t in successes]
        total_times = [t["total_ms"] for t in successes]

        logger.summary(f"\nConnect time (ms): {_format_ms_stats(connect_times)}")
        logger.summary(f"Response time (ms): {_format_ms_stats(response_times)}")
        logger.summary(f"Total time (ms): {_format_ms_stats(total_times)}")

        # stdev() needs at least two samples.
        if len(connect_times) > 1:
            logger.summary(f"Connect stdev (ms): {stdev(connect_times):.1f}")
            logger.summary(f"Response stdev (ms): {stdev(response_times):.1f}")

        # Trend analysis - compare first 20% vs last 20%
        if len(response_times) >= 10:
            bucket = max(1, len(response_times) // 5)
            first_avg = mean(response_times[:bucket])
            last_avg = mean(response_times[-bucket:])
            change_pct = ((last_avg - first_avg) / first_avg) * 100 if first_avg > 0 else 0
            if change_pct > 20:
                trend = "INCREASING"
            elif change_pct < -20:
                trend = "DECREASING"
            else:
                trend = "STABLE"
            logger.summary(f"\nResponse time trend: {trend} "
                           f"(first {bucket}: {first_avg:.1f}ms → last {bucket}: {last_avg:.1f}ms, "
                           f"{change_pct:+.1f}%)")

    if failures:
        # Locate the first failure by position rather than by dict equality.
        fail_idx = next(i for i, t in enumerate(timings) if not t["success"])
        first_fail = timings[fail_idx]
        # NOTE: this sums only time spent inside commands; any sleep between
        # commands is not part of the recorded total_ms values.
        elapsed_at_fail = sum(t["total_ms"] for t in timings[:fail_idx + 1]) / 1000
        logger.summary(f"\nFirst failure at: command #{fail_idx + 1} "
                       f"after {elapsed_at_fail:.1f}s ({elapsed_at_fail / 60:.1f} min)")
        logger.summary(f"Failure type: {_error_label(first_fail)}: {first_fail.get('error')}")

    if extra_info:
        logger.summary(f"\n{extra_info}")
    logger.summary(f"{banner}\n")


def _error_label(timing: dict) -> str:
    """Return the entry's error_type, or 'Unknown' when it is absent or None."""
    return timing.get("error_type") or "Unknown"


def _format_ms_stats(values: list) -> str:
    """Format avg/median/min/max of a non-empty list of millisecond samples."""
    return (f"avg={mean(values):.1f} med={median(values):.1f} "
            f"min={min(values):.1f} max={max(values):.1f}")
commands at 1-second spacing before the device wedges? """ logger.summary(f"\n{'#' * 60}") logger.summary("### PHASE 1: Baseline Connection Count ###") logger.summary(f"### Sending up to {max_commands} DOD? commands at 1.0s spacing ###") logger.summary(f"{'#' * 60}\n") timings = [] wedge_detected = False phase_start = time.monotonic() for i in range(max_commands): if _abort_requested: logger.log("PHASE1 Abort requested by user") break cmd_num = i + 1 elapsed_total = time.monotonic() - phase_start cmd_label = f"PHASE1 CMD#{cmd_num:04d}" r = await send_command(host, port, "DOD?", timeout, is_query=True, logger=logger, label=cmd_label) # Log every command with FULL data if r["success"]: logger.log(f"PHASE1 CMD#{cmd_num:04d} OK " f"conn={r['connect_ms']:.0f}ms resp={r['response_ms']:.0f}ms " f"total={r['total_ms']:.0f}ms elapsed={elapsed_total:.1f}s " f"port=:{r['local_port']} code={r['result_code']} " f"data_len={r['data_length']}\n" f" FULL DATA: {r['data']}") else: logger.log(f"PHASE1 CMD#{cmd_num:04d} FAIL " f"{r['error_type']}: {r['error']} " f"conn={r['connect_ms']:.0f}ms total={r['total_ms']:.0f}ms " f"elapsed={elapsed_total:.1f}s port=:{r['local_port']}") logger.record_timing("phase1", cmd_num, "DOD?", r["connect_ms"], r["response_ms"], r["total_ms"], r["success"], r.get("error_type", ""), r.get("result_code", ""), r.get("data_length", 0), r.get("local_port", 0)) timings.append(r) # Check for wedge if r["error_type"] == "ConnectionRefused": logger.log(f"PHASE1 *** WEDGE DETECTED at command #{cmd_num} " f"after {elapsed_total:.1f}s ({elapsed_total / 60:.1f} min) ***") wedge_detected = True break # Wait 1 second before next command if cmd_num < max_commands: await asyncio.sleep(1.0) extra = "" if not wedge_detected and not _abort_requested: extra = f"Device survived all {max_commands} commands without wedging." 
async def phase_2_rate_variation(host: str, port: int, timeout: float,
                                 commands_per_round: int, pause_between: int,
                                 logger: StressTestLogger) -> list:
    """
    Phase 2: Rate Variation

    Runs one round of ``DOD?`` commands per spacing (1, 2, 5, 10 and 30
    seconds). Before each round the device is health-checked and, if it is
    down, given a chance to recover; the phase ends early when it does not.
    A round stops at the first refused connection. Between rounds the
    function pauses ``pause_between`` seconds in one-second steps so an
    abort request is noticed promptly.

    Returns the result dicts of every command sent, across all rounds.
    """
    spacings = [1.0, 2.0, 5.0, 10.0, 30.0]
    final_spacing = spacings[-1]
    hashes = "#" * 60

    logger.summary(f"\n{hashes}")
    logger.summary("### PHASE 2: Rate Variation ###")
    logger.summary(f"### {commands_per_round} commands per round at rates: {spacings}s ###")
    logger.summary(f"{hashes}\n")

    collected = []

    for spacing in spacings:
        if _abort_requested:
            break

        # Make sure the device is reachable before starting the round.
        logger.log(f"PHASE2 Checking device health before {spacing}s rate round...")
        device_ok = await health_check(host, port, timeout, logger,
                                       label=f"PHASE2-{spacing}s")
        if not device_ok:
            logger.log(f"PHASE2 Device unhealthy before {spacing}s round - attempting recovery")
            outcome = await wait_for_recovery(host, port, timeout, logger,
                                              label=f"PHASE2-{spacing}s")
            if not outcome["recovered"]:
                logger.log("PHASE2 Device not recovered - skipping remaining rates")
                break

        logger.log(f"\nPHASE2 === Starting {spacing}s rate round ({commands_per_round} commands) ===")

        this_round = []
        wedged = False

        for cmd_num in range(1, commands_per_round + 1):
            if _abort_requested:
                break

            tag = f"PHASE2 [{spacing}s] CMD#{cmd_num:03d}"
            r = await send_command(host, port, "DOD?", timeout,
                                   is_query=True, logger=logger, label=tag)

            if r["success"]:
                status = "OK"
            else:
                status = f"FAIL:{r['error_type']}"

            logger.log(f"PHASE2 [{spacing}s] CMD#{cmd_num:03d} {status} "
                       f"conn={r['connect_ms']:.0f}ms resp={r['response_ms']:.0f}ms "
                       f"total={r['total_ms']:.0f}ms port=:{r['local_port']} "
                       f"data_len={r['data_length']}\n"
                       f" FULL DATA: {r['data']}")

            logger.record_timing(
                f"phase2_{spacing}s", cmd_num, "DOD?",
                r["connect_ms"], r["response_ms"], r["total_ms"],
                r["success"], r.get("error_type", ""),
                r.get("result_code", ""), r.get("data_length", 0),
                r.get("local_port", 0),
            )
            this_round.append(r)

            # A refused connection is the wedge signature: end the round.
            if r["error_type"] == "ConnectionRefused":
                logger.log(f"PHASE2 *** WEDGE at {spacing}s rate, command #{cmd_num} ***")
                wedged = True
                break

            # No trailing sleep after the last command of the round.
            if cmd_num < commands_per_round:
                await asyncio.sleep(spacing)

        compute_phase_summary(f"PHASE 2: Rate {spacing}s ({commands_per_round} commands)",
                              this_round, logger)
        collected.extend(this_round)

        if wedged:
            await wait_for_recovery(host, port, timeout, logger,
                                    label=f"PHASE2-{spacing}s-recovery")

        # Pause between rounds (skipped after the final spacing).
        if spacing != final_spacing and not _abort_requested:
            logger.log(f"PHASE2 Pausing {pause_between}s between rounds...")
            remaining = pause_between
            while remaining > 0 and not _abort_requested:
                await asyncio.sleep(1)
                remaining -= 1

    return collected
""" test_commands = [ ("DOD?", True), ("Measure?", True), ("Battery Level?", True), ("Clock?", True), ("Sleep Mode?", True), ] logger.summary(f"\n{'#' * 60}") logger.summary("### PHASE 3: Command Variety ###") logger.summary(f"### {commands_per_type} commands of each type at 1.0s spacing ###") logger.summary(f"{'#' * 60}\n") all_timings = [] for cmd, is_query in test_commands: if _abort_requested: break # Health check healthy = await health_check(host, port, timeout, logger, label=f"PHASE3-{cmd}") if not healthy: recovery = await wait_for_recovery(host, port, timeout, logger, label=f"PHASE3-{cmd}") if not recovery["recovered"]: logger.log(f"PHASE3 Device not recovered - skipping remaining commands") break logger.log(f"\nPHASE3 === Testing command: {cmd} ({commands_per_type} times) ===") round_timings = [] wedge_in_round = False for i in range(commands_per_type): if _abort_requested: break cmd_num = i + 1 cmd_label = f"PHASE3 [{cmd}] CMD#{cmd_num:03d}" r = await send_command(host, port, cmd, timeout, is_query=is_query, logger=logger, label=cmd_label) status = "OK" if r["success"] else f"FAIL:{r['error_type']}" logger.log(f"PHASE3 [{cmd}] CMD#{cmd_num:03d} {status} " f"conn={r['connect_ms']:.0f}ms resp={r['response_ms']:.0f}ms " f"port=:{r['local_port']} data_len={r['data_length']}\n" f" FULL DATA: {r['data']}") logger.record_timing("phase3", cmd_num, cmd, r["connect_ms"], r["response_ms"], r["total_ms"], r["success"], r.get("error_type", ""), r.get("result_code", ""), r.get("data_length", 0), r.get("local_port", 0)) round_timings.append(r) if r["error_type"] == "ConnectionRefused": logger.log(f"PHASE3 *** WEDGE on {cmd}, command #{cmd_num} ***") wedge_in_round = True break if cmd_num < commands_per_type: await asyncio.sleep(1.0) compute_phase_summary(f"PHASE 3: Command '{cmd}' ({commands_per_type} cmds)", round_timings, logger) all_timings.extend(round_timings) if wedge_in_round: recovery = await wait_for_recovery(host, port, timeout, logger, 
async def phase_4_connection_duration(host: str, port: int, timeout: float,
                                      pause_between: int, logger: StressTestLogger,
                                      stream_seconds: float = 60.0) -> list:
    """
    Phase 4: Connection Duration

    Does holding connections open longer or shorter affect stability?

    Rounds:
        A - send ``DOD?`` and hold the connection 5s after the response (50x)
        B - send ``DOD?`` and close immediately (50x)
        C - connect, send nothing, hold 30s, close (10x)
        D - one ``DRD?`` stream held for ``stream_seconds``

    Each of rounds A-C starts with a health check (with a recovery wait if
    the device is down), stops at the first refused connection, and is
    followed by a ``pause_between``-second pause. Round D is skipped when
    an abort was requested or the device fails its health check.

    Args:
        host, port: Device address.
        timeout: Per-operation timeout in seconds.
        pause_between: Seconds to pause between rounds A-C.
        logger: Destination for log, summary and timing output.
        stream_seconds: Duration of the Round D stream (default 60).

    Returns:
        Result dicts for every connection made in rounds A-C. The Round D
        stream result is logged and summarized but not included.
    """
    logger.summary(f"\n{'#' * 60}")
    logger.summary("### PHASE 4: Connection Duration ###")
    logger.summary("### Testing various connection hold times ###")
    logger.summary(f"{'#' * 60}\n")

    all_timings = []

    # (round id, description, connection count, hold seconds, hold-only?)
    rounds = [
        ("A", "Hold 5s after response", 50, 5.0, False),
        ("B", "Close immediately", 50, 0.0, False),
        ("C", "Hold 30s (no command)", 10, 30.0, True),
    ]
    last_round_id = rounds[-1][0]

    for round_id, desc, count, hold_time, hold_only in rounds:
        if _abort_requested:
            break

        healthy = await health_check(host, port, timeout, logger,
                                     label=f"PHASE4-Round{round_id}")
        if not healthy:
            recovery = await wait_for_recovery(host, port, timeout, logger,
                                               label=f"PHASE4-Round{round_id}")
            if not recovery["recovered"]:
                logger.log("PHASE4 Device not recovered - skipping remaining rounds")
                break

        logger.log(f"\nPHASE4 === Round {round_id}: {desc} ({count} connections) ===")

        round_timings = []
        wedge_in_round = False

        # Both values are constant for the whole round.
        cmd_desc = (f"DOD?(hold={hold_time}s)" if not hold_only
                    else f"connect_only(hold={hold_time}s)")
        # 1s spacing between connections (less any hold time already elapsed)
        remaining_wait = max(0, 1.0 - hold_time)

        for cmd_num in range(1, count + 1):
            if _abort_requested:
                break

            cmd_label = f"PHASE4 [Round{round_id}] #{cmd_num:03d}"

            if hold_only:
                r = await _phase4_hold_only(host, port, timeout, hold_time,
                                            logger, cmd_label)
            else:
                r = await send_command(host, port, "DOD?", timeout,
                                       is_query=True, hold_seconds=hold_time,
                                       logger=logger, label=cmd_label)

            status = "OK" if r["success"] else f"FAIL:{r.get('error_type', '?')}"
            logger.log(f"PHASE4 [Round{round_id}] #{cmd_num:03d} {status} "
                       f"conn={r['connect_ms']:.0f}ms total={r['total_ms']:.0f}ms "
                       f"port=:{r.get('local_port', 0)}")

            logger.record_timing(f"phase4_{round_id}", cmd_num, cmd_desc,
                                 r["connect_ms"], r.get("response_ms", 0),
                                 r["total_ms"], r["success"],
                                 r.get("error_type", ""), r.get("result_code", ""),
                                 r.get("data_length", 0), r.get("local_port", 0))
            round_timings.append(r)

            if r.get("error_type") == "ConnectionRefused":
                logger.log(f"PHASE4 *** WEDGE in Round {round_id}, connection #{cmd_num} ***")
                wedge_in_round = True
                break

            if remaining_wait > 0 and cmd_num < count:
                await asyncio.sleep(remaining_wait)

        compute_phase_summary(f"PHASE 4 Round {round_id}: {desc}",
                              round_timings, logger)
        all_timings.extend(round_timings)

        if wedge_in_round:
            await wait_for_recovery(host, port, timeout, logger,
                                    label=f"PHASE4-Round{round_id}-recovery")

        if round_id != last_round_id and not _abort_requested:
            logger.log(f"PHASE4 Pausing {pause_between}s between rounds...")
            await _phase4_pause(pause_between)

    # Round D: DRD? stream
    if not _abort_requested:
        await _phase4_round_d(host, port, timeout, logger, stream_seconds)

    return all_timings


async def _phase4_hold_only(host: str, port: int, timeout: float, hold_time: float,
                            logger: "StressTestLogger", cmd_label: str) -> dict:
    """Open a connection, send nothing, hold it for hold_time seconds, close it."""
    r = {"connect_ms": 0, "response_ms": 0, "total_ms": 0,
         "success": False, "error": None, "error_type": None,
         "local_port": 0, "data": None, "data_length": 0, "result_code": ""}
    t_start = time.monotonic()
    writer = None
    try:
        t_conn = time.monotonic()
        _reader, writer = await asyncio.wait_for(
            asyncio.open_connection(host, port), timeout=timeout
        )
        r["connect_ms"] = (time.monotonic() - t_conn) * 1000
        sockname = writer.get_extra_info("sockname")
        if sockname:
            r["local_port"] = sockname[1]
        logger.log(f"{cmd_label} CONNECT :{r['local_port']} → {host}:{port} "
                   f"in {r['connect_ms']:.1f}ms, holding {hold_time}s (no command)...",
                   also_print=False)
        # Hold without sending
        await asyncio.sleep(hold_time)
        r["success"] = True
    except ConnectionRefusedError:
        r["error"] = "Connection refused"
        r["error_type"] = "ConnectionRefused"
        logger.log(f"{cmd_label} CONNECTION REFUSED", also_print=False)
    except asyncio.TimeoutError:
        r["error"] = "Timeout"
        r["error_type"] = "Timeout"
    except ConnectionResetError as e:
        r["error"] = f"Connection reset: {e}"
        r["error_type"] = "ConnectionReset"
    except Exception as e:
        r["error"] = str(e)
        r["error_type"] = type(e).__name__
    finally:
        if writer:
            try:
                writer.close()
                await writer.wait_closed()
            except Exception:
                # Best-effort close: a teardown error must not mask the
                # outcome already recorded in r.
                pass
    r["total_ms"] = (time.monotonic() - t_start) * 1000
    return r


async def _phase4_pause(seconds: int) -> None:
    """Sleep up to `seconds` in 1s steps, returning early on an abort request."""
    for _ in range(seconds):
        if _abort_requested:
            break
        await asyncio.sleep(1)


async def _phase4_round_d(host: str, port: int, timeout: float,
                          logger: "StressTestLogger", stream_seconds: float) -> None:
    """Run the DRD? streaming round and summarize it; no-op if the device is unhealthy."""
    healthy = await health_check(host, port, timeout, logger, label="PHASE4-RoundD")
    if not healthy:
        return

    logger.log(f"\nPHASE4 === Round D: DRD? stream for {stream_seconds:g} seconds ===")
    stream_result = await stream_drd(host, port, stream_seconds, timeout, logger=logger)

    if stream_result["success"]:
        logger.log(f"PHASE4 [RoundD] DRD stream OK - "
                   f"{stream_result['lines_received']} lines in "
                   f"{stream_result['total_ms']:.0f}ms, "
                   f"clean_shutdown={stream_result['clean_shutdown']}")
    else:
        logger.log(f"PHASE4 [RoundD] DRD stream FAIL - "
                   f"{stream_result['error_type']}: {stream_result['error']}")

    # Health check after stream
    logger.log("PHASE4 Checking health after DRD stream...")
    await asyncio.sleep(2)  # Brief pause
    post_stream = await health_check(host, port, timeout, logger,
                                     label="PHASE4-post-DRD")
    if not post_stream:
        logger.log("PHASE4 *** Device unhealthy after DRD stream ***")
        await wait_for_recovery(host, port, timeout, logger,
                                label="PHASE4-RoundD-recovery")

    # Summary stays in the same scope that defines stream_result/post_stream.
    logger.summary(f"\nPHASE 4 Round D: DRD Stream ({stream_seconds:g}s)")
    logger.summary(f" Lines received: {stream_result['lines_received']}")
    logger.summary(f" Clean shutdown: {stream_result['clean_shutdown']}")
    logger.summary(f" Duration: {stream_result['total_ms']:.0f}ms")
    logger.summary(f" Post-stream health: {'OK' if post_stream else 'FAILED'}")
""" logger.summary(f"\n{'#' * 60}") logger.summary("### PHASE 5: Sustained Soak ###") logger.summary(f"### Simulating SLMM polling for {duration_minutes} minutes ###") logger.summary(f"### DOD? every 60s, +Battery/Measure every 10th poll ###") logger.summary(f"{'#' * 60}\n") timings = [] poll_count = 0 phase_start = time.monotonic() end_time = phase_start + (duration_minutes * 60) wedge_detected = False while time.monotonic() < end_time and not _abort_requested: poll_count += 1 elapsed_min = (time.monotonic() - phase_start) / 60 # Primary poll: DOD? poll_label = f"PHASE5 POLL#{poll_count:04d}" r = await send_command(host, port, "DOD?", timeout, is_query=True, logger=logger, label=poll_label) status = "OK" if r["success"] else f"FAIL:{r['error_type']}" logger.log(f"PHASE5 POLL#{poll_count:04d} DOD? {status} " f"conn={r['connect_ms']:.0f}ms resp={r['response_ms']:.0f}ms " f"elapsed={elapsed_min:.1f}min port=:{r['local_port']} " f"data_len={r['data_length']}\n" f" FULL DATA: {r['data']}") logger.record_timing("phase5", poll_count, "DOD?", r["connect_ms"], r["response_ms"], r["total_ms"], r["success"], r.get("error_type", ""), r.get("result_code", ""), r.get("data_length", 0), r.get("local_port", 0)) timings.append(r) if r["error_type"] == "ConnectionRefused": logger.log(f"PHASE5 *** WEDGE DETECTED at poll #{poll_count} " f"after {elapsed_min:.1f} minutes ***") wedge_detected = True break # Every 10th poll, also check battery and measurement state if poll_count % 10 == 0 and r["success"]: await asyncio.sleep(1.0) for extra_cmd in ["Battery Level?", "Measure?"]: if _abort_requested: break extra_label = f"PHASE5 POLL#{poll_count:04d} {extra_cmd}" r2 = await send_command(host, port, extra_cmd, timeout, is_query=True, logger=logger, label=extra_label) s2 = "OK" if r2["success"] else f"FAIL:{r2['error_type']}" logger.log(f"PHASE5 POLL#{poll_count:04d} {extra_cmd} {s2} " f"conn={r2['connect_ms']:.0f}ms resp={r2['response_ms']:.0f}ms " f"port=:{r2['local_port']} 
def dry_run(args):
    """Show what would be tested without connecting to any device.

    Prints the target settings, a per-phase plan, and an estimated total
    command count and duration.

    Args:
        args: Parsed command-line namespace. Reads host, port, timeout, phase,
            log_dir, pause, max_commands, rate_rounds and soak_duration.

    The total time includes the recovery pause that run() inserts before
    every phase after the first. Phase ids outside 1-5 are reported as
    unknown and contribute no commands.
    """
    print(f"\n{'=' * 60}")
    print("DRY RUN - No connections will be made")
    print(f"{'=' * 60}")
    print(f"\nTarget: {args.host}:{args.port}")
    print(f"Timeout: {args.timeout}s")
    print(f"Phase(s): {args.phase}")
    print(f"Log directory: {args.log_dir}")
    print(f"Recovery pause: {args.pause}s")

    phases = args.phase if args.phase != "all" else "1,2,3,4,5"
    phase_list = [p.strip() for p in phases.split(",")]

    total_commands = 0
    total_time_est = 0

    for p in phase_list:
        print(f"\n--- Phase {p} ---")
        if p == "1":
            n = args.max_commands
            t = n * 1  # 1s spacing
            print(f" Baseline: up to {n} DOD? commands at 1.0s spacing")
            print(f" Estimated time: {t}s ({t / 60:.1f} min)")
            total_commands += n
            total_time_est += t
        elif p == "2":
            rates = [1.0, 2.0, 5.0, 10.0, 30.0]
            n = args.rate_rounds
            for rate in rates:
                t = n * rate
                print(f" Rate {rate}s: {n} DOD? commands ({t:.0f}s per round)")
            pause_time = 4 * args.pause  # 4 pauses between 5 rounds
            total_commands += n * len(rates)
            total_time_est += sum(n * r for r in rates) + pause_time
            print(f" + {args.pause}s pause between rounds")
        elif p == "3":
            cmds = ["DOD?", "Measure?", "Battery Level?", "Clock?", "Sleep Mode?"]
            n = 100
            print(f" {n} commands each of: {', '.join(cmds)}")
            print(f" Estimated: {n * len(cmds)}s + {(len(cmds) - 1) * args.pause}s pauses")
            total_commands += n * len(cmds)
            total_time_est += n * len(cmds) + (len(cmds) - 1) * args.pause
        elif p == "4":
            print(" Round A: 50x DOD? with 5s hold = ~250s")
            print(" Round B: 50x DOD? close immediately = ~50s")
            print(" Round C: 10x connect+30s hold (no command) = ~300s")
            print(" Round D: DRD? stream for 60s")
            total_commands += 110
            total_time_est += 250 + 50 + 300 + 60 + 3 * args.pause
        elif p == "5":
            dur = args.soak_duration
            polls = dur  # 1 poll per minute
            extras = (polls // 10) * 2  # 2 extra commands every 10th poll
            print(f" Soak for {dur} minutes")
            print(f" ~{polls} DOD? polls + ~{extras} extra commands")
            total_commands += polls + extras
            total_time_est += dur * 60
        else:
            # Make typos visible instead of printing an empty section.
            print(f" (unknown phase {p!r} - nothing will be run)")

    # run() sleeps args.pause before each phase whose id differs from the
    # first entry, so count those pauses the same way here.
    inter_phase_pauses = sum(1 for p in phase_list if p != phase_list[0])
    if inter_phase_pauses:
        pause_total = inter_phase_pauses * args.pause
        print("\n--- Recovery pauses between phases ---")
        print(f" {inter_phase_pauses} x {args.pause}s = {pause_total}s")
        total_time_est += pause_total

    print(f"\n{'=' * 60}")
    print(f"Total estimated commands: ~{total_commands}")
    # :.0f keeps the seconds figure uniform whether the sum is int or float
    # (phase 2 uses float rates).
    print(f"Total estimated time: ~{total_time_est:.0f}s ({total_time_est / 60:.0f} min, "
          f"{total_time_est / 3600:.1f} hr)")
    print(f"{'=' * 60}\n")
async def run(args):
    """Main test runner.

    Creates a timestamped run directory under args.log_dir, optionally starts
    a packet capture, performs an initial health check, then executes the
    requested phases in order with a recovery pause and health check before
    every phase after the first.

    Args:
        args: Parsed command-line namespace. Reads host, port, timeout, phase,
            log_dir, pause, no_pcap, max_commands, rate_rounds and
            soak_duration.

    Cleanup (stopping the packet capture, writing the closing summary lines
    and closing the logger) runs in a finally block, so it also happens when
    the device never becomes reachable or a phase raises. Any exception is
    re-raised after cleanup.
    """
    # Create run directory
    timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
    run_dir = Path(args.log_dir) / f"run_{timestamp}"
    logger = StressTestLogger(run_dir)
    pcap = None  # only set once the capture is confirmed running

    try:
        logger.summary("NL-43 TCP Stress Test")
        logger.summary(f"Started: {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}")
        logger.summary(f"Target: {args.host}:{args.port}")
        logger.summary(f"Timeout: {args.timeout}s")
        logger.summary(f"Phase(s): {args.phase}")

        # Start packet capture
        if not args.no_pcap:
            pcap_path = run_dir / "capture.pcap"
            capture = PacketCapture(args.host, args.port, pcap_path)
            if capture.start():
                pcap = capture
                logger.summary(f"Packet capture: {pcap_path}")
            else:
                logger.summary("Packet capture: DISABLED (tcpdump unavailable or no permissions)")

        # Initial health check
        logger.log("\n=== Initial Health Check ===")
        healthy = await health_check(args.host, args.port, args.timeout, logger, label="INITIAL")
        if not healthy:
            logger.log("WARNING: Device is not responding at test start!")
            logger.log("Waiting for device to become available...")
            recovery = await wait_for_recovery(args.host, args.port, args.timeout,
                                               logger, label="INITIAL")
            if not recovery["recovered"]:
                logger.summary("ABORTED: Device unreachable at test start and did not recover")
                return  # the finally block below handles cleanup

        phases = args.phase if args.phase != "all" else "1,2,3,4,5"
        phase_list = [p.strip() for p in phases.split(",")]

        for phase_num in phase_list:
            if _abort_requested:
                break

            # Recovery pause between phases
            if phase_num != phase_list[0]:
                logger.log(f"\n--- Recovery pause ({args.pause}s) before Phase {phase_num} ---")
                for _ in range(args.pause):
                    if _abort_requested:
                        break
                    await asyncio.sleep(1)

                # Ctrl+C during the pause must not start another phase.
                if _abort_requested:
                    break

                healthy = await health_check(args.host, args.port, args.timeout, logger,
                                             label=f"pre-phase{phase_num}")
                if not healthy:
                    logger.log(f"Device unhealthy before Phase {phase_num} - waiting for recovery")
                    recovery = await wait_for_recovery(args.host, args.port, args.timeout,
                                                       logger, label=f"pre-phase{phase_num}")
                    if not recovery["recovered"]:
                        logger.log(f"Device not recovered - skipping Phase {phase_num} and remaining")
                        break

            if phase_num == "1":
                await phase_1_baseline(args.host, args.port, args.timeout,
                                       args.max_commands, logger)
            elif phase_num == "2":
                await phase_2_rate_variation(args.host, args.port, args.timeout,
                                             args.rate_rounds, args.pause, logger)
            elif phase_num == "3":
                await phase_3_command_variety(args.host, args.port, args.timeout,
                                              100, args.pause, logger)
            elif phase_num == "4":
                await phase_4_connection_duration(args.host, args.port, args.timeout,
                                                  args.pause, logger)
            elif phase_num == "5":
                await phase_5_soak(args.host, args.port, args.timeout,
                                   args.soak_duration, logger)
            else:
                # Record typos in the log instead of silently doing nothing.
                logger.log(f"Unknown phase {phase_num!r} - skipping")
    finally:
        try:
            # Stop packet capture
            if pcap:
                logger.summary("\nStopping packet capture...")
                pcap_stats = pcap.stop()
                logger.summary(f"Packets captured: {pcap_stats['packets_captured']}")
                logger.summary(f"PCAP file: {pcap_stats['pcap_file']}")
                logger.summary(f"PCAP size: {pcap_stats['file_size_bytes']} bytes")
        finally:
            # Final summary; nested so the logger closes even if stopping
            # the capture fails.
            logger.summary(f"\nFinished: {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}")
            logger.summary(f"Logs saved to: {run_dir}")
            logger.close()

    print(f"\nResults saved to: {run_dir}")
def main():
    """Parse command-line arguments and start a dry run or the real test.

    With --dry-run the plan is printed via dry_run() and nothing else
    happens. Otherwise a short banner is printed and run() is executed on a
    new asyncio event loop.

    --phase must be 'all' or a comma-separated list of ids from 1-5; any
    other value ends the program through parser.error() before any log
    directory or packet capture is created.
    """
    parser = argparse.ArgumentParser(
        description="NL-43 TCP Wedge Stress Test Tool",
        formatter_class=argparse.RawDescriptionHelpFormatter,
        epilog="""
Test Phases:
  1  Baseline Connection Count - DOD? at 1s until wedge or max
  2  Rate Variation            - DOD? at 1s, 2s, 5s, 10s, 30s spacing
  3  Command Variety           - Different commands to see if type matters
  4  Connection Duration       - Hold connections open various durations
  5  Sustained Soak            - Simulate real SLMM polling for hours

Examples:
  %(prog)s --host 192.168.1.100 --phase 1 --max-commands 500
  %(prog)s --host 192.168.1.100 --phase 5 --soak-duration 240
  %(prog)s --host 192.168.1.100 --phase all --dry-run
  %(prog)s --host 10.0.0.50 --phase 1,2 --timeout 10
"""
    )

    parser.add_argument("--host", required=True, help="NL-43 IP address")
    parser.add_argument("--port", type=int, default=DEFAULT_PORT,
                        help=f"TCP port (default: {DEFAULT_PORT})")
    parser.add_argument("--phase", default="all",
                        help="Phase(s) to run: 1,2,3,4,5 or 'all' (default: all)")
    parser.add_argument("--max-commands", type=int, default=2000,
                        help="Phase 1: max commands before stopping (default: 2000)")
    parser.add_argument("--rate-rounds", type=int, default=50,
                        help="Phase 2: commands per rate round (default: 50)")
    parser.add_argument("--soak-duration", type=int, default=120,
                        help="Phase 5: soak duration in minutes (default: 120)")
    parser.add_argument("--log-dir", default="./stress_test_logs",
                        help="Log output directory (default: ./stress_test_logs)")
    parser.add_argument("--pause", type=int, default=60,
                        help="Recovery pause between phases in seconds (default: 60)")
    parser.add_argument("--timeout", type=float, default=DEFAULT_TIMEOUT,
                        help=f"TCP timeout in seconds (default: {DEFAULT_TIMEOUT})")
    parser.add_argument("--dry-run", action="store_true",
                        help="Show test plan without connecting")
    parser.add_argument("--no-pcap", action="store_true",
                        help="Disable built-in tcpdump packet capture")

    args = parser.parse_args()

    # Reject phase typos up front; otherwise an unrecognised id would be
    # skipped and the run would test nothing for it.
    if args.phase != "all":
        valid_phases = {"1", "2", "3", "4", "5"}
        requested = [p.strip() for p in args.phase.split(",")]
        unknown = [p for p in requested if p not in valid_phases]
        if unknown:
            parser.error(
                f"invalid --phase value(s): {', '.join(repr(p) for p in unknown)} "
                "(expected 'all' or a comma-separated list of 1,2,3,4,5)"
            )

    if args.dry_run:
        dry_run(args)
        return

    print(f"\nNL-43 TCP Stress Test Tool")
    print(f"Target: {args.host}:{args.port}")
    print(f"Phase(s): {args.phase}")
    print(f"Press Ctrl+C at any time to stop safely\n")

    asyncio.run(run(args))


if __name__ == "__main__":
    main()