From 89fca9d0fe04b70552dc9ccbf07c5de6fa9c696c Mon Sep 17 00:00:00 2001 From: serversdwn Date: Thu, 29 Jan 2026 06:08:55 +0000 Subject: [PATCH] Background poller intervals increased. --- app/background_poller.py | 13 +-- app/routers.py | 21 ++-- app/services.py | 216 ++++++++++++++++++++++----------------- 3 files changed, 143 insertions(+), 107 deletions(-) diff --git a/app/background_poller.py b/app/background_poller.py index 4c65940..503abb5 100644 --- a/app/background_poller.py +++ b/app/background_poller.py @@ -25,7 +25,7 @@ class BackgroundPoller: Background task that continuously polls NL43 devices and updates status cache. Features: - - Per-device configurable poll intervals (10-3600 seconds) + - Per-device configurable poll intervals (30 seconds to 6 hours) - Automatic offline detection (marks unreachable after 3 consecutive failures) - Dynamic sleep intervals based on device configurations - Graceful shutdown on application stop @@ -230,8 +230,8 @@ class BackgroundPoller: Calculate the next sleep interval based on all device poll intervals. Returns a dynamic sleep time that ensures responsive polling: - - Minimum 10 seconds (prevents tight loops) - - Maximum 30 seconds (ensures responsiveness) + - Minimum 30 seconds (prevents tight loops) + - Maximum 300 seconds / 5 minutes (ensures reasonable responsiveness for long intervals) - Generally half the minimum device interval Returns: @@ -245,14 +245,15 @@ class BackgroundPoller: ).all() if not configs: - return 30 # Default sleep when no devices configured + return 60 # Default sleep when no devices configured # Get all intervals intervals = [cfg.poll_interval_seconds or 60 for cfg in configs] min_interval = min(intervals) - # Use half the minimum interval, but cap between 10-30 seconds - sleep_time = max(10, min(30, min_interval // 2)) + # Use half the minimum interval, but cap between 30-300 seconds + # This allows longer sleep times when polling intervals are long (e.g., hourly) + sleep_time = max(30, min(300, min_interval // 2)) return sleep_time diff --git a/app/routers.py b/app/routers.py index b5c4e5d..a2e5e92 100644 --- a/app/routers.py +++ b/app/routers.py @@ -81,14 +81,14 @@ class ConfigPayload(BaseModel): @field_validator("poll_interval_seconds") @classmethod def validate_poll_interval(cls, v): - if v is not None and not (10 <= v <= 3600): - raise ValueError("Poll interval must be between 10 and 3600 seconds") + if v is not None and not (30 <= v <= 21600): + raise ValueError("Poll interval must be between 30 and 21600 seconds (30s to 6 hours)") return v class PollingConfigPayload(BaseModel): """Payload for updating device polling configuration.""" - poll_interval_seconds: int | None = Field(None, ge=10, le=3600, description="Polling interval in seconds (10-3600)") + poll_interval_seconds: int | None = Field(None, ge=30, le=21600, description="Polling interval in seconds (30s to 6 hours)") poll_enabled: bool | None = Field(None, description="Enable or disable background polling for this device") @@ -233,8 +233,8 @@ class RosterCreatePayload(BaseModel): @field_validator("poll_interval_seconds") @classmethod def validate_poll_interval(cls, v): - if v is not None and not (10 <= v <= 3600): - raise ValueError("Poll interval must be between 10 and 3600 seconds") + if v is not None and not (30 <= v <= 21600): + raise ValueError("Poll interval must be between 30 and 21600 seconds (30s to 6 hours)") return v @@ -1880,7 +1880,7 @@ def update_polling_config( """ Update background polling configuration for a device. - Allows configuring the polling interval (10-3600 seconds) and + Allows configuring the polling interval (30-21600 seconds, i.e. 30s to 6 hours) and enabling/disabling automatic background polling per device. Changes take effect on the next polling cycle. @@ -1891,10 +1891,15 @@ def update_polling_config( # Update interval if provided if payload.poll_interval_seconds is not None: - if payload.poll_interval_seconds < 10: + if payload.poll_interval_seconds < 30: raise HTTPException( status_code=400, - detail="Polling interval must be at least 10 seconds" + detail="Polling interval must be at least 30 seconds" + ) + if payload.poll_interval_seconds > 21600: + raise HTTPException( + status_code=400, + detail="Polling interval must be at most 21600 seconds (6 hours)" ) cfg.poll_interval_seconds = payload.poll_interval_seconds diff --git a/app/services.py b/app/services.py index c1cb6b4..219e41d 100644 --- a/app/services.py +++ b/app/services.py @@ -14,7 +14,7 @@ import zipfile import tempfile from dataclasses import dataclass from datetime import datetime, timezone, timedelta -from typing import Optional, List +from typing import Optional, List, Dict from sqlalchemy.orm import Session from ftplib import FTP from pathlib import Path @@ -105,6 +105,19 @@ def persist_snapshot(s: NL43Snapshot, db: Session): _last_command_time = {} _rate_limit_lock = asyncio.Lock() +# Per-device connection locks: NL43 devices only support one TCP connection at a time +# This prevents concurrent connections from fighting for the device +_device_locks: Dict[str, asyncio.Lock] = {} +_device_locks_lock = asyncio.Lock() + + +async def _get_device_lock(device_key: str) -> asyncio.Lock: + """Get or create a lock for a specific device.""" + async with _device_locks_lock: + if device_key not in _device_locks: + _device_locks[device_key] = asyncio.Lock() + return _device_locks[device_key] + class NL43Client: def __init__(self, host: str, port: int, timeout: float = 5.0, ftp_username: str = None, ftp_password: str = None, ftp_port: int = 21): @@ -133,7 +146,17 @@ class NL43Client: NL43 protocol returns two lines for query commands: Line 1: Result code (R+0000 for success, error codes otherwise) Line 2: Actual data (for query commands ending with '?') + + This method acquires a per-device lock to ensure only one TCP connection + is active at a time (NL43 devices only support single connections). """ + # Acquire per-device lock to prevent concurrent connections + device_lock = await _get_device_lock(self.device_key) + async with device_lock: + return await self._send_command_unlocked(cmd) + + async def _send_command_unlocked(self, cmd: str) -> str: + """Internal: send command without acquiring device lock (lock must be held by caller).""" await self._enforce_rate_limit() logger.info(f"Sending command to {self.device_key}: {cmd.strip()}") @@ -429,105 +452,112 @@ class NL43Client: The stream continues until an exception occurs or the connection is closed. Send SUB character (0x1A) to stop the stream. + + NOTE: This method holds the device lock for the entire duration of streaming, + blocking other commands to this device. This is intentional since NL43 devices + only support one TCP connection at a time. """ - await self._enforce_rate_limit() + # Acquire per-device lock - held for entire streaming session + device_lock = await _get_device_lock(self.device_key) + async with device_lock: + await self._enforce_rate_limit() - logger.info(f"Starting DRD stream for {self.device_key}") + logger.info(f"Starting DRD stream for {self.device_key}") - try: - reader, writer = await asyncio.wait_for( - asyncio.open_connection(self.host, self.port), timeout=self.timeout - ) - except asyncio.TimeoutError: - logger.error(f"DRD stream connection timeout to {self.device_key}") - raise ConnectionError(f"Failed to connect to device at {self.host}:{self.port}") - except Exception as e: - logger.error(f"DRD stream connection failed to {self.device_key}: {e}") - raise ConnectionError(f"Failed to connect to device: {str(e)}") - - try: - # Start DRD streaming - writer.write(b"DRD?\r\n") - await writer.drain() - - # Read initial result code - first_line_data = await asyncio.wait_for(reader.readuntil(b"\n"), timeout=self.timeout) - result_code = first_line_data.decode(errors="ignore").strip() - - if result_code.startswith("$"): - result_code = result_code[1:].strip() - - logger.debug(f"DRD stream result code from {self.device_key}: {result_code}") - - if result_code != "R+0000": - raise ValueError(f"DRD stream failed to start: {result_code}") - - logger.info(f"DRD stream started successfully for {self.device_key}") - - # Continuously read data lines - while True: - try: - line_data = await asyncio.wait_for(reader.readuntil(b"\n"), timeout=30.0) - line = line_data.decode(errors="ignore").strip() - - if not line: - continue - - # Remove leading $ if present - if line.startswith("$"): - line = line[1:].strip() - - # Parse the DRD data (same format as DOD) - parts = [p.strip() for p in line.split(",") if p.strip() != ""] - - if len(parts) < 2: - logger.warning(f"Malformed DRD data from {self.device_key}: {line}") - continue - - snap = NL43Snapshot(unit_id="", raw_payload=line, measurement_state="Measure") - - # Parse known positions (DRD format - same as DOD) - # DRD format: d0=counter, d1=Lp, d2=Leq, d3=Lmax, d4=Lmin, d5=Lpeak, d6=LIeq, ... - try: - # Capture d0 (counter) for timer synchronization - if len(parts) >= 1: - snap.counter = parts[0] # d0: Measurement interval counter (1-600) - if len(parts) >= 2: - snap.lp = parts[1] # d1: Instantaneous sound pressure level - if len(parts) >= 3: - snap.leq = parts[2] # d2: Equivalent continuous sound level - if len(parts) >= 4: - snap.lmax = parts[3] # d3: Maximum level - if len(parts) >= 5: - snap.lmin = parts[4] # d4: Minimum level - if len(parts) >= 6: - snap.lpeak = parts[5] # d5: Peak level - except (IndexError, ValueError) as e: - logger.warning(f"Error parsing DRD data points: {e}") - - # Call the callback with the snapshot - await callback(snap) - - except asyncio.TimeoutError: - logger.warning(f"DRD stream timeout (no data for 30s) from {self.device_key}") - break - except asyncio.IncompleteReadError: - logger.info(f"DRD stream closed by device {self.device_key}") - break - - finally: - # Send SUB character to stop streaming try: - writer.write(b"\x1A") + reader, writer = await asyncio.wait_for( + asyncio.open_connection(self.host, self.port), timeout=self.timeout + ) + except asyncio.TimeoutError: + logger.error(f"DRD stream connection timeout to {self.device_key}") + raise ConnectionError(f"Failed to connect to device at {self.host}:{self.port}") + except Exception as e: + logger.error(f"DRD stream connection failed to {self.device_key}: {e}") + raise ConnectionError(f"Failed to connect to device: {str(e)}") + + try: + # Start DRD streaming + writer.write(b"DRD?\r\n") await writer.drain() - except Exception: - pass - writer.close() - with contextlib.suppress(Exception): - await writer.wait_closed() + # Read initial result code + first_line_data = await asyncio.wait_for(reader.readuntil(b"\n"), timeout=self.timeout) + result_code = first_line_data.decode(errors="ignore").strip() - logger.info(f"DRD stream ended for {self.device_key}") + if result_code.startswith("$"): + result_code = result_code[1:].strip() + + logger.debug(f"DRD stream result code from {self.device_key}: {result_code}") + + if result_code != "R+0000": + raise ValueError(f"DRD stream failed to start: {result_code}") + + logger.info(f"DRD stream started successfully for {self.device_key}") + + # Continuously read data lines + while True: + try: + line_data = await asyncio.wait_for(reader.readuntil(b"\n"), timeout=30.0) + line = line_data.decode(errors="ignore").strip() + + if not line: + continue + + # Remove leading $ if present + if line.startswith("$"): + line = line[1:].strip() + + # Parse the DRD data (same format as DOD) + parts = [p.strip() for p in line.split(",") if p.strip() != ""] + + if len(parts) < 2: + logger.warning(f"Malformed DRD data from {self.device_key}: {line}") + continue + + snap = NL43Snapshot(unit_id="", raw_payload=line, measurement_state="Measure") + + # Parse known positions (DRD format - same as DOD) + # DRD format: d0=counter, d1=Lp, d2=Leq, d3=Lmax, d4=Lmin, d5=Lpeak, d6=LIeq, ... + try: + # Capture d0 (counter) for timer synchronization + if len(parts) >= 1: + snap.counter = parts[0] # d0: Measurement interval counter (1-600) + if len(parts) >= 2: + snap.lp = parts[1] # d1: Instantaneous sound pressure level + if len(parts) >= 3: + snap.leq = parts[2] # d2: Equivalent continuous sound level + if len(parts) >= 4: + snap.lmax = parts[3] # d3: Maximum level + if len(parts) >= 5: + snap.lmin = parts[4] # d4: Minimum level + if len(parts) >= 6: + snap.lpeak = parts[5] # d5: Peak level + except (IndexError, ValueError) as e: + logger.warning(f"Error parsing DRD data points: {e}") + + # Call the callback with the snapshot + await callback(snap) + + except asyncio.TimeoutError: + logger.warning(f"DRD stream timeout (no data for 30s) from {self.device_key}") + break + except asyncio.IncompleteReadError: + logger.info(f"DRD stream closed by device {self.device_key}") + break + + finally: + # Send SUB character to stop streaming + try: + writer.write(b"\x1A") + await writer.drain() + except Exception: + pass + + writer.close() + with contextlib.suppress(Exception): + await writer.wait_closed() + + logger.info(f"DRD stream ended for {self.device_key}") async def set_measurement_time(self, preset: str): """Set measurement time preset.