diff --git a/CHANGELOG.md b/CHANGELOG.md
index b265e55..d83bab1 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -5,6 +5,59 @@ All notable changes to SLMM (Sound Level Meter Manager) will be documented in th
The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.html).
+## [0.3.0] - 2026-02-17
+
+### Added
+
+#### Persistent TCP Connection Pool
+- **Connection reuse** - TCP connections are cached per device and reused across commands, eliminating repeated TCP handshakes over cellular modems
+- **OS-level TCP keepalive** - Configurable keepalive probes keep cellular NAT tables alive and detect dead connections early (default: probe after 15s idle, every 10s, 3 failures = dead)
+- **Transparent retry** - If a cached connection goes stale, the command is retried once on a fresh connection, so a stale socket does not surface as an error to the caller
+- **Stale connection detection** - Multi-layer detection via idle TTL, max age, transport state, and reader EOF checks
+- **Background cleanup** - Periodic task (every 30s) evicts expired connections from the pool
+- **Master switch** - Set `TCP_PERSISTENT_ENABLED=false` to revert to per-request connection behavior
+
+#### Connection Pool Diagnostics
+- `GET /api/nl43/_connections/status` - View pool configuration, active connections, age/idle times, and keepalive settings
+- `POST /api/nl43/_connections/flush` - Force-close all cached connections (useful for debugging)
+- **Connections tab on roster page** - Live UI showing pool config, active connections with age/idle/alive status, auto-refreshes every 5s, and flush button
+
+#### Environment Variables
+- `TCP_PERSISTENT_ENABLED` (default: `true`) - Master switch for persistent connections
+- `TCP_IDLE_TTL` (default: `300`) - Close idle connections after N seconds
+- `TCP_MAX_AGE` (default: `1800`) - Force reconnect after N seconds
+- `TCP_KEEPALIVE_IDLE` (default: `15`) - Seconds idle before keepalive probes start
+- `TCP_KEEPALIVE_INTERVAL` (default: `10`) - Seconds between keepalive probes
+- `TCP_KEEPALIVE_COUNT` (default: `3`) - Failed probes before declaring connection dead
+
+### Changed
+- **Health check endpoint** (`/health/devices`) - Now uses connection pool instead of opening throwaway TCP connections; checks for existing live connections first (zero-cost), only opens new connection through pool if needed
+- **Diagnostics endpoint** - Removed the separate port 443 modem check (it cost an extra TCP handshake per run); the TCP reachability test now uses the connection pool
+- **DRD streaming** - Streaming connections now get TCP keepalive options set; cached connections are evicted before opening dedicated streaming socket
+- **Default timeouts tuned for cellular** - Idle TTL raised to 300s (5 min), max age raised to 1800s (30 min) to survive typical polling intervals over cellular links
+
+### Technical Details
+
+#### Architecture
+- `ConnectionPool` class in `services.py` manages a single cached connection per device key (NL-43 only supports one TCP connection at a time)
+- Uses existing per-device asyncio locks and rate limiting — no changes to concurrency model
+- Pool is a module-level singleton initialized from environment variables at import time
+- Lifecycle managed via FastAPI lifespan: cleanup task starts on startup, all connections closed on shutdown
+- `_send_command_unlocked()` refactored to use acquire/release/discard pattern with single-retry fallback
+- Command parsing extracted to `_execute_command()` method for reuse between primary and retry paths
+
+#### Cellular Modem Optimizations
+- Keepalive probes at 15s prevent cellular NAT tables from expiring (typically 30-60s timeout)
+- 300s idle TTL ensures connections survive between polling cycles (default 60s interval)
+- 1800s max age allows a single socket to serve ~30 minutes of polling before forced reconnect
+- Health checks and diagnostics produce zero additional TCP handshakes when a pooled connection exists
+- Stale `$` prompt bytes drained from idle connections before command reuse
+
+### Breaking Changes
+None. This release is fully backward-compatible with v0.2.x. Set `TCP_PERSISTENT_ENABLED=false` for identical behavior to previous versions.
+
+---
+
## [0.2.1] - 2026-01-23
### Added
@@ -146,6 +199,7 @@ None. This release is fully backward-compatible with v0.1.x. All existing endpoi
## Version History Summary
+- **v0.3.0** (2026-02-17) - Persistent TCP connections with keepalive for cellular modem reliability
- **v0.2.1** (2026-01-23) - Roster management, scheduler hooks, FTP logging, doc cleanup
- **v0.2.0** (2026-01-15) - Background Polling System
- **v0.1.0** (2025-12-XX) - Initial Release
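The keepalive behavior described in this release is plain OS-level socket tuning. A minimal standalone sketch (the `enable_keepalive` helper is illustrative, not part of SLMM; the `TCP_KEEP*` constants are platform-specific, hence the `hasattr` guards):

```python
import socket

def enable_keepalive(sock: socket.socket, idle: int = 15,
                     interval: int = 10, count: int = 3) -> None:
    """Turn on TCP keepalive and tune probe timing where the OS supports it."""
    sock.setsockopt(socket.SOL_SOCKET, socket.SO_KEEPALIVE, 1)
    # Fine-grained probe tuning is platform-specific (Linux exposes all three).
    if hasattr(socket, "TCP_KEEPIDLE"):
        sock.setsockopt(socket.IPPROTO_TCP, socket.TCP_KEEPIDLE, idle)
    if hasattr(socket, "TCP_KEEPINTVL"):
        sock.setsockopt(socket.IPPROTO_TCP, socket.TCP_KEEPINTVL, interval)
    if hasattr(socket, "TCP_KEEPCNT"):
        sock.setsockopt(socket.IPPROTO_TCP, socket.TCP_KEEPCNT, count)

sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
enable_keepalive(sock)
assert sock.getsockopt(socket.SOL_SOCKET, socket.SO_KEEPALIVE) != 0
sock.close()
```

With the defaults above, a dead peer is declared after roughly 15 + 3 × 10 = 45 seconds of silence, comfortably inside typical cellular NAT timeouts.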
diff --git a/README.md b/README.md
index 1347e35..441a1e6 100644
--- a/README.md
+++ b/README.md
@@ -1,6 +1,6 @@
# SLMM - Sound Level Meter Manager
-**Version 0.2.1**
+**Version 0.3.0**
Backend API service for controlling and monitoring Rion NL-43/NL-53 Sound Level Meters via TCP and FTP protocols.
@@ -12,8 +12,9 @@ SLMM is a standalone backend module that provides REST API routing and command t
## Features
-- **Background Polling** ⭐ NEW: Continuous automatic polling of devices with configurable intervals
-- **Offline Detection** ⭐ NEW: Automatic device reachability tracking with failure counters
+- **Persistent TCP Connections**: Cached per-device connections with OS-level keepalive, tuned for cellular modem reliability
+- **Background Polling**: Continuous automatic polling of devices with configurable intervals
+- **Offline Detection**: Automatic device reachability tracking with failure counters
- **Device Management**: Configure and manage multiple NL43/NL53 devices
- **Real-time Monitoring**: Stream live measurement data via WebSocket
- **Measurement Control**: Start, stop, pause, resume, and reset measurements
@@ -22,6 +23,7 @@ SLMM is a standalone backend module that provides REST API routing and command t
- **Device Configuration**: Manage frequency/time weighting, clock sync, and more
- **Rate Limiting**: Automatic 1-second delay enforcement between device commands
- **Persistent Storage**: SQLite database for device configs and measurement cache
+- **Connection Diagnostics**: Live UI and API endpoints for monitoring TCP connection pool status
## Architecture
@@ -29,29 +31,39 @@ SLMM is a standalone backend module that provides REST API routing and command t
┌─────────────────┐ ┌──────────────────────────────┐ ┌─────────────────┐
│ │◄───────►│ SLMM API │◄───────►│ NL43/NL53 │
│ (Frontend) │ HTTP │ • REST Endpoints │ TCP │ Sound Meters │
-└─────────────────┘ │ • WebSocket Streaming │ └─────────────────┘
- │ • Background Poller ⭐ NEW │ ▲
- └──────────────────────────────┘ │
- │ Continuous
- ▼ Polling
- ┌──────────────┐ │
- │ SQLite DB │◄─────────────────────┘
+└─────────────────┘ │ • WebSocket Streaming │ (kept │ (via cellular │
+ │ • Background Poller │ alive) │ modem) │
+ │ • Connection Pool (v0.3) │ └─────────────────┘
+ └──────────────────────────────┘
+ │
+ ▼
+ ┌──────────────┐
+ │ SQLite DB │
│ • Config │
│ • Status │
└──────────────┘
```
+### Persistent TCP Connection Pool (v0.3.0)
+
+SLMM maintains persistent TCP connections to devices with OS-level keepalive, designed for reliable operation over cellular modems:
+
+- **Connection Reuse**: One cached TCP socket per device, reused across all commands (no repeated handshakes)
+- **TCP Keepalive**: Probes keep cellular NAT tables alive and detect dead connections early
+- **Transparent Retry**: Stale cached connections automatically retry with a fresh socket
+- **Configurable**: Idle TTL (300s), max age (1800s), and keepalive timing via environment variables
+- **Diagnostics**: Live UI on the roster page and API endpoints for monitoring pool status
+
### Background Polling (v0.2.0)
-SLMM now includes a background polling service that continuously queries devices and updates the status cache:
+The background polling service continuously queries devices and updates the status cache:
- **Automatic Updates**: Devices are polled at configurable intervals (10-3600 seconds)
- **Offline Detection**: Devices marked unreachable after 3 consecutive failures
- **Per-Device Configuration**: Each device can have a custom polling interval
- **Resource Efficient**: Dynamic sleep intervals and smart scheduling
-- **Graceful Shutdown**: Background task stops cleanly on service shutdown
-This makes Terra-View significantly more responsive - status requests return cached data instantly (<100ms) instead of waiting for device queries (1-2 seconds).
+Status requests return cached data instantly (<100ms) instead of waiting for device queries (1-2 seconds).
## Quick Start
@@ -96,9 +108,18 @@ Once running, visit:
### Environment Variables
+**Server:**
- `PORT`: Server port (default: 8100)
- `CORS_ORIGINS`: Comma-separated list of allowed origins (default: "*")
+**TCP Connection Pool:**
+- `TCP_PERSISTENT_ENABLED`: Enable persistent connections (default: "true")
+- `TCP_IDLE_TTL`: Close idle connections after N seconds (default: 300)
+- `TCP_MAX_AGE`: Force reconnect after N seconds (default: 1800)
+- `TCP_KEEPALIVE_IDLE`: Seconds idle before keepalive probes (default: 15)
+- `TCP_KEEPALIVE_INTERVAL`: Seconds between keepalive probes (default: 10)
+- `TCP_KEEPALIVE_COUNT`: Failed probes before declaring dead (default: 3)
+
### Database
The SQLite database is automatically created at [data/slmm.db](data/slmm.db) on first run.
@@ -126,7 +147,7 @@ Logs are written to:
| GET | `/api/nl43/{unit_id}/live` | Request fresh DOD data from device (bypasses cache) |
| WS | `/api/nl43/{unit_id}/stream` | WebSocket stream for real-time DRD data |
-### Background Polling Configuration ⭐ NEW
+### Background Polling
| Method | Endpoint | Description |
|--------|----------|-------------|
@@ -134,6 +155,13 @@ Logs are written to:
| PUT | `/api/nl43/{unit_id}/polling/config` | Update polling interval and enable/disable polling |
| GET | `/api/nl43/_polling/status` | Get global polling status for all devices |
+### Connection Pool
+
+| Method | Endpoint | Description |
+|--------|----------|-------------|
+| GET | `/api/nl43/_connections/status` | Get pool config, active connections, age/idle times |
+| POST | `/api/nl43/_connections/flush` | Force-close all cached TCP connections |
+
### Measurement Control
| Method | Endpoint | Description |
@@ -255,6 +283,9 @@ Caches latest measurement snapshot:
### TCP Communication
- Uses ASCII command protocol over TCP
+- Persistent connections with OS-level keepalive (tuned for cellular modems)
+- Connections cached per device and reused across commands
+- Transparent retry on stale connections
- Enforces ≥1 second delay between commands to same device
- Two-line response format:
- Line 1: Result code (R+0000 for success)
@@ -320,6 +351,16 @@ curl http://localhost:8100/api/nl43/meter-001/polling/config
curl http://localhost:8100/api/nl43/_polling/status
```
+### Check Connection Pool Status
+```bash
+curl http://localhost:8100/api/nl43/_connections/status | jq '.'
+```
+
+### Flush All Cached Connections
+```bash
+curl -X POST http://localhost:8100/api/nl43/_connections/flush
+```
+
### Verify Device Settings
```bash
curl http://localhost:8100/api/nl43/meter-001/settings
@@ -388,11 +429,19 @@ See [API.md](API.md) for detailed integration examples.
## Troubleshooting
### Connection Issues
+- Check connection pool status: `curl http://localhost:8100/api/nl43/_connections/status`
+- Flush stale connections: `curl -X POST http://localhost:8100/api/nl43/_connections/flush`
- Verify device IP address and port in configuration
- Ensure device is on the same network
- Check firewall rules allow TCP/FTP connections
- Verify RX55 network adapter is properly configured on device
+### Cellular Modem Issues
+- If the modem becomes unresponsive under repeated TCP handshakes, ensure `TCP_PERSISTENT_ENABLED=true` (the default)
+- Increase `TCP_IDLE_TTL` if connections expire between poll cycles
+- Keepalive probes (default: every 15s) keep NAT tables alive — adjust `TCP_KEEPALIVE_IDLE` if needed
+- Set `TCP_PERSISTENT_ENABLED=false` to disable pooling for debugging
+
### Rate Limiting
- API automatically enforces 1-second delay between commands
- If experiencing delays, this is normal device behavior
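The pool reads its settings from the environment at import time. A sketch of how that parsing might look (the `pool_config` helper is hypothetical; the variable names and defaults mirror the lists above):

```python
import os

def pool_config(env=os.environ) -> dict:
    """Read connection-pool settings, falling back to the documented defaults."""
    return {
        "enabled": env.get("TCP_PERSISTENT_ENABLED", "true").lower() == "true",
        "idle_ttl": float(env.get("TCP_IDLE_TTL", "300")),
        "max_age": float(env.get("TCP_MAX_AGE", "1800")),
        "keepalive_idle": int(env.get("TCP_KEEPALIVE_IDLE", "15")),
        "keepalive_interval": int(env.get("TCP_KEEPALIVE_INTERVAL", "10")),
        "keepalive_count": int(env.get("TCP_KEEPALIVE_COUNT", "3")),
    }

cfg = pool_config({})  # empty environment -> documented defaults
assert cfg["enabled"] is True
assert cfg["idle_ttl"] == 300.0
```

Note that any non-`"true"` value (including `"TRUE "` with whitespace or `"1"`) disables persistence, so stick to the literal strings `true`/`false`.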
diff --git a/app/main.py b/app/main.py
index 406d7fc..ffce6a2 100644
--- a/app/main.py
+++ b/app/main.py
@@ -29,7 +29,11 @@ logger.info("Database tables initialized")
@asynccontextmanager
async def lifespan(app: FastAPI):
"""Manage application lifecycle - startup and shutdown events."""
+ from app.services import _connection_pool
+
# Startup
+ logger.info("Starting TCP connection pool cleanup task...")
+ _connection_pool.start_cleanup()
logger.info("Starting background poller...")
await poller.start()
logger.info("Background poller started")
@@ -40,12 +44,15 @@ async def lifespan(app: FastAPI):
logger.info("Stopping background poller...")
await poller.stop()
logger.info("Background poller stopped")
+ logger.info("Closing TCP connection pool...")
+ await _connection_pool.close_all()
+ logger.info("TCP connection pool closed")
app = FastAPI(
title="SLMM NL43 Addon",
description="Standalone module for NL43 configuration and status APIs with background polling",
- version="0.2.0",
+ version="0.3.0",
lifespan=lifespan,
)
@@ -85,10 +92,14 @@ async def health():
@app.get("/health/devices")
async def health_devices():
- """Enhanced health check that tests device connectivity."""
+ """Enhanced health check that tests device connectivity.
+
+ Uses the connection pool to avoid unnecessary TCP handshakes — if a
+ cached connection exists and is alive, the device is reachable.
+ """
from sqlalchemy.orm import Session
from app.database import SessionLocal
- from app.services import NL43Client
+ from app.services import _connection_pool
from app.models import NL43Config
db: Session = SessionLocal()
@@ -98,7 +109,7 @@ async def health_devices():
configs = db.query(NL43Config).filter_by(tcp_enabled=True).all()
for cfg in configs:
- client = NL43Client(cfg.host, cfg.tcp_port, timeout=2.0, ftp_username=cfg.ftp_username, ftp_password=cfg.ftp_password)
+ device_key = f"{cfg.host}:{cfg.tcp_port}"
status = {
"unit_id": cfg.unit_id,
"host": cfg.host,
@@ -108,14 +119,22 @@ async def health_devices():
}
try:
- # Try to connect (don't send command to avoid rate limiting issues)
- import asyncio
- reader, writer = await asyncio.wait_for(
- asyncio.open_connection(cfg.host, cfg.tcp_port), timeout=2.0
- )
- writer.close()
- await writer.wait_closed()
- status["reachable"] = True
+ # Check if pool already has a live connection (zero-cost check)
+ pool_stats = _connection_pool.get_stats()
+ conn_info = pool_stats["connections"].get(device_key)
+ if conn_info and conn_info["alive"]:
+ status["reachable"] = True
+ status["source"] = "pool"
+ else:
+ # No cached connection — do a lightweight acquire/release
+ # This opens a connection if needed but keeps it in the pool
+ reader, writer, from_cache = await _connection_pool.acquire(
+ device_key, cfg.host, cfg.tcp_port, timeout=2.0
+ )
+ await _connection_pool.release(device_key, reader, writer, cfg.host, cfg.tcp_port)
+ status["reachable"] = True
+ status["source"] = "cached" if from_cache else "new"
except Exception as e:
status["error"] = str(type(e).__name__)
logger.warning(f"Device {cfg.unit_id} health check failed: {e}")
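The zero-cost branch of the health check reduces to a dictionary lookup on the pool's stats. Sketched in isolation (the `reachable_from_pool` helper is illustrative; the stats shape is assumed to match `get_stats()` in this diff):

```python
def reachable_from_pool(stats: dict, device_key: str) -> bool:
    """True if the pool already holds a live connection for this device,
    meaning the health check needs no new TCP handshake."""
    info = stats.get("connections", {}).get(device_key)
    return bool(info and info.get("alive"))

stats = {"connections": {"10.0.0.5:2300": {"alive": True, "age_seconds": 42.0}}}
assert reachable_from_pool(stats, "10.0.0.5:2300") is True
assert reachable_from_pool(stats, "10.0.0.9:2300") is False
```

Only when this returns False does the health check fall through to an acquire/release round trip, which opens at most one new connection and leaves it cached for subsequent commands.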
diff --git a/app/routers.py b/app/routers.py
index 40ecaf8..a21c928 100644
--- a/app/routers.py
+++ b/app/routers.py
@@ -93,6 +93,34 @@ class PollingConfigPayload(BaseModel):
poll_enabled: bool | None = Field(None, description="Enable or disable background polling for this device")
+# ============================================================================
+# TCP CONNECTION POOL ENDPOINTS (must be before /{unit_id} routes)
+# ============================================================================
+
+@router.get("/_connections/status")
+async def get_connection_pool_status():
+ """Get status of the persistent TCP connection pool.
+
+ Returns information about cached connections, keepalive settings,
+ and per-device connection age/idle times.
+ """
+ from app.services import _connection_pool
+ return {"status": "ok", "pool": _connection_pool.get_stats()}
+
+
+@router.post("/_connections/flush")
+async def flush_connection_pool():
+ """Close all cached TCP connections.
+
+ Useful for debugging or forcing fresh connections to all devices.
+ """
+ from app.services import _connection_pool
+ await _connection_pool.close_all()
+ # Restart cleanup task since close_all cancels it
+ _connection_pool.start_cleanup()
+ return {"status": "ok", "message": "All cached connections closed"}
+
+
# ============================================================================
# GLOBAL POLLING STATUS ENDPOINT (must be before /{unit_id} routes)
# ============================================================================
@@ -1727,74 +1755,38 @@ async def run_diagnostics(unit_id: str, db: Session = Depends(get_db)):
"message": "TCP communication enabled"
}
- # Test 3: Modem/Router reachable (check port 443 HTTPS)
+ # Test 3: TCP connection reachable (device port) — uses connection pool
+ # This avoids extra TCP handshakes over cellular. If a cached connection
+ # exists and is alive, we skip the handshake entirely.
+ from app.services import _connection_pool
+ device_key = f"{cfg.host}:{cfg.tcp_port}"
try:
- reader, writer = await asyncio.wait_for(
- asyncio.open_connection(cfg.host, 443), timeout=3.0
- )
- writer.close()
- await writer.wait_closed()
- diagnostics["tests"]["modem_reachable"] = {
- "status": "pass",
- "message": f"Modem/router reachable at {cfg.host}"
- }
- except asyncio.TimeoutError:
- diagnostics["tests"]["modem_reachable"] = {
- "status": "fail",
- "message": f"Modem/router timeout at {cfg.host} (network issue)"
- }
- diagnostics["overall_status"] = "fail"
- return diagnostics
- except ConnectionRefusedError:
- # Connection refused means host is up but port 443 closed - that's ok
- diagnostics["tests"]["modem_reachable"] = {
- "status": "pass",
- "message": f"Modem/router reachable at {cfg.host} (HTTPS closed)"
- }
- except Exception as e:
- diagnostics["tests"]["modem_reachable"] = {
- "status": "fail",
- "message": f"Cannot reach modem/router at {cfg.host}: {str(e)}"
- }
- diagnostics["overall_status"] = "fail"
- return diagnostics
-
- # Test 4: TCP connection reachable (device port)
- try:
- reader, writer = await asyncio.wait_for(
- asyncio.open_connection(cfg.host, cfg.tcp_port), timeout=3.0
- )
- writer.close()
- await writer.wait_closed()
- diagnostics["tests"]["tcp_connection"] = {
- "status": "pass",
- "message": f"TCP connection successful to {cfg.host}:{cfg.tcp_port}"
- }
- except asyncio.TimeoutError:
- diagnostics["tests"]["tcp_connection"] = {
- "status": "fail",
- "message": f"Connection timeout to {cfg.host}:{cfg.tcp_port}"
- }
- diagnostics["overall_status"] = "fail"
- return diagnostics
- except ConnectionRefusedError:
- diagnostics["tests"]["tcp_connection"] = {
- "status": "fail",
- "message": f"Connection refused by {cfg.host}:{cfg.tcp_port}"
- }
- diagnostics["overall_status"] = "fail"
- return diagnostics
+ pool_stats = _connection_pool.get_stats()
+ conn_info = pool_stats["connections"].get(device_key)
+ if conn_info and conn_info["alive"]:
+ # Pool already has a live connection — device is reachable
+ diagnostics["tests"]["tcp_connection"] = {
+ "status": "pass",
+ "message": f"TCP connection alive in pool for {cfg.host}:{cfg.tcp_port}"
+ }
+ else:
+ # Acquire through the pool (opens new if needed, keeps it cached)
+ reader, writer, from_cache = await _connection_pool.acquire(
+ device_key, cfg.host, cfg.tcp_port, timeout=3.0
+ )
+ await _connection_pool.release(device_key, reader, writer, cfg.host, cfg.tcp_port)
+ diagnostics["tests"]["tcp_connection"] = {
+ "status": "pass",
+ "message": f"TCP connection successful to {cfg.host}:{cfg.tcp_port}"
+ }
except Exception as e:
diagnostics["tests"]["tcp_connection"] = {
"status": "fail",
- "message": f"Connection error: {str(e)}"
+ "message": f"Connection error to {cfg.host}:{cfg.tcp_port}: {str(e)}"
}
diagnostics["overall_status"] = "fail"
return diagnostics
- # Wait a bit after connection test to let device settle
- await asyncio.sleep(1.5)
-
-    # Test 5: Device responds to commands
+    # Test 4: Device responds to commands
# Use longer timeout to account for rate limiting (device requires ≥1s between commands)
client = NL43Client(cfg.host, cfg.tcp_port, timeout=10.0, ftp_username=cfg.ftp_username, ftp_password=cfg.ftp_password)
diff --git a/app/services.py b/app/services.py
index 7e4c554..860bffa 100644
--- a/app/services.py
+++ b/app/services.py
@@ -1,20 +1,22 @@
"""
NL43 TCP connector and snapshot persistence.
-Implements simple per-request TCP calls to avoid long-lived socket complexity.
-Extend to pooled connections/DRD streaming later.
+Implements persistent per-device TCP connections with OS-level keepalive
+to reduce handshake overhead and survive cellular modem NAT timeouts.
+Falls back to per-request connections on error with transparent retry.
"""
import asyncio
import contextlib
import logging
+import socket
import time
import os
import zipfile
import tempfile
-from dataclasses import dataclass
+from dataclasses import dataclass, field
from datetime import datetime, timezone, timedelta
-from typing import Optional, List, Dict
+from typing import Optional, List, Dict, Tuple
from sqlalchemy.orm import Session
from ftplib import FTP
from pathlib import Path
@@ -234,6 +236,293 @@ async def _get_device_lock(device_key: str) -> asyncio.Lock:
return _device_locks[device_key]
+# ---------------------------------------------------------------------------
+# Persistent TCP connection pool with OS-level keepalive
+# ---------------------------------------------------------------------------
+
+# Configuration via environment variables
+TCP_PERSISTENT_ENABLED = os.getenv("TCP_PERSISTENT_ENABLED", "true").lower() == "true"
+TCP_IDLE_TTL = float(os.getenv("TCP_IDLE_TTL", "300")) # Close idle connections after N seconds
+TCP_MAX_AGE = float(os.getenv("TCP_MAX_AGE", "1800")) # Force reconnect after N seconds
+TCP_KEEPALIVE_IDLE = int(os.getenv("TCP_KEEPALIVE_IDLE", "15")) # Seconds idle before probes
+TCP_KEEPALIVE_INTERVAL = int(os.getenv("TCP_KEEPALIVE_INTERVAL", "10")) # Seconds between probes
+TCP_KEEPALIVE_COUNT = int(os.getenv("TCP_KEEPALIVE_COUNT", "3")) # Failed probes before dead
+
+logger.info(
+ f"TCP connection pool: persistent={TCP_PERSISTENT_ENABLED}, "
+ f"idle_ttl={TCP_IDLE_TTL}s, max_age={TCP_MAX_AGE}s, "
+ f"keepalive_idle={TCP_KEEPALIVE_IDLE}s, keepalive_interval={TCP_KEEPALIVE_INTERVAL}s, "
+ f"keepalive_count={TCP_KEEPALIVE_COUNT}"
+)
+
+
+@dataclass
+class DeviceConnection:
+ """Tracks a cached TCP connection and its metadata."""
+ reader: asyncio.StreamReader
+ writer: asyncio.StreamWriter
+ device_key: str
+ host: str
+ port: int
+ created_at: float = field(default_factory=time.time)
+ last_used_at: float = field(default_factory=time.time)
+
+
+class ConnectionPool:
+ """Per-device persistent TCP connection cache with OS-level keepalive.
+
+ Each NL-43 device supports only one TCP connection at a time. This pool
+ caches that single connection per device key and reuses it across commands,
+ avoiding repeated TCP handshakes over high-latency cellular links.
+
+ Keepalive probes keep cellular NAT tables alive and detect dead connections
+ before the next command attempt.
+ """
+
+ def __init__(
+ self,
+ enable_persistent: bool = True,
+        idle_ttl: float = 300.0,
+        max_age: float = 1800.0,
+ keepalive_idle: int = 15,
+ keepalive_interval: int = 10,
+ keepalive_count: int = 3,
+ ):
+ self._connections: Dict[str, DeviceConnection] = {}
+ self._lock = asyncio.Lock()
+ self._enable_persistent = enable_persistent
+ self._idle_ttl = idle_ttl
+ self._max_age = max_age
+ self._keepalive_idle = keepalive_idle
+ self._keepalive_interval = keepalive_interval
+ self._keepalive_count = keepalive_count
+ self._cleanup_task: Optional[asyncio.Task] = None
+
+ # -- lifecycle ----------------------------------------------------------
+
+ def start_cleanup(self):
+ """Start background task that evicts stale connections."""
+ if self._enable_persistent and self._cleanup_task is None:
+ self._cleanup_task = asyncio.create_task(self._cleanup_loop())
+ logger.info("Connection pool cleanup task started")
+
+ async def close_all(self):
+ """Close all cached connections (called at shutdown)."""
+ if self._cleanup_task is not None:
+ self._cleanup_task.cancel()
+ with contextlib.suppress(asyncio.CancelledError):
+ await self._cleanup_task
+ self._cleanup_task = None
+
+ async with self._lock:
+            for conn in list(self._connections.values()):
+ await self._close_connection(conn, reason="shutdown")
+ self._connections.clear()
+ logger.info("Connection pool: all connections closed")
+
+ # -- public API ---------------------------------------------------------
+
+ async def acquire(
+ self, device_key: str, host: str, port: int, timeout: float
+ ) -> Tuple[asyncio.StreamReader, asyncio.StreamWriter, bool]:
+ """Get a connection for a device (cached or fresh).
+
+ Returns:
+ (reader, writer, from_cache) — from_cache is True if reused.
+ """
+ if self._enable_persistent:
+ async with self._lock:
+ conn = self._connections.pop(device_key, None)
+
+ if conn is not None:
+ if self._is_alive(conn):
+ self._drain_buffer(conn.reader)
+ conn.last_used_at = time.time()
+ logger.debug(f"Pool hit for {device_key} (age={time.time() - conn.created_at:.0f}s)")
+ return conn.reader, conn.writer, True
+ else:
+ await self._close_connection(conn, reason="stale")
+
+ # Open fresh connection
+ reader, writer = await self._open_connection(host, port, timeout)
+ logger.debug(f"New connection opened for {device_key}")
+ return reader, writer, False
+
+ async def release(self, device_key: str, reader: asyncio.StreamReader, writer: asyncio.StreamWriter, host: str, port: int):
+ """Return a connection to the pool for reuse."""
+ if not self._enable_persistent:
+ self._close_writer(writer)
+ return
+
+ # Check transport is still healthy before caching
+ if writer.transport.is_closing() or reader.at_eof():
+ self._close_writer(writer)
+ return
+
+ conn = DeviceConnection(
+ reader=reader,
+ writer=writer,
+ device_key=device_key,
+ host=host,
+ port=port,
+ )
+
+ async with self._lock:
+ # Evict any existing connection for this device (shouldn't happen
+ # under normal locking, but be safe)
+ old = self._connections.pop(device_key, None)
+ if old is not None:
+ await self._close_connection(old, reason="replaced")
+ self._connections[device_key] = conn
+
+ async def discard(self, device_key: str):
+ """Close and remove a connection from the pool (called on errors)."""
+ async with self._lock:
+ conn = self._connections.pop(device_key, None)
+ if conn is not None:
+ await self._close_connection(conn, reason="discarded")
+ logger.debug(f"Pool discard for {device_key}")
+
+ def get_stats(self) -> dict:
+ """Return pool status for diagnostics."""
+ now = time.time()
+ connections = {}
+ for key, conn in self._connections.items():
+ connections[key] = {
+ "host": conn.host,
+ "port": conn.port,
+ "age_seconds": round(now - conn.created_at, 1),
+ "idle_seconds": round(now - conn.last_used_at, 1),
+ "alive": self._is_alive(conn),
+ }
+ return {
+ "enabled": self._enable_persistent,
+ "active_connections": len(self._connections),
+ "idle_ttl": self._idle_ttl,
+ "max_age": self._max_age,
+ "keepalive_idle": self._keepalive_idle,
+ "keepalive_interval": self._keepalive_interval,
+ "keepalive_count": self._keepalive_count,
+ "connections": connections,
+ }
+
+ # -- internals ----------------------------------------------------------
+
+ async def _open_connection(
+ self, host: str, port: int, timeout: float
+ ) -> Tuple[asyncio.StreamReader, asyncio.StreamWriter]:
+ """Open a new TCP connection with keepalive options set."""
+ try:
+ reader, writer = await asyncio.wait_for(
+ asyncio.open_connection(host, port), timeout=timeout
+ )
+ except asyncio.TimeoutError:
+ raise ConnectionError(f"Failed to connect to device at {host}:{port}")
+ except Exception as e:
+            raise ConnectionError(f"Failed to connect to device at {host}:{port}: {e}")
+
+ # Set TCP keepalive on the underlying socket
+ self._set_keepalive(writer)
+ return reader, writer
+
+ def _set_keepalive(self, writer: asyncio.StreamWriter):
+ """Configure OS-level TCP keepalive on the connection socket."""
+ try:
+ sock = writer.transport.get_extra_info("socket")
+ if sock is None:
+ logger.warning("Could not access underlying socket for keepalive")
+ return
+
+ sock.setsockopt(socket.SOL_SOCKET, socket.SO_KEEPALIVE, 1)
+
+ # Linux-specific keepalive tuning
+ if hasattr(socket, "TCP_KEEPIDLE"):
+ sock.setsockopt(socket.IPPROTO_TCP, socket.TCP_KEEPIDLE, self._keepalive_idle)
+ if hasattr(socket, "TCP_KEEPINTVL"):
+ sock.setsockopt(socket.IPPROTO_TCP, socket.TCP_KEEPINTVL, self._keepalive_interval)
+ if hasattr(socket, "TCP_KEEPCNT"):
+ sock.setsockopt(socket.IPPROTO_TCP, socket.TCP_KEEPCNT, self._keepalive_count)
+
+ logger.debug(
+ f"TCP keepalive set: idle={self._keepalive_idle}s, "
+ f"interval={self._keepalive_interval}s, count={self._keepalive_count}"
+ )
+ except OSError as e:
+ logger.warning(f"Failed to set TCP keepalive: {e}")
+
+ def _is_alive(self, conn: DeviceConnection) -> bool:
+ """Check whether a cached connection is still usable."""
+ now = time.time()
+
+ # Age / idle checks
+ if now - conn.last_used_at > self._idle_ttl:
+ logger.debug(f"Connection {conn.device_key} idle too long ({now - conn.last_used_at:.0f}s > {self._idle_ttl}s)")
+ return False
+ if now - conn.created_at > self._max_age:
+ logger.debug(f"Connection {conn.device_key} too old ({now - conn.created_at:.0f}s > {self._max_age}s)")
+ return False
+
+ # Transport-level checks
+ transport = conn.writer.transport
+ if transport.is_closing():
+ logger.debug(f"Connection {conn.device_key} transport is closing")
+ return False
+ if conn.reader.at_eof():
+ logger.debug(f"Connection {conn.device_key} reader at EOF")
+ return False
+
+ return True
+
+ @staticmethod
+ def _drain_buffer(reader: asyncio.StreamReader):
+ """Drain any pending bytes (e.g. '$' prompt) from an idle connection."""
+        buf = reader._buffer  # noqa: SLF001 (private asyncio API; no public drain helper)
+ if buf:
+ pending = bytes(buf)
+ buf.clear()
+ logger.debug(f"Drained {len(pending)} bytes from cached connection: {pending!r}")
+
+ @staticmethod
+ def _close_writer(writer: asyncio.StreamWriter):
+ """Close a writer, suppressing errors."""
+ try:
+ writer.close()
+ except Exception:
+ pass
+
+ async def _close_connection(self, conn: DeviceConnection, reason: str = ""):
+ """Fully close a cached connection."""
+ logger.debug(f"Closing connection {conn.device_key} ({reason})")
+ conn.writer.close()
+ with contextlib.suppress(Exception):
+ await conn.writer.wait_closed()
+
+ async def _cleanup_loop(self):
+ """Periodically evict idle/expired connections."""
+ try:
+ while True:
+ await asyncio.sleep(30)
+ async with self._lock:
+ for key in list(self._connections):
+ conn = self._connections[key]
+ if not self._is_alive(conn):
+ del self._connections[key]
+ await self._close_connection(conn, reason="cleanup")
+ except asyncio.CancelledError:
+ pass
+
+
+# Module-level pool singleton
+_connection_pool = ConnectionPool(
+ enable_persistent=TCP_PERSISTENT_ENABLED,
+ idle_ttl=TCP_IDLE_TTL,
+ max_age=TCP_MAX_AGE,
+ keepalive_idle=TCP_KEEPALIVE_IDLE,
+ keepalive_interval=TCP_KEEPALIVE_INTERVAL,
+ keepalive_count=TCP_KEEPALIVE_COUNT,
+)
+
+
class NL43Client:
def __init__(self, host: str, port: int, timeout: float = 5.0, ftp_username: str = None, ftp_password: str = None, ftp_port: int = 21):
self.host = host
@@ -275,72 +564,97 @@ class NL43Client:
return await self._send_command_unlocked(cmd)
async def _send_command_unlocked(self, cmd: str) -> str:
- """Internal: send command without acquiring device lock (lock must be held by caller)."""
+ """Internal: send command without acquiring device lock (lock must be held by caller).
+
+ Uses the connection pool to reuse cached TCP connections when possible.
+ If a cached connection fails, retries once with a fresh connection.
+ """
await self._enforce_rate_limit()
logger.info(f"Sending command to {self.device_key}: {cmd.strip()}")
try:
- reader, writer = await asyncio.wait_for(
- asyncio.open_connection(self.host, self.port), timeout=self.timeout
+ reader, writer, from_cache = await _connection_pool.acquire(
+ self.device_key, self.host, self.port, self.timeout
)
- except asyncio.TimeoutError:
- logger.error(f"Connection timeout to {self.device_key}")
- raise ConnectionError(f"Failed to connect to device at {self.host}:{self.port}")
- except Exception as e:
- logger.error(f"Connection failed to {self.device_key}: {e}")
- raise ConnectionError(f"Failed to connect to device: {str(e)}")
+ except ConnectionError:
+ logger.error(f"Connection failed to {self.device_key}")
+ raise
try:
- writer.write(cmd.encode("ascii"))
- await writer.drain()
-
- # Read first line (result code)
- first_line_data = await asyncio.wait_for(reader.readuntil(b"\n"), timeout=self.timeout)
- result_code = first_line_data.decode(errors="ignore").strip()
-
- # Remove leading $ prompt if present
- if result_code.startswith("$"):
- result_code = result_code[1:].strip()
-
- logger.info(f"Result code from {self.device_key}: {result_code}")
-
- # Check result code
- if result_code == "R+0000":
- # Success - for query commands, read the second line with actual data
- is_query = cmd.strip().endswith("?")
- if is_query:
- data_line = await asyncio.wait_for(reader.readuntil(b"\n"), timeout=self.timeout)
- response = data_line.decode(errors="ignore").strip()
- logger.debug(f"Data line from {self.device_key}: {response}")
- return response
- else:
- # Setting command - return success code
- return result_code
- elif result_code == "R+0001":
- raise ValueError("Command error - device did not recognize command")
- elif result_code == "R+0002":
- raise ValueError("Parameter error - invalid parameter value")
- elif result_code == "R+0003":
- raise ValueError("Spec/type error - command not supported by this device model")
- elif result_code == "R+0004":
- raise ValueError("Status error - device is in wrong state for this command")
- else:
- raise ValueError(f"Unknown result code: {result_code}")
-
- except asyncio.TimeoutError:
- logger.error(f"Response timeout from {self.device_key}")
- raise TimeoutError(f"Device did not respond within {self.timeout}s")
- except Exception as e:
- logger.error(f"Communication error with {self.device_key}: {e}")
- raise
- finally:
- writer.close()
- with contextlib.suppress(Exception):
- await writer.wait_closed()
- # Record completion time for rate limiting — NL43 requires ≥1s
- # after response before next command, so measure from connection close
+ response = await self._execute_command(reader, writer, cmd)
+ # Success — return connection to pool for reuse
+ await _connection_pool.release(self.device_key, reader, writer, self.host, self.port)
_last_command_time[self.device_key] = time.time()
+ return response
+
+ except Exception as e:
+ # Discard the bad connection
+ await _connection_pool.discard(self.device_key)
+ ConnectionPool._close_writer(writer)
+
+ if from_cache:
+ # Retry once with a fresh connection — the cached one may have gone stale
+ logger.warning(f"Cached connection failed for {self.device_key}, retrying fresh: {e}")
+ await self._enforce_rate_limit()
+
+ try:
+ reader, writer, _ = await _connection_pool.acquire(
+ self.device_key, self.host, self.port, self.timeout
+ )
+ except ConnectionError:
+ logger.error(f"Retry connection also failed to {self.device_key}")
+ raise
+
+ try:
+ response = await self._execute_command(reader, writer, cmd)
+ await _connection_pool.release(self.device_key, reader, writer, self.host, self.port)
+ _last_command_time[self.device_key] = time.time()
+ return response
+ except Exception:
+ await _connection_pool.discard(self.device_key)
+ ConnectionPool._close_writer(writer)
+ raise
+ else:
+ raise
+
+ async def _execute_command(self, reader: asyncio.StreamReader, writer: asyncio.StreamWriter, cmd: str) -> str:
+ """Send a command over an existing connection and parse the NL43 response."""
+ writer.write(cmd.encode("ascii"))
+ await writer.drain()
+
+ # Read first line (result code)
+ first_line_data = await asyncio.wait_for(reader.readuntil(b"\n"), timeout=self.timeout)
+ result_code = first_line_data.decode(errors="ignore").strip()
+
+ # Remove leading $ prompt if present
+ if result_code.startswith("$"):
+ result_code = result_code[1:].strip()
+
+ logger.info(f"Result code from {self.device_key}: {result_code}")
+
+ # Check result code
+ if result_code == "R+0000":
+ # Success — for query commands, read the second line with actual data
+ is_query = cmd.strip().endswith("?")
+ if is_query:
+ data_line = await asyncio.wait_for(reader.readuntil(b"\n"), timeout=self.timeout)
+ response = data_line.decode(errors="ignore").strip()
+ logger.debug(f"Data line from {self.device_key}: {response}")
+ return response
+ else:
+ # Setting command — return success code
+ return result_code
+ elif result_code == "R+0001":
+ raise ValueError("Command error - device did not recognize command")
+ elif result_code == "R+0002":
+ raise ValueError("Parameter error - invalid parameter value")
+ elif result_code == "R+0003":
+ raise ValueError("Spec/type error - command not supported by this device model")
+ elif result_code == "R+0004":
+ raise ValueError("Status error - device is in wrong state for this command")
+ else:
+ raise ValueError(f"Unknown result code: {result_code}")
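The result-code handling above follows a simple two-line convention: the device answers with a result code (optionally prefixed by a `$` prompt), and query commands ending in `?` carry their payload on a second line. Sketched as a standalone parser for illustration (`parse_nl43_reply` and `NL43_ERRORS` are hypothetical names, not part of the client):

```python
NL43_ERRORS = {
    "R+0001": "Command error - device did not recognize command",
    "R+0002": "Parameter error - invalid parameter value",
    "R+0003": "Spec/type error - command not supported by this device model",
    "R+0004": "Status error - device is in wrong state for this command",
}

def parse_nl43_reply(cmd: str, lines: list) -> str:
    """Parse a raw NL43 reply: lines[0] is the result code, lines[1] (present
    only for queries ending in '?') is the data line."""
    code = lines[0].strip()
    if code.startswith("$"):  # strip the interactive prompt if present
        code = code[1:].strip()
    if code == "R+0000":
        return lines[1].strip() if cmd.strip().endswith("?") else code
    raise ValueError(NL43_ERRORS.get(code, f"Unknown result code: {code}"))
```

This mirrors the branch structure of `_execute_command` without the I/O, which makes the protocol mapping easy to unit-test in isolation.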
async def request_dod(self) -> NL43Snapshot:
"""Request DOD (Data Output Display) snapshot from device.
@@ -582,20 +896,19 @@ class NL43Client:
# Acquire per-device lock - held for entire streaming session
device_lock = await _get_device_lock(self.device_key)
async with device_lock:
+ # Evict any cached connection — streaming needs its own dedicated socket
+ await _connection_pool.discard(self.device_key)
await self._enforce_rate_limit()
logger.info(f"Starting DRD stream for {self.device_key}")
try:
- reader, writer = await asyncio.wait_for(
- asyncio.open_connection(self.host, self.port), timeout=self.timeout
+ reader, writer = await _connection_pool._open_connection(
+ self.host, self.port, self.timeout
)
- except asyncio.TimeoutError:
- logger.error(f"DRD stream connection timeout to {self.device_key}")
- raise ConnectionError(f"Failed to connect to device at {self.host}:{self.port}")
- except Exception as e:
- logger.error(f"DRD stream connection failed to {self.device_key}: {e}")
- raise ConnectionError(f"Failed to connect to device: {str(e)}")
+ except ConnectionError:
+ logger.error(f"DRD stream connection failed to {self.device_key}")
+ raise
try:
# Start DRD streaming
diff --git a/templates/roster.html b/templates/roster.html
index 6c8d23d..d22d086 100644
--- a/templates/roster.html
+++ b/templates/roster.html
@@ -3,7 +3,7 @@
- <title>SLMM Roster - Sound Level Meter Configuration</title>
+ <title>SLMM - Device Roster & Connections</title>
- <table>
- <thead>
- <tr>
- <th>Unit ID</th>
- <th>Host / IP</th>
- <th>TCP Port</th>
- <th>FTP Port</th>
- <th>TCP</th>
- <th>FTP</th>
- <th>Polling</th>
- <th>Status</th>
- <th>Actions</th>
- </tr>
- </thead>
- <tbody>
- <tr>
- <td colspan="9">Loading...</td>
- </tr>
- </tbody>
- </table>
+ <div class="tabs">
+ <button class="tab-btn active" onclick="switchTab('roster')">Roster</button>
+ <button class="tab-btn" onclick="switchTab('connections')">Connections</button>
+ </div>
+ <div id="tab-roster" class="tab-panel active">
+ <table>
+ <thead>
+ <tr>
+ <th>Unit ID</th>
+ <th>Host / IP</th>
+ <th>TCP Port</th>
+ <th>FTP Port</th>
+ <th>TCP</th>
+ <th>FTP</th>
+ <th>Polling</th>
+ <th>Status</th>
+ <th>Actions</th>
+ </tr>
+ </thead>
+ <tbody>
+ <tr>
+ <td colspan="9">Loading...</td>
+ </tr>
+ </tbody>
+ </table>
+ </div>
+ <div id="tab-connections" class="tab-panel">
+ <h3>Pool Configuration</h3>
+ <div id="poolConfig"></div>
+ <h3>Active Connections</h3>
+ <button onclick="flushConnections()">Flush All</button>
+ <div id="connectionsList"></div>
+ </div>
@@ -619,6 +743,159 @@
closeModal();
}
});
+
+ // ========== Tab Switching ==========
+
+ function switchTab(tabName) {
+ document.querySelectorAll('.tab-btn').forEach(btn => btn.classList.remove('active'));
+ document.querySelectorAll('.tab-panel').forEach(panel => panel.classList.remove('active'));
+
+ document.querySelector(`.tab-btn[onclick="switchTab('${tabName}')"]`).classList.add('active');
+ document.getElementById(`tab-${tabName}`).classList.add('active');
+
+ if (tabName === 'connections') {
+ loadConnections();
+ }
+ }
+
+ // ========== Connection Pool ==========
+
+ let connectionsRefreshTimer = null;
+
+ async function loadConnections() {
+ try {
+ const res = await fetch('/api/nl43/_connections/status');
+ const data = await res.json();
+
+ if (!res.ok) {
+ showToast('Failed to load connection pool status', 'error');
+ return;
+ }
+
+ const pool = data.pool;
+ renderPoolConfig(pool);
+ renderConnections(pool.connections);
+
+ // Auto-refresh while tab is active
+ clearTimeout(connectionsRefreshTimer);
+ if (document.getElementById('tab-connections').classList.contains('active')) {
+ connectionsRefreshTimer = setTimeout(loadConnections, 5000);
+ }
+ } catch (err) {
+ showToast('Error loading connections: ' + err.message, 'error');
+ console.error('Load connections error:', err);
+ }
+ }
+
+ function renderPoolConfig(pool) {
+ document.getElementById('poolConfig').innerHTML = `
+ <div class="stat-grid">
+ <div class="stat">
+ <div class="stat-label">Persistent</div>
+ <div class="stat-value">${pool.enabled ? 'Enabled' : 'Disabled'}</div>
+ </div>
+ <div class="stat">
+ <div class="stat-label">Active</div>
+ <div class="stat-value">${pool.active_connections}</div>
+ </div>
+ <div class="stat">
+ <div class="stat-label">Idle TTL</div>
+ <div class="stat-value">${pool.idle_ttl}s</div>
+ </div>
+ <div class="stat">
+ <div class="stat-label">Max Age</div>
+ <div class="stat-value">${pool.max_age}s</div>
+ </div>
+ <div class="stat">
+ <div class="stat-label">KA Idle</div>
+ <div class="stat-value">${pool.keepalive_idle}s</div>
+ </div>
+ <div class="stat">
+ <div class="stat-label">KA Interval</div>
+ <div class="stat-value">${pool.keepalive_interval}s</div>
+ </div>
+ <div class="stat">
+ <div class="stat-label">KA Probes</div>
+ <div class="stat-value">${pool.keepalive_count}</div>
+ </div>
+ </div>
+ `;
+ }
+
+ function renderConnections(connections) {
+ const container = document.getElementById('connectionsList');
+ const keys = Object.keys(connections);
+
+ if (keys.length === 0) {
+ container.innerHTML = `
+ <div class="empty-state">
+ <div class="empty-icon">~</div>
+ <div class="empty-title">No active connections</div>
+ <p class="empty-hint">
+ Connections appear here when devices are actively being polled and the connection is cached between commands.
+ </p>
+ </div>
+ `;
+ return;
+ }
+
+ container.innerHTML = keys.map(key => {
+ const conn = connections[key];
+ const aliveColor = conn.alive ? '#1a7f37' : '#cf222e';
+ const aliveText = conn.alive ? 'Alive' : 'Stale';
+ return `
+ <div class="connection-card">
+ <div class="connection-header">
+ <strong>${escapeHtml(key)}</strong>
+ <span style="color: ${aliveColor}">${aliveText}</span>
+ </div>
+ <div class="connection-stats">
+ <div class="stat">
+ <div class="stat-label">Host</div>
+ <div class="stat-value">${escapeHtml(conn.host)}</div>
+ </div>
+ <div class="stat">
+ <div class="stat-label">Age</div>
+ <div class="stat-value">${formatSeconds(conn.age_seconds)}</div>
+ </div>
+ <div class="stat">
+ <div class="stat-label">Idle</div>
+ <div class="stat-value">${formatSeconds(conn.idle_seconds)}</div>
+ </div>
+ </div>
+ </div>
+ `;
+ }).join('');
+ }
+
+ function formatSeconds(s) {
+ if (s < 60) return Math.round(s) + 's';
+ if (s < 3600) return Math.floor(s / 60) + 'm ' + Math.round(s % 60) + 's';
+ return Math.floor(s / 3600) + 'h ' + Math.floor((s % 3600) / 60) + 'm';
+ }
+
+ async function flushConnections() {
+ if (!confirm('Close all cached TCP connections?\n\nDevices will reconnect on the next poll cycle.')) {
+ return;
+ }
+
+ try {
+ const res = await fetch('/api/nl43/_connections/flush', { method: 'POST' });
+ const data = await res.json();
+
+ if (!res.ok) {
+ showToast(data.detail || 'Failed to flush connections', 'error');
+ return;
+ }
+
+ showToast('All connections flushed', 'success');
+ await loadConnections();
+ } catch (err) {
+ showToast('Error flushing connections: ' + err.message, 'error');
+ }
+ }