fix: reuse pooled TCP connection for DRD streaming

stream_drd() discarded the pooled connection and forced a fresh connect.
The NL43 allows only one TCP connection at a time; over a cellular link
the device does not free its single slot fast enough for an immediate
reconnect, so the fresh connect times out — the live DRD stream fails
while start/stop commands (which reuse the warm pooled socket) keep
working. This surfaced once the persistent connection pool was enabled
(TCP_PERSISTENT_ENABLED=true).

Stream over the already-open pooled connection via acquire() instead of
discard()+_open_connection(), and release() it back to the pool on exit
(after sending SUB to stop the stream) so commands keep reusing the same
single socket. The per-device lock is held for the whole streaming
session, so the poller can't touch the socket concurrently.

Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
This commit is contained in:
2026-06-08 19:00:35 +00:00
parent e3f9ca7f5b
commit af86cf713e
+17 -8
View File
@@ -896,15 +896,20 @@ class NL43Client:
# Acquire per-device lock - held for entire streaming session # Acquire per-device lock - held for entire streaming session
device_lock = await _get_device_lock(self.device_key) device_lock = await _get_device_lock(self.device_key)
async with device_lock: async with device_lock:
# Evict any cached connection — streaming needs its own dedicated socket
await _connection_pool.discard(self.device_key)
await self._enforce_rate_limit() await self._enforce_rate_limit()
logger.info(f"Starting DRD stream for {self.device_key}") logger.info(f"Starting DRD stream for {self.device_key}")
# Reuse the pooled connection instead of discard()+reopen. The NL43
# allows only ONE TCP connection at a time, and on a cellular link the
# device does not free its single slot fast enough for an immediate
# reconnect — so a fresh connect times out (the DRD stream failure).
# The per-device lock is held for the whole session, so it already
# blocks the poller; reusing the warm socket keeps us at exactly one
# connection and lets the stream start on the slot commands already use.
try: try:
reader, writer = await _connection_pool._open_connection( reader, writer, from_cache = await _connection_pool.acquire(
self.host, self.port, self.timeout self.device_key, self.host, self.port, self.timeout
) )
except ConnectionError: except ConnectionError:
logger.error(f"DRD stream connection failed to {self.device_key}") logger.error(f"DRD stream connection failed to {self.device_key}")
@@ -981,16 +986,20 @@ class NL43Client:
break break
finally: finally:
# Send SUB character to stop streaming # Stop streaming on the device (SUB = 0x1A), then return the warm
# connection to the pool so subsequent commands reuse this single
# socket instead of opening a second one. release() returns healthy
# sockets to the pool and closes dead ones; the next acquire()
# drains any residual stop output before reuse.
try: try:
writer.write(b"\x1A") writer.write(b"\x1A")
await writer.drain() await writer.drain()
except Exception: except Exception:
pass pass
writer.close() await _connection_pool.release(
with contextlib.suppress(Exception): self.device_key, reader, writer, self.host, self.port
await writer.wait_closed() )
logger.info(f"DRD stream ended for {self.device_key}") logger.info(f"DRD stream ended for {self.device_key}")