Persistent polling interval increased. Healthcheck now uses poll instead of separate handshakes.
This commit is contained in:
34
app/main.py
34
app/main.py
@@ -92,10 +92,14 @@ async def health():
|
||||
|
||||
@app.get("/health/devices")
|
||||
async def health_devices():
|
||||
"""Enhanced health check that tests device connectivity."""
|
||||
"""Enhanced health check that tests device connectivity.
|
||||
|
||||
Uses the connection pool to avoid unnecessary TCP handshakes — if a
|
||||
cached connection exists and is alive, the device is reachable.
|
||||
"""
|
||||
from sqlalchemy.orm import Session
|
||||
from app.database import SessionLocal
|
||||
from app.services import NL43Client
|
||||
from app.services import _connection_pool
|
||||
from app.models import NL43Config
|
||||
|
||||
db: Session = SessionLocal()
|
||||
@@ -105,7 +109,7 @@ async def health_devices():
|
||||
configs = db.query(NL43Config).filter_by(tcp_enabled=True).all()
|
||||
|
||||
for cfg in configs:
|
||||
client = NL43Client(cfg.host, cfg.tcp_port, timeout=2.0, ftp_username=cfg.ftp_username, ftp_password=cfg.ftp_password)
|
||||
device_key = f"{cfg.host}:{cfg.tcp_port}"
|
||||
status = {
|
||||
"unit_id": cfg.unit_id,
|
||||
"host": cfg.host,
|
||||
@@ -115,14 +119,22 @@ async def health_devices():
|
||||
}
|
||||
|
||||
try:
|
||||
# Try to connect (don't send command to avoid rate limiting issues)
|
||||
import asyncio
|
||||
reader, writer = await asyncio.wait_for(
|
||||
asyncio.open_connection(cfg.host, cfg.tcp_port), timeout=2.0
|
||||
)
|
||||
writer.close()
|
||||
await writer.wait_closed()
|
||||
status["reachable"] = True
|
||||
# Check if pool already has a live connection (zero-cost check)
|
||||
pool_stats = _connection_pool.get_stats()
|
||||
conn_info = pool_stats["connections"].get(device_key)
|
||||
if conn_info and conn_info["alive"]:
|
||||
status["reachable"] = True
|
||||
status["source"] = "pool"
|
||||
else:
|
||||
# No cached connection — do a lightweight acquire/release
|
||||
# This opens a connection if needed but keeps it in the pool
|
||||
import asyncio
|
||||
reader, writer, from_cache = await _connection_pool.acquire(
|
||||
device_key, cfg.host, cfg.tcp_port, timeout=2.0
|
||||
)
|
||||
await _connection_pool.release(device_key, reader, writer, cfg.host, cfg.tcp_port)
|
||||
status["reachable"] = True
|
||||
status["source"] = "cached" if from_cache else "new"
|
||||
except Exception as e:
|
||||
status["error"] = str(type(e).__name__)
|
||||
logger.warning(f"Device {cfg.unit_id} health check failed: {e}")
|
||||
|
||||
@@ -1755,74 +1755,38 @@ async def run_diagnostics(unit_id: str, db: Session = Depends(get_db)):
|
||||
"message": "TCP communication enabled"
|
||||
}
|
||||
|
||||
# Test 3: Modem/Router reachable (check port 443 HTTPS)
|
||||
# Test 3: TCP connection reachable (device port) — uses connection pool
|
||||
# This avoids extra TCP handshakes over cellular. If a cached connection
|
||||
# exists and is alive, we skip the handshake entirely.
|
||||
from app.services import _connection_pool
|
||||
device_key = f"{cfg.host}:{cfg.tcp_port}"
|
||||
try:
|
||||
reader, writer = await asyncio.wait_for(
|
||||
asyncio.open_connection(cfg.host, 443), timeout=3.0
|
||||
)
|
||||
writer.close()
|
||||
await writer.wait_closed()
|
||||
diagnostics["tests"]["modem_reachable"] = {
|
||||
"status": "pass",
|
||||
"message": f"Modem/router reachable at {cfg.host}"
|
||||
}
|
||||
except asyncio.TimeoutError:
|
||||
diagnostics["tests"]["modem_reachable"] = {
|
||||
"status": "fail",
|
||||
"message": f"Modem/router timeout at {cfg.host} (network issue)"
|
||||
}
|
||||
diagnostics["overall_status"] = "fail"
|
||||
return diagnostics
|
||||
except ConnectionRefusedError:
|
||||
# Connection refused means host is up but port 443 closed - that's ok
|
||||
diagnostics["tests"]["modem_reachable"] = {
|
||||
"status": "pass",
|
||||
"message": f"Modem/router reachable at {cfg.host} (HTTPS closed)"
|
||||
}
|
||||
except Exception as e:
|
||||
diagnostics["tests"]["modem_reachable"] = {
|
||||
"status": "fail",
|
||||
"message": f"Cannot reach modem/router at {cfg.host}: {str(e)}"
|
||||
}
|
||||
diagnostics["overall_status"] = "fail"
|
||||
return diagnostics
|
||||
|
||||
# Test 4: TCP connection reachable (device port)
|
||||
try:
|
||||
reader, writer = await asyncio.wait_for(
|
||||
asyncio.open_connection(cfg.host, cfg.tcp_port), timeout=3.0
|
||||
)
|
||||
writer.close()
|
||||
await writer.wait_closed()
|
||||
diagnostics["tests"]["tcp_connection"] = {
|
||||
"status": "pass",
|
||||
"message": f"TCP connection successful to {cfg.host}:{cfg.tcp_port}"
|
||||
}
|
||||
except asyncio.TimeoutError:
|
||||
diagnostics["tests"]["tcp_connection"] = {
|
||||
"status": "fail",
|
||||
"message": f"Connection timeout to {cfg.host}:{cfg.tcp_port}"
|
||||
}
|
||||
diagnostics["overall_status"] = "fail"
|
||||
return diagnostics
|
||||
except ConnectionRefusedError:
|
||||
diagnostics["tests"]["tcp_connection"] = {
|
||||
"status": "fail",
|
||||
"message": f"Connection refused by {cfg.host}:{cfg.tcp_port}"
|
||||
}
|
||||
diagnostics["overall_status"] = "fail"
|
||||
return diagnostics
|
||||
pool_stats = _connection_pool.get_stats()
|
||||
conn_info = pool_stats["connections"].get(device_key)
|
||||
if conn_info and conn_info["alive"]:
|
||||
# Pool already has a live connection — device is reachable
|
||||
diagnostics["tests"]["tcp_connection"] = {
|
||||
"status": "pass",
|
||||
"message": f"TCP connection alive in pool for {cfg.host}:{cfg.tcp_port}"
|
||||
}
|
||||
else:
|
||||
# Acquire through the pool (opens new if needed, keeps it cached)
|
||||
reader, writer, from_cache = await _connection_pool.acquire(
|
||||
device_key, cfg.host, cfg.tcp_port, timeout=3.0
|
||||
)
|
||||
await _connection_pool.release(device_key, reader, writer, cfg.host, cfg.tcp_port)
|
||||
diagnostics["tests"]["tcp_connection"] = {
|
||||
"status": "pass",
|
||||
"message": f"TCP connection successful to {cfg.host}:{cfg.tcp_port}"
|
||||
}
|
||||
except Exception as e:
|
||||
diagnostics["tests"]["tcp_connection"] = {
|
||||
"status": "fail",
|
||||
"message": f"Connection error: {str(e)}"
|
||||
"message": f"Connection error to {cfg.host}:{cfg.tcp_port}: {str(e)}"
|
||||
}
|
||||
diagnostics["overall_status"] = "fail"
|
||||
return diagnostics
|
||||
|
||||
# Wait a bit after connection test to let device settle
|
||||
await asyncio.sleep(1.5)
|
||||
|
||||
# Test 5: Device responds to commands
|
||||
# Use longer timeout to account for rate limiting (device requires ≥1s between commands)
|
||||
client = NL43Client(cfg.host, cfg.tcp_port, timeout=10.0, ftp_username=cfg.ftp_username, ftp_password=cfg.ftp_password)
|
||||
|
||||
@@ -242,8 +242,8 @@ async def _get_device_lock(device_key: str) -> asyncio.Lock:
|
||||
|
||||
# Configuration via environment variables
|
||||
TCP_PERSISTENT_ENABLED = os.getenv("TCP_PERSISTENT_ENABLED", "true").lower() == "true"
|
||||
TCP_IDLE_TTL = float(os.getenv("TCP_IDLE_TTL", "120")) # Close idle connections after N seconds
|
||||
TCP_MAX_AGE = float(os.getenv("TCP_MAX_AGE", "300")) # Force reconnect after N seconds
|
||||
TCP_IDLE_TTL = float(os.getenv("TCP_IDLE_TTL", "300")) # Close idle connections after N seconds
|
||||
TCP_MAX_AGE = float(os.getenv("TCP_MAX_AGE", "1800")) # Force reconnect after N seconds
|
||||
TCP_KEEPALIVE_IDLE = int(os.getenv("TCP_KEEPALIVE_IDLE", "15")) # Seconds idle before probes
|
||||
TCP_KEEPALIVE_INTERVAL = int(os.getenv("TCP_KEEPALIVE_INTERVAL", "10")) # Seconds between probes
|
||||
TCP_KEEPALIVE_COUNT = int(os.getenv("TCP_KEEPALIVE_COUNT", "3")) # Failed probes before dead
|
||||
|
||||
Reference in New Issue
Block a user