From a1a80bbb4d2a3c5e2a614966b5215d71875cbd30 Mon Sep 17 00:00:00 2001
From: serversdwn
Date: Mon, 16 Feb 2026 04:25:51 +0000
Subject: [PATCH 1/3] add: new persistent connection approach, env variables for tcp keepalive and persistence, added connection pool class.

---
 app/main.py     |   7 +
 app/routers.py  |  28 +++
 app/services.py | 451 ++++++++++++++++++++++++++++++++++++++++--------
 3 files changed, 417 insertions(+), 69 deletions(-)

diff --git a/app/main.py b/app/main.py
index 406d7fc..176de97 100644
--- a/app/main.py
+++ b/app/main.py
@@ -29,7 +29,11 @@ logger.info("Database tables initialized")
 @asynccontextmanager
 async def lifespan(app: FastAPI):
     """Manage application lifecycle - startup and shutdown events."""
+    from app.services import _connection_pool
+
     # Startup
+    logger.info("Starting TCP connection pool cleanup task...")
+    _connection_pool.start_cleanup()
     logger.info("Starting background poller...")
     await poller.start()
     logger.info("Background poller started")
@@ -40,6 +44,9 @@
     logger.info("Stopping background poller...")
     await poller.stop()
     logger.info("Background poller stopped")
