From af86cf713eb7353b1255548f20bfbdf52eb9b038 Mon Sep 17 00:00:00 2001 From: serversdown Date: Mon, 8 Jun 2026 19:00:35 +0000 Subject: [PATCH] fix: reuse pooled TCP connection for DRD streaming MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit stream_drd() discarded the pooled connection and forced a fresh connect. The NL43 allows only one TCP connection at a time; over a cellular link the device does not free its single slot fast enough for an immediate reconnect, so the fresh connect times out — the live DRD stream fails while start/stop commands (which reuse the warm pooled socket) keep working. This surfaced once the persistent connection pool was enabled (TCP_PERSISTENT_ENABLED=true). Stream over the already-open pooled connection via acquire() instead of discard()+_open_connection(), and release() it back to the pool on exit (after sending SUB to stop the stream) so commands keep reusing the same single socket. The per-device lock is held for the whole streaming session, so the poller can't touch the socket concurrently. Co-Authored-By: Claude Opus 4.8 (1M context) --- app/services.py | 25 +++++++++++++++++-------- 1 file changed, 17 insertions(+), 8 deletions(-) diff --git a/app/services.py b/app/services.py index aa9d37c..9375c5c 100644 --- a/app/services.py +++ b/app/services.py @@ -896,15 +896,20 @@ class NL43Client: # Acquire per-device lock - held for entire streaming session device_lock = await _get_device_lock(self.device_key) async with device_lock: - # Evict any cached connection — streaming needs its own dedicated socket - await _connection_pool.discard(self.device_key) await self._enforce_rate_limit() logger.info(f"Starting DRD stream for {self.device_key}") + # Reuse the pooled connection instead of discard()+reopen. The NL43 + # allows only ONE TCP connection at a time, and on a cellular link the + # device does not free its single slot fast enough for an immediate + # reconnect — so a fresh connect times out (the DRD stream failure). + # The per-device lock is held for the whole session, so it already + # blocks the poller; reusing the warm socket keeps us at exactly one + # connection and lets the stream start on the slot commands already use. try: - reader, writer = await _connection_pool._open_connection( - self.host, self.port, self.timeout + reader, writer, from_cache = await _connection_pool.acquire( + self.device_key, self.host, self.port, self.timeout ) except ConnectionError: logger.error(f"DRD stream connection failed to {self.device_key}") @@ -981,16 +986,20 @@ class NL43Client: break finally: - # Send SUB character to stop streaming + # Stop streaming on the device (SUB = 0x1A), then return the warm + # connection to the pool so subsequent commands reuse this single + # socket instead of opening a second one. release() returns healthy + # sockets to the pool and closes dead ones; the next acquire() + # drains any residual stop output before reuse. try: writer.write(b"\x1A") await writer.drain() except Exception: pass - writer.close() - with contextlib.suppress(Exception): - await writer.wait_closed() + await _connection_pool.release( + self.device_key, reader, writer, self.host, self.port + ) logger.info(f"DRD stream ended for {self.device_key}")