add: device communication lock, Now to send a tcp command, slmm must establish a connection lock to prevent flooding unit.

fixed: Background poller intervals increased.
This commit is contained in:
serversdwn
2026-01-29 06:08:55 +00:00
parent 67d63b4173
commit eb39a9d1d0
3 changed files with 143 additions and 107 deletions

View File

@@ -14,7 +14,7 @@ import zipfile
import tempfile
from dataclasses import dataclass
from datetime import datetime, timezone, timedelta
from typing import Optional, List
from typing import Optional, List, Dict
from sqlalchemy.orm import Session
from ftplib import FTP
from pathlib import Path
@@ -105,6 +105,19 @@ def persist_snapshot(s: NL43Snapshot, db: Session):
_last_command_time = {}
_rate_limit_lock = asyncio.Lock()
# Per-device connection locks: NL43 devices only support one TCP connection at a time
# This prevents concurrent connections from fighting for the device
_device_locks: Dict[str, asyncio.Lock] = {}
_device_locks_lock = asyncio.Lock()
async def _get_device_lock(device_key: str) -> asyncio.Lock:
"""Get or create a lock for a specific device."""
async with _device_locks_lock:
if device_key not in _device_locks:
_device_locks[device_key] = asyncio.Lock()
return _device_locks[device_key]
class NL43Client:
def __init__(self, host: str, port: int, timeout: float = 5.0, ftp_username: str = None, ftp_password: str = None, ftp_port: int = 21):
@@ -133,7 +146,17 @@ class NL43Client:
NL43 protocol returns two lines for query commands:
Line 1: Result code (R+0000 for success, error codes otherwise)
Line 2: Actual data (for query commands ending with '?')
This method acquires a per-device lock to ensure only one TCP connection
is active at a time (NL43 devices only support single connections).
"""
# Acquire per-device lock to prevent concurrent connections
device_lock = await _get_device_lock(self.device_key)
async with device_lock:
return await self._send_command_unlocked(cmd)
async def _send_command_unlocked(self, cmd: str) -> str:
"""Internal: send command without acquiring device lock (lock must be held by caller)."""
await self._enforce_rate_limit()
logger.info(f"Sending command to {self.device_key}: {cmd.strip()}")
@@ -429,105 +452,112 @@ class NL43Client:
The stream continues until an exception occurs or the connection is closed.
Send SUB character (0x1A) to stop the stream.
NOTE: This method holds the device lock for the entire duration of streaming,
blocking other commands to this device. This is intentional since NL43 devices
only support one TCP connection at a time.
"""
await self._enforce_rate_limit()
# Acquire per-device lock - held for entire streaming session
device_lock = await _get_device_lock(self.device_key)
async with device_lock:
await self._enforce_rate_limit()
logger.info(f"Starting DRD stream for {self.device_key}")
logger.info(f"Starting DRD stream for {self.device_key}")
try:
reader, writer = await asyncio.wait_for(
asyncio.open_connection(self.host, self.port), timeout=self.timeout
)
except asyncio.TimeoutError:
logger.error(f"DRD stream connection timeout to {self.device_key}")
raise ConnectionError(f"Failed to connect to device at {self.host}:{self.port}")
except Exception as e:
logger.error(f"DRD stream connection failed to {self.device_key}: {e}")
raise ConnectionError(f"Failed to connect to device: {str(e)}")
try:
# Start DRD streaming
writer.write(b"DRD?\r\n")
await writer.drain()
# Read initial result code
first_line_data = await asyncio.wait_for(reader.readuntil(b"\n"), timeout=self.timeout)
result_code = first_line_data.decode(errors="ignore").strip()
if result_code.startswith("$"):
result_code = result_code[1:].strip()
logger.debug(f"DRD stream result code from {self.device_key}: {result_code}")
if result_code != "R+0000":
raise ValueError(f"DRD stream failed to start: {result_code}")
logger.info(f"DRD stream started successfully for {self.device_key}")
# Continuously read data lines
while True:
try:
line_data = await asyncio.wait_for(reader.readuntil(b"\n"), timeout=30.0)
line = line_data.decode(errors="ignore").strip()
if not line:
continue
# Remove leading $ if present
if line.startswith("$"):
line = line[1:].strip()
# Parse the DRD data (same format as DOD)
parts = [p.strip() for p in line.split(",") if p.strip() != ""]
if len(parts) < 2:
logger.warning(f"Malformed DRD data from {self.device_key}: {line}")
continue
snap = NL43Snapshot(unit_id="", raw_payload=line, measurement_state="Measure")
# Parse known positions (DRD format - same as DOD)
# DRD format: d0=counter, d1=Lp, d2=Leq, d3=Lmax, d4=Lmin, d5=Lpeak, d6=LIeq, ...
try:
# Capture d0 (counter) for timer synchronization
if len(parts) >= 1:
snap.counter = parts[0] # d0: Measurement interval counter (1-600)
if len(parts) >= 2:
snap.lp = parts[1] # d1: Instantaneous sound pressure level
if len(parts) >= 3:
snap.leq = parts[2] # d2: Equivalent continuous sound level
if len(parts) >= 4:
snap.lmax = parts[3] # d3: Maximum level
if len(parts) >= 5:
snap.lmin = parts[4] # d4: Minimum level
if len(parts) >= 6:
snap.lpeak = parts[5] # d5: Peak level
except (IndexError, ValueError) as e:
logger.warning(f"Error parsing DRD data points: {e}")
# Call the callback with the snapshot
await callback(snap)
except asyncio.TimeoutError:
logger.warning(f"DRD stream timeout (no data for 30s) from {self.device_key}")
break
except asyncio.IncompleteReadError:
logger.info(f"DRD stream closed by device {self.device_key}")
break
finally:
# Send SUB character to stop streaming
try:
writer.write(b"\x1A")
reader, writer = await asyncio.wait_for(
asyncio.open_connection(self.host, self.port), timeout=self.timeout
)
except asyncio.TimeoutError:
logger.error(f"DRD stream connection timeout to {self.device_key}")
raise ConnectionError(f"Failed to connect to device at {self.host}:{self.port}")
except Exception as e:
logger.error(f"DRD stream connection failed to {self.device_key}: {e}")
raise ConnectionError(f"Failed to connect to device: {str(e)}")
try:
# Start DRD streaming
writer.write(b"DRD?\r\n")
await writer.drain()
except Exception:
pass
writer.close()
with contextlib.suppress(Exception):
await writer.wait_closed()
# Read initial result code
first_line_data = await asyncio.wait_for(reader.readuntil(b"\n"), timeout=self.timeout)
result_code = first_line_data.decode(errors="ignore").strip()
logger.info(f"DRD stream ended for {self.device_key}")
if result_code.startswith("$"):
result_code = result_code[1:].strip()
logger.debug(f"DRD stream result code from {self.device_key}: {result_code}")
if result_code != "R+0000":
raise ValueError(f"DRD stream failed to start: {result_code}")
logger.info(f"DRD stream started successfully for {self.device_key}")
# Continuously read data lines
while True:
try:
line_data = await asyncio.wait_for(reader.readuntil(b"\n"), timeout=30.0)
line = line_data.decode(errors="ignore").strip()
if not line:
continue
# Remove leading $ if present
if line.startswith("$"):
line = line[1:].strip()
# Parse the DRD data (same format as DOD)
parts = [p.strip() for p in line.split(",") if p.strip() != ""]
if len(parts) < 2:
logger.warning(f"Malformed DRD data from {self.device_key}: {line}")
continue
snap = NL43Snapshot(unit_id="", raw_payload=line, measurement_state="Measure")
# Parse known positions (DRD format - same as DOD)
# DRD format: d0=counter, d1=Lp, d2=Leq, d3=Lmax, d4=Lmin, d5=Lpeak, d6=LIeq, ...
try:
# Capture d0 (counter) for timer synchronization
if len(parts) >= 1:
snap.counter = parts[0] # d0: Measurement interval counter (1-600)
if len(parts) >= 2:
snap.lp = parts[1] # d1: Instantaneous sound pressure level
if len(parts) >= 3:
snap.leq = parts[2] # d2: Equivalent continuous sound level
if len(parts) >= 4:
snap.lmax = parts[3] # d3: Maximum level
if len(parts) >= 5:
snap.lmin = parts[4] # d4: Minimum level
if len(parts) >= 6:
snap.lpeak = parts[5] # d5: Peak level
except (IndexError, ValueError) as e:
logger.warning(f"Error parsing DRD data points: {e}")
# Call the callback with the snapshot
await callback(snap)
except asyncio.TimeoutError:
logger.warning(f"DRD stream timeout (no data for 30s) from {self.device_key}")
break
except asyncio.IncompleteReadError:
logger.info(f"DRD stream closed by device {self.device_key}")
break
finally:
# Send SUB character to stop streaming
try:
writer.write(b"\x1A")
await writer.drain()
except Exception:
pass
writer.close()
with contextlib.suppress(Exception):
await writer.wait_closed()
logger.info(f"DRD stream ended for {self.device_key}")
async def set_measurement_time(self, preset: str):
"""Set measurement time preset.