+    logger.info("Closing TCP connection pool...")
+    await _connection_pool.close_all()
+    logger.info("TCP connection pool closed")
 
 
 app = FastAPI(
diff --git a/app/routers.py b/app/routers.py
index 40ecaf8..89d8ce7 100644
--- a/app/routers.py
+++ b/app/routers.py
@@ -93,6 +93,34 @@ class PollingConfigPayload(BaseModel):
     poll_enabled: bool | None = Field(None, description="Enable or disable background polling for this device")
 
 
+# ============================================================================
+# TCP CONNECTION POOL ENDPOINTS (must be before /{unit_id} routes)
+# ============================================================================
+
+@router.get("/_connections/status")
+async def get_connection_pool_status():
+    """Get status of the persistent TCP connection pool.
+ + Returns information about cached connections, keepalive settings, + and per-device connection age/idle times. + """ + from app.services import _connection_pool + return {"status": "ok", "pool": _connection_pool.get_stats()} + + +@router.post("/_connections/flush") +async def flush_connection_pool(): + """Close all cached TCP connections. + + Useful for debugging or forcing fresh connections to all devices. + """ + from app.services import _connection_pool + await _connection_pool.close_all() + # Restart cleanup task since close_all cancels it + _connection_pool.start_cleanup() + return {"status": "ok", "message": "All cached connections closed"} + + # ============================================================================ # GLOBAL POLLING STATUS ENDPOINT (must be before /{unit_id} routes) # ============================================================================ diff --git a/app/services.py b/app/services.py index 7e4c554..d85cb4b 100644 --- a/app/services.py +++ b/app/services.py @@ -1,20 +1,22 @@ """ NL43 TCP connector and snapshot persistence. -Implements simple per-request TCP calls to avoid long-lived socket complexity. -Extend to pooled connections/DRD streaming later. +Implements persistent per-device TCP connections with OS-level keepalive +to reduce handshake overhead and survive cellular modem NAT timeouts. +Falls back to per-request connections on error with transparent retry. 
""" import asyncio import contextlib import logging +import socket import time import os import zipfile import tempfile -from dataclasses import dataclass +from dataclasses import dataclass, field from datetime import datetime, timezone, timedelta -from typing import Optional, List, Dict +from typing import Optional, List, Dict, Tuple from sqlalchemy.orm import Session from ftplib import FTP from pathlib import Path @@ -234,6 +236,293 @@ async def _get_device_lock(device_key: str) -> asyncio.Lock: return _device_locks[device_key] +# --------------------------------------------------------------------------- +# Persistent TCP connection pool with OS-level keepalive +# --------------------------------------------------------------------------- + +# Configuration via environment variables +TCP_PERSISTENT_ENABLED = os.getenv("TCP_PERSISTENT_ENABLED", "true").lower() == "true" +TCP_IDLE_TTL = float(os.getenv("TCP_IDLE_TTL", "120")) # Close idle connections after N seconds +TCP_MAX_AGE = float(os.getenv("TCP_MAX_AGE", "300")) # Force reconnect after N seconds +TCP_KEEPALIVE_IDLE = int(os.getenv("TCP_KEEPALIVE_IDLE", "15")) # Seconds idle before probes +TCP_KEEPALIVE_INTERVAL = int(os.getenv("TCP_KEEPALIVE_INTERVAL", "10")) # Seconds between probes +TCP_KEEPALIVE_COUNT = int(os.getenv("TCP_KEEPALIVE_COUNT", "3")) # Failed probes before dead + +logger.info( + f"TCP connection pool: persistent={TCP_PERSISTENT_ENABLED}, " + f"idle_ttl={TCP_IDLE_TTL}s, max_age={TCP_MAX_AGE}s, " + f"keepalive_idle={TCP_KEEPALIVE_IDLE}s, keepalive_interval={TCP_KEEPALIVE_INTERVAL}s, " + f"keepalive_count={TCP_KEEPALIVE_COUNT}" +) + + +@dataclass +class DeviceConnection: + """Tracks a cached TCP connection and its metadata.""" + reader: asyncio.StreamReader + writer: asyncio.StreamWriter + device_key: str + host: str + port: int + created_at: float = field(default_factory=time.time) + last_used_at: float = field(default_factory=time.time) + + +class ConnectionPool: + """Per-device persistent TCP 
connection cache with OS-level keepalive. + + Each NL-43 device supports only one TCP connection at a time. This pool + caches that single connection per device key and reuses it across commands, + avoiding repeated TCP handshakes over high-latency cellular links. + + Keepalive probes keep cellular NAT tables alive and detect dead connections + before the next command attempt. + """ + + def __init__( + self, + enable_persistent: bool = True, + idle_ttl: float = 120.0, + max_age: float = 300.0, + keepalive_idle: int = 15, + keepalive_interval: int = 10, + keepalive_count: int = 3, + ): + self._connections: Dict[str, DeviceConnection] = {} + self._lock = asyncio.Lock() + self._enable_persistent = enable_persistent + self._idle_ttl = idle_ttl + self._max_age = max_age + self._keepalive_idle = keepalive_idle + self._keepalive_interval = keepalive_interval + self._keepalive_count = keepalive_count + self._cleanup_task: Optional[asyncio.Task] = None + + # -- lifecycle ---------------------------------------------------------- + + def start_cleanup(self): + """Start background task that evicts stale connections.""" + if self._enable_persistent and self._cleanup_task is None: + self._cleanup_task = asyncio.create_task(self._cleanup_loop()) + logger.info("Connection pool cleanup task started") + + async def close_all(self): + """Close all cached connections (called at shutdown).""" + if self._cleanup_task is not None: + self._cleanup_task.cancel() + with contextlib.suppress(asyncio.CancelledError): + await self._cleanup_task + self._cleanup_task = None + + async with self._lock: + for key, conn in list(self._connections.items()): + await self._close_connection(conn, reason="shutdown") + self._connections.clear() + logger.info("Connection pool: all connections closed") + + # -- public API --------------------------------------------------------- + + async def acquire( + self, device_key: str, host: str, port: int, timeout: float + ) -> Tuple[asyncio.StreamReader, 
asyncio.StreamWriter, bool]: + """Get a connection for a device (cached or fresh). + + Returns: + (reader, writer, from_cache) — from_cache is True if reused. + """ + if self._enable_persistent: + async with self._lock: + conn = self._connections.pop(device_key, None) + + if conn is not None: + if self._is_alive(conn): + self._drain_buffer(conn.reader) + conn.last_used_at = time.time() + logger.debug(f"Pool hit for {device_key} (age={time.time() - conn.created_at:.0f}s)") + return conn.reader, conn.writer, True + else: + await self._close_connection(conn, reason="stale") + + # Open fresh connection + reader, writer = await self._open_connection(host, port, timeout) + logger.debug(f"New connection opened for {device_key}") + return reader, writer, False + + async def release(self, device_key: str, reader: asyncio.StreamReader, writer: asyncio.StreamWriter, host: str, port: int): + """Return a connection to the pool for reuse.""" + if not self._enable_persistent: + self._close_writer(writer) + return + + # Check transport is still healthy before caching + if writer.transport.is_closing() or reader.at_eof(): + self._close_writer(writer) + return + + conn = DeviceConnection( + reader=reader, + writer=writer, + device_key=device_key, + host=host, + port=port, + ) + + async with self._lock: + # Evict any existing connection for this device (shouldn't happen + # under normal locking, but be safe) + old = self._connections.pop(device_key, None) + if old is not None: + await self._close_connection(old, reason="replaced") + self._connections[device_key] = conn + + async def discard(self, device_key: str): + """Close and remove a connection from the pool (called on errors).""" + async with self._lock: + conn = self._connections.pop(device_key, None) + if conn is not None: + await self._close_connection(conn, reason="discarded") + logger.debug(f"Pool discard for {device_key}") + + def get_stats(self) -> dict: + """Return pool status for diagnostics.""" + now = time.time() + 
connections = {} + for key, conn in self._connections.items(): + connections[key] = { + "host": conn.host, + "port": conn.port, + "age_seconds": round(now - conn.created_at, 1), + "idle_seconds": round(now - conn.last_used_at, 1), + "alive": self._is_alive(conn), + } + return { + "enabled": self._enable_persistent, + "active_connections": len(self._connections), + "idle_ttl": self._idle_ttl, + "max_age": self._max_age, + "keepalive_idle": self._keepalive_idle, + "keepalive_interval": self._keepalive_interval, + "keepalive_count": self._keepalive_count, + "connections": connections, + } + + # -- internals ---------------------------------------------------------- + + async def _open_connection( + self, host: str, port: int, timeout: float + ) -> Tuple[asyncio.StreamReader, asyncio.StreamWriter]: + """Open a new TCP connection with keepalive options set.""" + try: + reader, writer = await asyncio.wait_for( + asyncio.open_connection(host, port), timeout=timeout + ) + except asyncio.TimeoutError: + raise ConnectionError(f"Failed to connect to device at {host}:{port}") + except Exception as e: + raise ConnectionError(f"Failed to connect to device: {e}") + + # Set TCP keepalive on the underlying socket + self._set_keepalive(writer) + return reader, writer + + def _set_keepalive(self, writer: asyncio.StreamWriter): + """Configure OS-level TCP keepalive on the connection socket.""" + try: + sock = writer.transport.get_extra_info("socket") + if sock is None: + logger.warning("Could not access underlying socket for keepalive") + return + + sock.setsockopt(socket.SOL_SOCKET, socket.SO_KEEPALIVE, 1) + + # Linux-specific keepalive tuning + if hasattr(socket, "TCP_KEEPIDLE"): + sock.setsockopt(socket.IPPROTO_TCP, socket.TCP_KEEPIDLE, self._keepalive_idle) + if hasattr(socket, "TCP_KEEPINTVL"): + sock.setsockopt(socket.IPPROTO_TCP, socket.TCP_KEEPINTVL, self._keepalive_interval) + if hasattr(socket, "TCP_KEEPCNT"): + sock.setsockopt(socket.IPPROTO_TCP, socket.TCP_KEEPCNT, 
self._keepalive_count) + + logger.debug( + f"TCP keepalive set: idle={self._keepalive_idle}s, " + f"interval={self._keepalive_interval}s, count={self._keepalive_count}" + ) + except OSError as e: + logger.warning(f"Failed to set TCP keepalive: {e}") + + def _is_alive(self, conn: DeviceConnection) -> bool: + """Check whether a cached connection is still usable.""" + now = time.time() + + # Age / idle checks + if now - conn.last_used_at > self._idle_ttl: + logger.debug(f"Connection {conn.device_key} idle too long ({now - conn.last_used_at:.0f}s > {self._idle_ttl}s)") + return False + if now - conn.created_at > self._max_age: + logger.debug(f"Connection {conn.device_key} too old ({now - conn.created_at:.0f}s > {self._max_age}s)") + return False + + # Transport-level checks + transport = conn.writer.transport + if transport.is_closing(): + logger.debug(f"Connection {conn.device_key} transport is closing") + return False + if conn.reader.at_eof(): + logger.debug(f"Connection {conn.device_key} reader at EOF") + return False + + return True + + @staticmethod + def _drain_buffer(reader: asyncio.StreamReader): + """Drain any pending bytes (e.g. 
'$' prompt) from an idle connection.""" + buf = reader._buffer # noqa: SLF001 — internal but stable across CPython + if buf: + pending = bytes(buf) + buf.clear() + logger.debug(f"Drained {len(pending)} bytes from cached connection: {pending!r}") + + @staticmethod + def _close_writer(writer: asyncio.StreamWriter): + """Close a writer, suppressing errors.""" + try: + writer.close() + except Exception: + pass + + async def _close_connection(self, conn: DeviceConnection, reason: str = ""): + """Fully close a cached connection.""" + logger.debug(f"Closing connection {conn.device_key} ({reason})") + conn.writer.close() + with contextlib.suppress(Exception): + await conn.writer.wait_closed() + + async def _cleanup_loop(self): + """Periodically evict idle/expired connections.""" + try: + while True: + await asyncio.sleep(30) + async with self._lock: + for key in list(self._connections): + conn = self._connections[key] + if not self._is_alive(conn): + del self._connections[key] + await self._close_connection(conn, reason="cleanup") + except asyncio.CancelledError: + pass + + +# Module-level pool singleton +_connection_pool = ConnectionPool( + enable_persistent=TCP_PERSISTENT_ENABLED, + idle_ttl=TCP_IDLE_TTL, + max_age=TCP_MAX_AGE, + keepalive_idle=TCP_KEEPALIVE_IDLE, + keepalive_interval=TCP_KEEPALIVE_INTERVAL, + keepalive_count=TCP_KEEPALIVE_COUNT, +) + + class NL43Client: def __init__(self, host: str, port: int, timeout: float = 5.0, ftp_username: str = None, ftp_password: str = None, ftp_port: int = 21): self.host = host @@ -275,72 +564,97 @@ class NL43Client: return await self._send_command_unlocked(cmd) async def _send_command_unlocked(self, cmd: str) -> str: - """Internal: send command without acquiring device lock (lock must be held by caller).""" + """Internal: send command without acquiring device lock (lock must be held by caller). + + Uses the connection pool to reuse cached TCP connections when possible. 
+ If a cached connection fails, retries once with a fresh connection. + """ await self._enforce_rate_limit() logger.info(f"Sending command to {self.device_key}: {cmd.strip()}") try: - reader, writer = await asyncio.wait_for( - asyncio.open_connection(self.host, self.port), timeout=self.timeout + reader, writer, from_cache = await _connection_pool.acquire( + self.device_key, self.host, self.port, self.timeout ) - except asyncio.TimeoutError: - logger.error(f"Connection timeout to {self.device_key}") - raise ConnectionError(f"Failed to connect to device at {self.host}:{self.port}") - except Exception as e: - logger.error(f"Connection failed to {self.device_key}: {e}") - raise ConnectionError(f"Failed to connect to device: {str(e)}") + except ConnectionError: + logger.error(f"Connection failed to {self.device_key}") + raise try: - writer.write(cmd.encode("ascii")) - await writer.drain() - - # Read first line (result code) - first_line_data = await asyncio.wait_for(reader.readuntil(b"\n"), timeout=self.timeout) - result_code = first_line_data.decode(errors="ignore").strip() - - # Remove leading $ prompt if present - if result_code.startswith("$"): - result_code = result_code[1:].strip() - - logger.info(f"Result code from {self.device_key}: {result_code}") - - # Check result code - if result_code == "R+0000": - # Success - for query commands, read the second line with actual data - is_query = cmd.strip().endswith("?") - if is_query: - data_line = await asyncio.wait_for(reader.readuntil(b"\n"), timeout=self.timeout) - response = data_line.decode(errors="ignore").strip() - logger.debug(f"Data line from {self.device_key}: {response}") - return response - else: - # Setting command - return success code - return result_code - elif result_code == "R+0001": - raise ValueError("Command error - device did not recognize command") - elif result_code == "R+0002": - raise ValueError("Parameter error - invalid parameter value") - elif result_code == "R+0003": - raise 
ValueError("Spec/type error - command not supported by this device model") - elif result_code == "R+0004": - raise ValueError("Status error - device is in wrong state for this command") - else: - raise ValueError(f"Unknown result code: {result_code}") - - except asyncio.TimeoutError: - logger.error(f"Response timeout from {self.device_key}") - raise TimeoutError(f"Device did not respond within {self.timeout}s") - except Exception as e: - logger.error(f"Communication error with {self.device_key}: {e}") - raise - finally: - writer.close() - with contextlib.suppress(Exception): - await writer.wait_closed() - # Record completion time for rate limiting — NL43 requires ≥1s - # after response before next command, so measure from connection close + response = await self._execute_command(reader, writer, cmd) + # Success — return connection to pool for reuse + await _connection_pool.release(self.device_key, reader, writer, self.host, self.port) _last_command_time[self.device_key] = time.time() + return response + + except Exception as e: + # Discard the bad connection + await _connection_pool.discard(self.device_key) + ConnectionPool._close_writer(writer) + + if from_cache: + # Retry once with a fresh connection — the cached one may have gone stale + logger.warning(f"Cached connection failed for {self.device_key}, retrying fresh: {e}") + await self._enforce_rate_limit() + + try: + reader, writer, _ = await _connection_pool.acquire( + self.device_key, self.host, self.port, self.timeout + ) + except ConnectionError: + logger.error(f"Retry connection also failed to {self.device_key}") + raise + + try: + response = await self._execute_command(reader, writer, cmd) + await _connection_pool.release(self.device_key, reader, writer, self.host, self.port) + _last_command_time[self.device_key] = time.time() + return response + except Exception: + await _connection_pool.discard(self.device_key) + ConnectionPool._close_writer(writer) + raise + else: + raise + + async def 
_execute_command(self, reader: asyncio.StreamReader, writer: asyncio.StreamWriter, cmd: str) -> str: + """Send a command over an existing connection and parse the NL43 response.""" + writer.write(cmd.encode("ascii")) + await writer.drain() + + # Read first line (result code) + first_line_data = await asyncio.wait_for(reader.readuntil(b"\n"), timeout=self.timeout) + result_code = first_line_data.decode(errors="ignore").strip() + + # Remove leading $ prompt if present + if result_code.startswith("$"): + result_code = result_code[1:].strip() + + logger.info(f"Result code from {self.device_key}: {result_code}") + + # Check result code + if result_code == "R+0000": + # Success — for query commands, read the second line with actual data + is_query = cmd.strip().endswith("?") + if is_query: + data_line = await asyncio.wait_for(reader.readuntil(b"\n"), timeout=self.timeout) + response = data_line.decode(errors="ignore").strip() + logger.debug(f"Data line from {self.device_key}: {response}") + return response + else: + # Setting command — return success code + return result_code + elif result_code == "R+0001": + raise ValueError("Command error - device did not recognize command") + elif result_code == "R+0002": + raise ValueError("Parameter error - invalid parameter value") + elif result_code == "R+0003": + raise ValueError("Spec/type error - command not supported by this device model") + elif result_code == "R+0004": + raise ValueError("Status error - device is in wrong state for this command") + else: + raise ValueError(f"Unknown result code: {result_code}") async def request_dod(self) -> NL43Snapshot: """Request DOD (Data Output Display) snapshot from device. 
@@ -582,20 +896,19 @@ class NL43Client: # Acquire per-device lock - held for entire streaming session device_lock = await _get_device_lock(self.device_key) async with device_lock: + # Evict any cached connection — streaming needs its own dedicated socket + await _connection_pool.discard(self.device_key) await self._enforce_rate_limit() logger.info(f"Starting DRD stream for {self.device_key}") try: - reader, writer = await asyncio.wait_for( - asyncio.open_connection(self.host, self.port), timeout=self.timeout + reader, writer = await _connection_pool._open_connection( + self.host, self.port, self.timeout ) - except asyncio.TimeoutError: - logger.error(f"DRD stream connection timeout to {self.device_key}") - raise ConnectionError(f"Failed to connect to device at {self.host}:{self.port}") - except Exception as e: - logger.error(f"DRD stream connection failed to {self.device_key}: {e}") - raise ConnectionError(f"Failed to connect to device: {str(e)}") + except ConnectionError: + logger.error(f"DRD stream connection failed to {self.device_key}") + raise try: # Start DRD streaming -- 2.49.1 From a5f8d1b2c77b630336c20a33063c15c6990fcdca Mon Sep 17 00:00:00 2001 From: serversdwn Date: Tue, 17 Feb 2026 02:41:09 +0000 Subject: [PATCH 2/3] Persistent polling interval increased. Healthcheck now uses poll instead of separate handshakes. --- app/main.py | 34 +++-- app/routers.py | 84 +++-------- app/services.py | 4 +- templates/roster.html | 329 ++++++++++++++++++++++++++++++++++++++---- 4 files changed, 352 insertions(+), 99 deletions(-) diff --git a/app/main.py b/app/main.py index 176de97..abf0879 100644 --- a/app/main.py +++ b/app/main.py @@ -92,10 +92,14 @@ async def health(): @app.get("/health/devices") async def health_devices(): - """Enhanced health check that tests device connectivity.""" + """Enhanced health check that tests device connectivity. 
+ + Uses the connection pool to avoid unnecessary TCP handshakes — if a + cached connection exists and is alive, the device is reachable. + """ from sqlalchemy.orm import Session from app.database import SessionLocal - from app.services import NL43Client + from app.services import _connection_pool from app.models import NL43Config db: Session = SessionLocal() @@ -105,7 +109,7 @@ async def health_devices(): configs = db.query(NL43Config).filter_by(tcp_enabled=True).all() for cfg in configs: - client = NL43Client(cfg.host, cfg.tcp_port, timeout=2.0, ftp_username=cfg.ftp_username, ftp_password=cfg.ftp_password) + device_key = f"{cfg.host}:{cfg.tcp_port}" status = { "unit_id": cfg.unit_id, "host": cfg.host, @@ -115,14 +119,22 @@ async def health_devices(): } try: - # Try to connect (don't send command to avoid rate limiting issues) - import asyncio - reader, writer = await asyncio.wait_for( - asyncio.open_connection(cfg.host, cfg.tcp_port), timeout=2.0 - ) - writer.close() - await writer.wait_closed() - status["reachable"] = True + # Check if pool already has a live connection (zero-cost check) + pool_stats = _connection_pool.get_stats() + conn_info = pool_stats["connections"].get(device_key) + if conn_info and conn_info["alive"]: + status["reachable"] = True + status["source"] = "pool" + else: + # No cached connection — do a lightweight acquire/release + # This opens a connection if needed but keeps it in the pool + import asyncio + reader, writer, from_cache = await _connection_pool.acquire( + device_key, cfg.host, cfg.tcp_port, timeout=2.0 + ) + await _connection_pool.release(device_key, reader, writer, cfg.host, cfg.tcp_port) + status["reachable"] = True + status["source"] = "cached" if from_cache else "new" except Exception as e: status["error"] = str(type(e).__name__) logger.warning(f"Device {cfg.unit_id} health check failed: {e}") diff --git a/app/routers.py b/app/routers.py index 89d8ce7..a21c928 100644 --- a/app/routers.py +++ b/app/routers.py @@ -1755,74 
+1755,38 @@ async def run_diagnostics(unit_id: str, db: Session = Depends(get_db)): "message": "TCP communication enabled" } - # Test 3: Modem/Router reachable (check port 443 HTTPS) + # Test 3: TCP connection reachable (device port) — uses connection pool + # This avoids extra TCP handshakes over cellular. If a cached connection + # exists and is alive, we skip the handshake entirely. + from app.services import _connection_pool + device_key = f"{cfg.host}:{cfg.tcp_port}" try: - reader, writer = await asyncio.wait_for( - asyncio.open_connection(cfg.host, 443), timeout=3.0 - ) - writer.close() - await writer.wait_closed() - diagnostics["tests"]["modem_reachable"] = { - "status": "pass", - "message": f"Modem/router reachable at {cfg.host}" - } - except asyncio.TimeoutError: - diagnostics["tests"]["modem_reachable"] = { - "status": "fail", - "message": f"Modem/router timeout at {cfg.host} (network issue)" - } - diagnostics["overall_status"] = "fail" - return diagnostics - except ConnectionRefusedError: - # Connection refused means host is up but port 443 closed - that's ok - diagnostics["tests"]["modem_reachable"] = { - "status": "pass", - "message": f"Modem/router reachable at {cfg.host} (HTTPS closed)" - } - except Exception as e: - diagnostics["tests"]["modem_reachable"] = { - "status": "fail", - "message": f"Cannot reach modem/router at {cfg.host}: {str(e)}" - } - diagnostics["overall_status"] = "fail" - return diagnostics - - # Test 4: TCP connection reachable (device port) - try: - reader, writer = await asyncio.wait_for( - asyncio.open_connection(cfg.host, cfg.tcp_port), timeout=3.0 - ) - writer.close() - await writer.wait_closed() - diagnostics["tests"]["tcp_connection"] = { - "status": "pass", - "message": f"TCP connection successful to {cfg.host}:{cfg.tcp_port}" - } - except asyncio.TimeoutError: - diagnostics["tests"]["tcp_connection"] = { - "status": "fail", - "message": f"Connection timeout to {cfg.host}:{cfg.tcp_port}" - } - diagnostics["overall_status"] 
= "fail" - return diagnostics - except ConnectionRefusedError: - diagnostics["tests"]["tcp_connection"] = { - "status": "fail", - "message": f"Connection refused by {cfg.host}:{cfg.tcp_port}" - } - diagnostics["overall_status"] = "fail" - return diagnostics + pool_stats = _connection_pool.get_stats() + conn_info = pool_stats["connections"].get(device_key) + if conn_info and conn_info["alive"]: + # Pool already has a live connection — device is reachable + diagnostics["tests"]["tcp_connection"] = { + "status": "pass", + "message": f"TCP connection alive in pool for {cfg.host}:{cfg.tcp_port}" + } + else: + # Acquire through the pool (opens new if needed, keeps it cached) + reader, writer, from_cache = await _connection_pool.acquire( + device_key, cfg.host, cfg.tcp_port, timeout=3.0 + ) + await _connection_pool.release(device_key, reader, writer, cfg.host, cfg.tcp_port) + diagnostics["tests"]["tcp_connection"] = { + "status": "pass", + "message": f"TCP connection successful to {cfg.host}:{cfg.tcp_port}" + } except Exception as e: diagnostics["tests"]["tcp_connection"] = { "status": "fail", - "message": f"Connection error: {str(e)}" + "message": f"Connection error to {cfg.host}:{cfg.tcp_port}: {str(e)}" } diagnostics["overall_status"] = "fail" return diagnostics - # Wait a bit after connection test to let device settle - await asyncio.sleep(1.5) - # Test 5: Device responds to commands # Use longer timeout to account for rate limiting (device requires ≥1s between commands) client = NL43Client(cfg.host, cfg.tcp_port, timeout=10.0, ftp_username=cfg.ftp_username, ftp_password=cfg.ftp_password) diff --git a/app/services.py b/app/services.py index d85cb4b..860bffa 100644 --- a/app/services.py +++ b/app/services.py @@ -242,8 +242,8 @@ async def _get_device_lock(device_key: str) -> asyncio.Lock: # Configuration via environment variables TCP_PERSISTENT_ENABLED = os.getenv("TCP_PERSISTENT_ENABLED", "true").lower() == "true" -TCP_IDLE_TTL = float(os.getenv("TCP_IDLE_TTL", "120")) 
# Close idle connections after N seconds -TCP_MAX_AGE = float(os.getenv("TCP_MAX_AGE", "300")) # Force reconnect after N seconds +TCP_IDLE_TTL = float(os.getenv("TCP_IDLE_TTL", "300")) # Close idle connections after N seconds +TCP_MAX_AGE = float(os.getenv("TCP_MAX_AGE", "1800")) # Force reconnect after N seconds TCP_KEEPALIVE_IDLE = int(os.getenv("TCP_KEEPALIVE_IDLE", "15")) # Seconds idle before probes TCP_KEEPALIVE_INTERVAL = int(os.getenv("TCP_KEEPALIVE_INTERVAL", "10")) # Seconds between probes TCP_KEEPALIVE_COUNT = int(os.getenv("TCP_KEEPALIVE_COUNT", "3")) # Failed probes before dead diff --git a/templates/roster.html b/templates/roster.html index 6c8d23d..d22d086 100644 --- a/templates/roster.html +++ b/templates/roster.html @@ -3,7 +3,7 @@ - SLMM Roster - Sound Level Meter Configuration + SLMM - Device Roster & Connections
-

📊 Sound Level Meter Roster

+

SLMM - Roster & Connections

-
- - - - - - - - - - - - - - - - - - - -
Unit IDHost / IPTCP PortFTP PortTCPFTPPollingStatusActions
- Loading... -
+
+ + +
+ + +
+
+ + + + + + + + + + + + + + + + + + + +
Unit IDHost / IPTCP PortFTP PortTCPFTPPollingStatusActions
+ Loading... +
+
+
+ + +
+
+
+ + +
+ +

Pool Configuration

+
+
+
Status
+
--
+
+
+ +

Active Connections

+
+
Loading...
+
+
@@ -619,6 +743,159 @@ closeModal(); } }); + + // ========== Tab Switching ========== + + function switchTab(tabName) { + document.querySelectorAll('.tab-btn').forEach(btn => btn.classList.remove('active')); + document.querySelectorAll('.tab-panel').forEach(panel => panel.classList.remove('active')); + + document.querySelector(`.tab-btn[onclick="switchTab('${tabName}')"]`).classList.add('active'); + document.getElementById(`tab-${tabName}`).classList.add('active'); + + if (tabName === 'connections') { + loadConnections(); + } + } + + // ========== Connection Pool ========== + + let connectionsRefreshTimer = null; + + async function loadConnections() { + try { + const res = await fetch('/api/nl43/_connections/status'); + const data = await res.json(); + + if (!res.ok) { + showToast('Failed to load connection pool status', 'error'); + return; + } + + const pool = data.pool; + renderPoolConfig(pool); + renderConnections(pool.connections); + + // Auto-refresh while tab is active + clearTimeout(connectionsRefreshTimer); + if (document.getElementById('tab-connections').classList.contains('active')) { + connectionsRefreshTimer = setTimeout(loadConnections, 5000); + } + } catch (err) { + showToast('Error loading connections: ' + err.message, 'error'); + console.error('Load connections error:', err); + } + } + + function renderPoolConfig(pool) { + document.getElementById('poolConfig').innerHTML = ` +
+
Persistent
+
${pool.enabled ? 'Enabled' : 'Disabled'}
+
+
+
Active
+
${pool.active_connections}
+
+
+
Idle TTL
+
${pool.idle_ttl}s
+
+
+
Max Age
+
${pool.max_age}s
+
+
+
KA Idle
+
${pool.keepalive_idle}s
+
+
+
KA Interval
+
${pool.keepalive_interval}s
+
+
+
KA Probes
+
${pool.keepalive_count}
+
+ `; + } + + function renderConnections(connections) { + const container = document.getElementById('connectionsList'); + const keys = Object.keys(connections); + + if (keys.length === 0) { + container.innerHTML = ` +
+
~
+
No active connections
+
+ Connections appear here when devices are actively being polled and the connection is cached between commands. +
+
+ `; + return; + } + + container.innerHTML = keys.map(key => { + const conn = connections[key]; + const aliveColor = conn.alive ? '#1a7f37' : '#cf222e'; + const aliveText = conn.alive ? 'Alive' : 'Stale'; + return ` +
+
+ ${escapeHtml(key)} + ${aliveText} +
+
+
+
Host
+
${escapeHtml(conn.host)}
+
+
+
Port
+
${conn.port}
+
+
+
Age
+
${formatSeconds(conn.age_seconds)}
+
+
+
Idle
+
${formatSeconds(conn.idle_seconds)}
+
+
+
+ `; + }).join(''); + } + + function formatSeconds(s) { + if (s < 60) return Math.round(s) + 's'; + if (s < 3600) return Math.floor(s / 60) + 'm ' + Math.round(s % 60) + 's'; + return Math.floor(s / 3600) + 'h ' + Math.floor((s % 3600) / 60) + 'm'; + } + + async function flushConnections() { + if (!confirm('Close all cached TCP connections?\n\nDevices will reconnect on the next poll cycle.')) { + return; + } + + try { + const res = await fetch('/api/nl43/_connections/flush', { method: 'POST' }); + const data = await res.json(); + + if (!res.ok) { + showToast(data.detail || 'Failed to flush connections', 'error'); + return; + } + + showToast('All connections flushed', 'success'); + await loadConnections(); + } catch (err) { + showToast('Error flushing connections: ' + err.message, 'error'); + } + } -- 2.49.1 From b62e84f8b32b5db27fd6a4194295d3cecfa9708d Mon Sep 17 00:00:00 2001 From: serversdwn Date: Tue, 17 Feb 2026 02:56:11 +0000 Subject: [PATCH 3/3] v0.3.0, persistent polling update. --- CHANGELOG.md | 54 ++++++++++++++++++++++++++++++++++++ README.md | 77 ++++++++++++++++++++++++++++++++++++++++++---------- app/main.py | 2 +- 3 files changed, 118 insertions(+), 15 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index b265e55..d83bab1 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -5,6 +5,59 @@ All notable changes to SLMM (Sound Level Meter Manager) will be documented in th The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/), and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.html). 
+## [0.3.0] - 2026-02-17 + +### Added + +#### Persistent TCP Connection Pool +- **Connection reuse** - TCP connections are cached per device and reused across commands, eliminating repeated TCP handshakes over cellular modems +- **OS-level TCP keepalive** - Configurable keepalive probes keep cellular NAT tables alive and detect dead connections early (default: probe after 15s idle, every 10s, 3 failures = dead) +- **Transparent retry** - If a cached connection goes stale, the system automatically retries with a fresh connection so failures are never visible to the caller +- **Stale connection detection** - Multi-layer detection via idle TTL, max age, transport state, and reader EOF checks +- **Background cleanup** - Periodic task (every 30s) evicts expired connections from the pool +- **Master switch** - Set `TCP_PERSISTENT_ENABLED=false` to revert to per-request connection behavior + +#### Connection Pool Diagnostics +- `GET /api/nl43/_connections/status` - View pool configuration, active connections, age/idle times, and keepalive settings +- `POST /api/nl43/_connections/flush` - Force-close all cached connections (useful for debugging) +- **Connections tab on roster page** - Live UI showing pool config, active connections with age/idle/alive status, auto-refreshes every 5s, and flush button + +#### Environment Variables +- `TCP_PERSISTENT_ENABLED` (default: `true`) - Master switch for persistent connections +- `TCP_IDLE_TTL` (default: `300`) - Close idle connections after N seconds +- `TCP_MAX_AGE` (default: `1800`) - Force reconnect after N seconds +- `TCP_KEEPALIVE_IDLE` (default: `15`) - Seconds idle before keepalive probes start +- `TCP_KEEPALIVE_INTERVAL` (default: `10`) - Seconds between keepalive probes +- `TCP_KEEPALIVE_COUNT` (default: `3`) - Failed probes before declaring connection dead + +### Changed +- **Health check endpoint** (`/health/devices`) - Now uses connection pool instead of opening throwaway TCP connections; checks for existing live 
connections first (zero-cost), only opens new connection through pool if needed +- **Diagnostics endpoint** - Removed separate port 443 modem check (extra handshake waste); TCP reachability test now uses connection pool +- **DRD streaming** - Streaming connections now get TCP keepalive options set; cached connections are evicted before opening dedicated streaming socket +- **Default timeouts tuned for cellular** - Idle TTL raised to 300s (5 min), max age raised to 1800s (30 min) to survive typical polling intervals over cellular links + +### Technical Details + +#### Architecture +- `ConnectionPool` class in `services.py` manages a single cached connection per device key (NL-43 only supports one TCP connection at a time) +- Uses existing per-device asyncio locks and rate limiting — no changes to concurrency model +- Pool is a module-level singleton initialized from environment variables at import time +- Lifecycle managed via FastAPI lifespan: cleanup task starts on startup, all connections closed on shutdown +- `_send_command_unlocked()` refactored to use acquire/release/discard pattern with single-retry fallback +- Command parsing extracted to `_execute_command()` method for reuse between primary and retry paths + +#### Cellular Modem Optimizations +- Keepalive probes at 15s prevent cellular NAT tables from expiring (typically 30-60s timeout) +- 300s idle TTL ensures connections survive between polling cycles (default 60s interval) +- 1800s max age allows a single socket to serve ~30 minutes of polling before forced reconnect +- Health checks and diagnostics produce zero additional TCP handshakes when a pooled connection exists +- Stale `$` prompt bytes drained from idle connections before command reuse + +### Breaking Changes +None. This release is fully backward-compatible with v0.2.x. Set `TCP_PERSISTENT_ENABLED=false` for identical behavior to previous versions. + +--- + ## [0.2.1] - 2026-01-23 ### Added @@ -146,6 +199,7 @@ None. 
This release is fully backward-compatible with v0.1.x. All existing endpoi ## Version History Summary +- **v0.3.0** (2026-02-17) - Persistent TCP connections with keepalive for cellular modem reliability - **v0.2.1** (2026-01-23) - Roster management, scheduler hooks, FTP logging, doc cleanup - **v0.2.0** (2026-01-15) - Background Polling System - **v0.1.0** (2025-12-XX) - Initial Release diff --git a/README.md b/README.md index 1347e35..441a1e6 100644 --- a/README.md +++ b/README.md @@ -1,6 +1,6 @@ # SLMM - Sound Level Meter Manager -**Version 0.2.1** +**Version 0.3.0** Backend API service for controlling and monitoring Rion NL-43/NL-53 Sound Level Meters via TCP and FTP protocols. @@ -12,8 +12,9 @@ SLMM is a standalone backend module that provides REST API routing and command t ## Features -- **Background Polling** ⭐ NEW: Continuous automatic polling of devices with configurable intervals -- **Offline Detection** ⭐ NEW: Automatic device reachability tracking with failure counters +- **Persistent TCP Connections**: Cached per-device connections with OS-level keepalive, tuned for cellular modem reliability +- **Background Polling**: Continuous automatic polling of devices with configurable intervals +- **Offline Detection**: Automatic device reachability tracking with failure counters - **Device Management**: Configure and manage multiple NL43/NL53 devices - **Real-time Monitoring**: Stream live measurement data via WebSocket - **Measurement Control**: Start, stop, pause, resume, and reset measurements @@ -22,6 +23,7 @@ SLMM is a standalone backend module that provides REST API routing and command t - **Device Configuration**: Manage frequency/time weighting, clock sync, and more - **Rate Limiting**: Automatic 1-second delay enforcement between device commands - **Persistent Storage**: SQLite database for device configs and measurement cache +- **Connection Diagnostics**: Live UI and API endpoints for monitoring TCP connection pool status ## Architecture @@ -29,29 
+31,39 @@ SLMM is a standalone backend module that provides REST API routing and command t ┌─────────────────┐ ┌──────────────────────────────┐ ┌─────────────────┐ │ │◄───────►│ SLMM API │◄───────►│ NL43/NL53 │ │ (Frontend) │ HTTP │ • REST Endpoints │ TCP │ Sound Meters │ -└─────────────────┘ │ • WebSocket Streaming │ └─────────────────┘ - │ • Background Poller ⭐ NEW │ ▲ - └──────────────────────────────┘ │ - │ Continuous - ▼ Polling - ┌──────────────┐ │ - │ SQLite DB │◄─────────────────────┘ +└─────────────────┘ │ • WebSocket Streaming │ (kept │ (via cellular │ + │ • Background Poller │ alive) │ modem) │ + │ • Connection Pool (v0.3) │ └─────────────────┘ + └──────────────────────────────┘ + │ + ▼ + ┌──────────────┐ + │ SQLite DB │ │ • Config │ │ • Status │ └──────────────┘ ``` +### Persistent TCP Connection Pool (v0.3.0) + +SLMM maintains persistent TCP connections to devices with OS-level keepalive, designed for reliable operation over cellular modems: + +- **Connection Reuse**: One cached TCP socket per device, reused across all commands (no repeated handshakes) +- **TCP Keepalive**: Probes keep cellular NAT tables alive and detect dead connections early +- **Transparent Retry**: Stale cached connections automatically retry with a fresh socket +- **Configurable**: Idle TTL (300s), max age (1800s), and keepalive timing via environment variables +- **Diagnostics**: Live UI on the roster page and API endpoints for monitoring pool status + ### Background Polling (v0.2.0) -SLMM now includes a background polling service that continuously queries devices and updates the status cache: +Background polling service continuously queries devices and updates the status cache: - **Automatic Updates**: Devices are polled at configurable intervals (10-3600 seconds) - **Offline Detection**: Devices marked unreachable after 3 consecutive failures - **Per-Device Configuration**: Each device can have a custom polling interval - **Resource Efficient**: Dynamic sleep intervals and 
smart scheduling -- **Graceful Shutdown**: Background task stops cleanly on service shutdown -This makes Terra-View significantly more responsive - status requests return cached data instantly (<100ms) instead of waiting for device queries (1-2 seconds). +Status requests return cached data instantly (<100ms) instead of waiting for device queries (1-2 seconds). ## Quick Start @@ -96,9 +108,18 @@ Once running, visit: ### Environment Variables +**Server:** - `PORT`: Server port (default: 8100) - `CORS_ORIGINS`: Comma-separated list of allowed origins (default: "*") +**TCP Connection Pool:** +- `TCP_PERSISTENT_ENABLED`: Enable persistent connections (default: "true") +- `TCP_IDLE_TTL`: Close idle connections after N seconds (default: 300) +- `TCP_MAX_AGE`: Force reconnect after N seconds (default: 1800) +- `TCP_KEEPALIVE_IDLE`: Seconds idle before keepalive probes (default: 15) +- `TCP_KEEPALIVE_INTERVAL`: Seconds between keepalive probes (default: 10) +- `TCP_KEEPALIVE_COUNT`: Failed probes before declaring dead (default: 3) + ### Database The SQLite database is automatically created at [data/slmm.db](data/slmm.db) on first run. 
@@ -126,7 +147,7 @@ Logs are written to: | GET | `/api/nl43/{unit_id}/live` | Request fresh DOD data from device (bypasses cache) | | WS | `/api/nl43/{unit_id}/stream` | WebSocket stream for real-time DRD data | -### Background Polling Configuration ⭐ NEW +### Background Polling | Method | Endpoint | Description | |--------|----------|-------------| @@ -134,6 +155,13 @@ Logs are written to: | PUT | `/api/nl43/{unit_id}/polling/config` | Update polling interval and enable/disable polling | | GET | `/api/nl43/_polling/status` | Get global polling status for all devices | +### Connection Pool + +| Method | Endpoint | Description | +|--------|----------|-------------| +| GET | `/api/nl43/_connections/status` | Get pool config, active connections, age/idle times | +| POST | `/api/nl43/_connections/flush` | Force-close all cached TCP connections | + ### Measurement Control | Method | Endpoint | Description | @@ -255,6 +283,9 @@ Caches latest measurement snapshot: ### TCP Communication - Uses ASCII command protocol over TCP +- Persistent connections with OS-level keepalive (tuned for cellular modems) +- Connections cached per device and reused across commands +- Transparent retry on stale connections - Enforces ≥1 second delay between commands to same device - Two-line response format: - Line 1: Result code (R+0000 for success) @@ -320,6 +351,16 @@ curl http://localhost:8100/api/nl43/meter-001/polling/config curl http://localhost:8100/api/nl43/_polling/status ``` +### Check Connection Pool Status +```bash +curl http://localhost:8100/api/nl43/_connections/status | jq '.' +``` + +### Flush All Cached Connections +```bash +curl -X POST http://localhost:8100/api/nl43/_connections/flush +``` + ### Verify Device Settings ```bash curl http://localhost:8100/api/nl43/meter-001/settings @@ -388,11 +429,19 @@ See [API.md](API.md) for detailed integration examples. 
 ## Troubleshooting
 
 ### Connection Issues
+- Check connection pool status: `curl http://localhost:8100/api/nl43/_connections/status`
+- Flush stale connections: `curl -X POST http://localhost:8100/api/nl43/_connections/flush`
 - Verify device IP address and port in configuration
 - Ensure device is on the same network
 - Check firewall rules allow TCP/FTP connections
 - Verify RX55 network adapter is properly configured on device
 
+### Cellular Modem Issues
+- If the modem wedges from too many handshakes, ensure `TCP_PERSISTENT_ENABLED=true` (default)
+- Increase `TCP_IDLE_TTL` if connections expire between poll cycles
+- Keepalive probes (default: start after 15s idle, then every 10s) keep NAT tables alive — adjust `TCP_KEEPALIVE_IDLE` and `TCP_KEEPALIVE_INTERVAL` if needed
+- Set `TCP_PERSISTENT_ENABLED=false` to disable pooling for debugging
+
 ### Rate Limiting
 - API automatically enforces 1-second delay between commands
 - If experiencing delays, this is normal device behavior
diff --git a/app/main.py b/app/main.py
index abf0879..ffce6a2 100644
--- a/app/main.py
+++ b/app/main.py
@@ -52,7 +52,7 @@ async def lifespan(app: FastAPI):
 
 app = FastAPI(
     title="SLMM NL43 Addon",
     description="Standalone module for NL43 configuration and status APIs with background polling",
-    version="0.2.0",
+    version="0.3.0",
     lifespan=lifespan,
 )
-- 
2.49.1