Background poller intervals increased.

This commit is contained in:
serversdwn
2026-01-29 06:08:55 +00:00
parent 67d63b4173
commit 89fca9d0fe
3 changed files with 143 additions and 107 deletions

View File

@@ -25,7 +25,7 @@ class BackgroundPoller:
Background task that continuously polls NL43 devices and updates status cache.
Features:
- Per-device configurable poll intervals (10-3600 seconds)
- Per-device configurable poll intervals (30 seconds to 6 hours)
- Automatic offline detection (marks unreachable after 3 consecutive failures)
- Dynamic sleep intervals based on device configurations
- Graceful shutdown on application stop
@@ -230,8 +230,8 @@ class BackgroundPoller:
Calculate the next sleep interval based on all device poll intervals.
Returns a dynamic sleep time that ensures responsive polling:
- Minimum 10 seconds (prevents tight loops)
- Maximum 30 seconds (ensures responsiveness)
- Minimum 30 seconds (prevents tight loops)
- Maximum 300 seconds / 5 minutes (ensures reasonable responsiveness for long intervals)
- Generally half the minimum device interval
Returns:
@@ -245,14 +245,15 @@ class BackgroundPoller:
).all()
if not configs:
return 30 # Default sleep when no devices configured
return 60 # Default sleep when no devices configured
# Get all intervals
intervals = [cfg.poll_interval_seconds or 60 for cfg in configs]
min_interval = min(intervals)
# Use half the minimum interval, but cap between 10-30 seconds
sleep_time = max(10, min(30, min_interval // 2))
# Use half the minimum interval, but cap between 30-300 seconds
# This allows longer sleep times when polling intervals are long (e.g., hourly)
sleep_time = max(30, min(300, min_interval // 2))
return sleep_time

View File

@@ -81,14 +81,14 @@ class ConfigPayload(BaseModel):
@field_validator("poll_interval_seconds")
@classmethod
def validate_poll_interval(cls, v):
if v is not None and not (10 <= v <= 3600):
raise ValueError("Poll interval must be between 10 and 3600 seconds")
if v is not None and not (30 <= v <= 21600):
raise ValueError("Poll interval must be between 30 and 21600 seconds (30s to 6 hours)")
return v
class PollingConfigPayload(BaseModel):
"""Payload for updating device polling configuration."""
poll_interval_seconds: int | None = Field(None, ge=10, le=3600, description="Polling interval in seconds (10-3600)")
poll_interval_seconds: int | None = Field(None, ge=30, le=21600, description="Polling interval in seconds (30s to 6 hours)")
poll_enabled: bool | None = Field(None, description="Enable or disable background polling for this device")
@@ -233,8 +233,8 @@ class RosterCreatePayload(BaseModel):
@field_validator("poll_interval_seconds")
@classmethod
def validate_poll_interval(cls, v):
if v is not None and not (10 <= v <= 3600):
raise ValueError("Poll interval must be between 10 and 3600 seconds")
if v is not None and not (30 <= v <= 21600):
raise ValueError("Poll interval must be between 30 and 21600 seconds (30s to 6 hours)")
return v
@@ -1880,7 +1880,7 @@ def update_polling_config(
"""
Update background polling configuration for a device.
Allows configuring the polling interval (10-3600 seconds) and
Allows configuring the polling interval (30-21600 seconds, i.e. 30s to 6 hours) and
enabling/disabling automatic background polling per device.
Changes take effect on the next polling cycle.
@@ -1891,10 +1891,15 @@ def update_polling_config(
# Update interval if provided
if payload.poll_interval_seconds is not None:
if payload.poll_interval_seconds < 10:
if payload.poll_interval_seconds < 30:
raise HTTPException(
status_code=400,
detail="Polling interval must be at least 10 seconds"
detail="Polling interval must be at least 30 seconds"
)
if payload.poll_interval_seconds > 21600:
raise HTTPException(
status_code=400,
detail="Polling interval must be at most 21600 seconds (6 hours)"
)
cfg.poll_interval_seconds = payload.poll_interval_seconds

View File

@@ -14,7 +14,7 @@ import zipfile
import tempfile
from dataclasses import dataclass
from datetime import datetime, timezone, timedelta
from typing import Optional, List
from typing import Optional, List, Dict
from sqlalchemy.orm import Session
from ftplib import FTP
from pathlib import Path
@@ -105,6 +105,19 @@ def persist_snapshot(s: NL43Snapshot, db: Session):
_last_command_time = {}
_rate_limit_lock = asyncio.Lock()
# Per-device connection locks: NL43 devices only support one TCP connection at a time
# This prevents concurrent connections from fighting for the device
_device_locks: Dict[str, asyncio.Lock] = {}
_device_locks_lock = asyncio.Lock()
async def _get_device_lock(device_key: str) -> asyncio.Lock:
"""Get or create a lock for a specific device."""
async with _device_locks_lock:
if device_key not in _device_locks:
_device_locks[device_key] = asyncio.Lock()
return _device_locks[device_key]
class NL43Client:
def __init__(self, host: str, port: int, timeout: float = 5.0, ftp_username: str = None, ftp_password: str = None, ftp_port: int = 21):
@@ -133,7 +146,17 @@ class NL43Client:
NL43 protocol returns two lines for query commands:
Line 1: Result code (R+0000 for success, error codes otherwise)
Line 2: Actual data (for query commands ending with '?')
This method acquires a per-device lock to ensure only one TCP connection
is active at a time (NL43 devices only support single connections).
"""
# Acquire per-device lock to prevent concurrent connections
device_lock = await _get_device_lock(self.device_key)
async with device_lock:
return await self._send_command_unlocked(cmd)
async def _send_command_unlocked(self, cmd: str) -> str:
"""Internal: send command without acquiring device lock (lock must be held by caller)."""
await self._enforce_rate_limit()
logger.info(f"Sending command to {self.device_key}: {cmd.strip()}")
@@ -429,105 +452,112 @@ class NL43Client:
The stream continues until an exception occurs or the connection is closed.
Send SUB character (0x1A) to stop the stream.
NOTE: This method holds the device lock for the entire duration of streaming,
blocking other commands to this device. This is intentional since NL43 devices
only support one TCP connection at a time.
"""
await self._enforce_rate_limit()
# Acquire per-device lock - held for entire streaming session
device_lock = await _get_device_lock(self.device_key)
async with device_lock:
await self._enforce_rate_limit()
logger.info(f"Starting DRD stream for {self.device_key}")
logger.info(f"Starting DRD stream for {self.device_key}")
try:
reader, writer = await asyncio.wait_for(
asyncio.open_connection(self.host, self.port), timeout=self.timeout
)
except asyncio.TimeoutError:
logger.error(f"DRD stream connection timeout to {self.device_key}")
raise ConnectionError(f"Failed to connect to device at {self.host}:{self.port}")
except Exception as e:
logger.error(f"DRD stream connection failed to {self.device_key}: {e}")
raise ConnectionError(f"Failed to connect to device: {str(e)}")
try:
# Start DRD streaming
writer.write(b"DRD?\r\n")
await writer.drain()
# Read initial result code
first_line_data = await asyncio.wait_for(reader.readuntil(b"\n"), timeout=self.timeout)
result_code = first_line_data.decode(errors="ignore").strip()
if result_code.startswith("$"):
result_code = result_code[1:].strip()
logger.debug(f"DRD stream result code from {self.device_key}: {result_code}")
if result_code != "R+0000":
raise ValueError(f"DRD stream failed to start: {result_code}")
logger.info(f"DRD stream started successfully for {self.device_key}")
# Continuously read data lines
while True:
try:
line_data = await asyncio.wait_for(reader.readuntil(b"\n"), timeout=30.0)
line = line_data.decode(errors="ignore").strip()
if not line:
continue
# Remove leading $ if present
if line.startswith("$"):
line = line[1:].strip()
# Parse the DRD data (same format as DOD)
parts = [p.strip() for p in line.split(",") if p.strip() != ""]
if len(parts) < 2:
logger.warning(f"Malformed DRD data from {self.device_key}: {line}")
continue
snap = NL43Snapshot(unit_id="", raw_payload=line, measurement_state="Measure")
# Parse known positions (DRD format - same as DOD)
# DRD format: d0=counter, d1=Lp, d2=Leq, d3=Lmax, d4=Lmin, d5=Lpeak, d6=LIeq, ...
try:
# Capture d0 (counter) for timer synchronization
if len(parts) >= 1:
snap.counter = parts[0] # d0: Measurement interval counter (1-600)
if len(parts) >= 2:
snap.lp = parts[1] # d1: Instantaneous sound pressure level
if len(parts) >= 3:
snap.leq = parts[2] # d2: Equivalent continuous sound level
if len(parts) >= 4:
snap.lmax = parts[3] # d3: Maximum level
if len(parts) >= 5:
snap.lmin = parts[4] # d4: Minimum level
if len(parts) >= 6:
snap.lpeak = parts[5] # d5: Peak level
except (IndexError, ValueError) as e:
logger.warning(f"Error parsing DRD data points: {e}")
# Call the callback with the snapshot
await callback(snap)
except asyncio.TimeoutError:
logger.warning(f"DRD stream timeout (no data for 30s) from {self.device_key}")
break
except asyncio.IncompleteReadError:
logger.info(f"DRD stream closed by device {self.device_key}")
break
finally:
# Send SUB character to stop streaming
try:
writer.write(b"\x1A")
reader, writer = await asyncio.wait_for(
asyncio.open_connection(self.host, self.port), timeout=self.timeout
)
except asyncio.TimeoutError:
logger.error(f"DRD stream connection timeout to {self.device_key}")
raise ConnectionError(f"Failed to connect to device at {self.host}:{self.port}")
except Exception as e:
logger.error(f"DRD stream connection failed to {self.device_key}: {e}")
raise ConnectionError(f"Failed to connect to device: {str(e)}")
try:
# Start DRD streaming
writer.write(b"DRD?\r\n")
await writer.drain()
except Exception:
pass
writer.close()
with contextlib.suppress(Exception):
await writer.wait_closed()
# Read initial result code
first_line_data = await asyncio.wait_for(reader.readuntil(b"\n"), timeout=self.timeout)
result_code = first_line_data.decode(errors="ignore").strip()
logger.info(f"DRD stream ended for {self.device_key}")
if result_code.startswith("$"):
result_code = result_code[1:].strip()
logger.debug(f"DRD stream result code from {self.device_key}: {result_code}")
if result_code != "R+0000":
raise ValueError(f"DRD stream failed to start: {result_code}")
logger.info(f"DRD stream started successfully for {self.device_key}")
# Continuously read data lines
while True:
try:
line_data = await asyncio.wait_for(reader.readuntil(b"\n"), timeout=30.0)
line = line_data.decode(errors="ignore").strip()
if not line:
continue
# Remove leading $ if present
if line.startswith("$"):
line = line[1:].strip()
# Parse the DRD data (same format as DOD)
parts = [p.strip() for p in line.split(",") if p.strip() != ""]
if len(parts) < 2:
logger.warning(f"Malformed DRD data from {self.device_key}: {line}")
continue
snap = NL43Snapshot(unit_id="", raw_payload=line, measurement_state="Measure")
# Parse known positions (DRD format - same as DOD)
# DRD format: d0=counter, d1=Lp, d2=Leq, d3=Lmax, d4=Lmin, d5=Lpeak, d6=LIeq, ...
try:
# Capture d0 (counter) for timer synchronization
if len(parts) >= 1:
snap.counter = parts[0] # d0: Measurement interval counter (1-600)
if len(parts) >= 2:
snap.lp = parts[1] # d1: Instantaneous sound pressure level
if len(parts) >= 3:
snap.leq = parts[2] # d2: Equivalent continuous sound level
if len(parts) >= 4:
snap.lmax = parts[3] # d3: Maximum level
if len(parts) >= 5:
snap.lmin = parts[4] # d4: Minimum level
if len(parts) >= 6:
snap.lpeak = parts[5] # d5: Peak level
except (IndexError, ValueError) as e:
logger.warning(f"Error parsing DRD data points: {e}")
# Call the callback with the snapshot
await callback(snap)
except asyncio.TimeoutError:
logger.warning(f"DRD stream timeout (no data for 30s) from {self.device_key}")
break
except asyncio.IncompleteReadError:
logger.info(f"DRD stream closed by device {self.device_key}")
break
finally:
# Send SUB character to stop streaming
try:
writer.write(b"\x1A")
await writer.drain()
except Exception:
pass
writer.close()
with contextlib.suppress(Exception):
await writer.wait_closed()
logger.info(f"DRD stream ended for {self.device_key}")
async def set_measurement_time(self, preset: str):
"""Set measurement time preset.