Update to v 0.4.0 #6
@@ -8,6 +8,7 @@ for fast API access without querying devices on every request.
|
|||||||
|
|
||||||
import asyncio
|
import asyncio
|
||||||
import logging
|
import logging
|
||||||
|
import os
|
||||||
from datetime import datetime, timedelta
|
from datetime import datetime, timedelta
|
||||||
from typing import Optional
|
from typing import Optional
|
||||||
|
|
||||||
@@ -20,6 +21,11 @@ from app.device_logger import log_device_event, cleanup_old_logs
|
|||||||
|
|
||||||
logger = logging.getLogger(__name__)
|
logger = logging.getLogger(__name__)
|
||||||
|
|
||||||
|
# Global polling default. Set SLMM_POLLING_ENABLED=false to start an instance in
|
||||||
|
# standby (running but not polling and not holding device connections) — e.g. a
|
||||||
|
# dev box that must not latch onto a device that a prod instance owns.
|
||||||
|
POLLING_ENABLED_DEFAULT = os.getenv("SLMM_POLLING_ENABLED", "true").lower() == "true"
|
||||||
|
|
||||||
|
|
||||||
class BackgroundPoller:
|
class BackgroundPoller:
|
||||||
"""
|
"""
|
||||||
@@ -39,6 +45,7 @@ class BackgroundPoller:
|
|||||||
self._logger = logger
|
self._logger = logger
|
||||||
self._last_cleanup = None # Track last log cleanup time
|
self._last_cleanup = None # Track last log cleanup time
|
||||||
self._last_pool_log = None # Track last connection pool heartbeat log
|
self._last_pool_log = None # Track last connection pool heartbeat log
|
||||||
|
self._active = POLLING_ENABLED_DEFAULT # Global polling on/off (standby toggle)
|
||||||
|
|
||||||
async def start(self):
|
async def start(self):
|
||||||
"""Start the background polling task."""
|
"""Start the background polling task."""
|
||||||
@@ -71,15 +78,48 @@ class BackgroundPoller:
|
|||||||
|
|
||||||
self._logger.info("Background poller stopped")
|
self._logger.info("Background poller stopped")
|
||||||
|
|
||||||
|
def is_active(self) -> bool:
|
||||||
|
"""Whether background polling is currently active (vs standby)."""
|
||||||
|
return self._active
|
||||||
|
|
||||||
|
async def set_active(self, active: bool):
|
||||||
|
"""Globally enable/disable polling at runtime.
|
||||||
|
|
||||||
|
When deactivated, the loop stays alive but polls nothing and releases all
|
||||||
|
device connections, so this SLMM instance stops occupying the devices'
|
||||||
|
single connection slots (e.g. so a prod instance can take over). Runtime
|
||||||
|
state only — on restart the instance returns to SLMM_POLLING_ENABLED.
|
||||||
|
"""
|
||||||
|
self._active = active
|
||||||
|
if active:
|
||||||
|
self._logger.info("[SYSTEM] Background polling ACTIVATED")
|
||||||
|
else:
|
||||||
|
self._logger.info("[SYSTEM] Background polling DEACTIVATED (standby) — releasing connections")
|
||||||
|
await self._release_all_connections()
|
||||||
|
|
||||||
|
async def _release_all_connections(self):
|
||||||
|
"""Gracefully close every pooled device connection (no-op if none)."""
|
||||||
|
from app.services import _connection_pool
|
||||||
|
for device_key in list(_connection_pool.get_stats().get("connections", {})):
|
||||||
|
await _connection_pool.discard(device_key)
|
||||||
|
|
||||||
async def _poll_loop(self):
|
async def _poll_loop(self):
|
||||||
"""Main polling loop that runs continuously."""
|
"""Main polling loop that runs continuously."""
|
||||||
self._logger.info("Background polling loop started")
|
self._logger.info("Background polling loop started")
|
||||||
|
|
||||||
while self._running:
|
while self._running:
|
||||||
|
if self._active:
|
||||||
try:
|
try:
|
||||||
await self._poll_all_devices()
|
await self._poll_all_devices()
|
||||||
except Exception as e:
|
except Exception as e:
|
||||||
self._logger.error(f"Error in poll loop: {e}", exc_info=True)
|
self._logger.error(f"Error in poll loop: {e}", exc_info=True)
|
||||||
|
else:
|
||||||
|
# Standby: poll nothing, and keep holding no device connection slots
|
||||||
|
# so another SLMM instance (e.g. prod) can talk to the devices.
|
||||||
|
try:
|
||||||
|
await self._release_all_connections()
|
||||||
|
except Exception as e:
|
||||||
|
self._logger.warning(f"Standby connection release failed: {e}")
|
||||||
|
|
||||||
# Run log cleanup once per hour
|
# Run log cleanup once per hour
|
||||||
try:
|
try:
|
||||||
|
|||||||
@@ -153,6 +153,99 @@ async def disconnect_device(unit_id: str, db: Session = Depends(get_db)):
|
|||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
|
@router.post("/{unit_id}/deactivate")
|
||||||
|
async def deactivate_device(unit_id: str, db: Session = Depends(get_db)):
|
||||||
|
"""Make a single unit dormant: stop background polling for it AND drop its
|
||||||
|
connection, freeing the device's connection slot. poll_enabled=False is
|
||||||
|
persisted, so the unit stays dormant across restarts until /activate.
|
||||||
|
"""
|
||||||
|
cfg = db.query(NL43Config).filter_by(unit_id=unit_id).first()
|
||||||
|
if not cfg:
|
||||||
|
raise HTTPException(status_code=404, detail="NL43 config not found")
|
||||||
|
|
||||||
|
cfg.poll_enabled = False
|
||||||
|
db.commit()
|
||||||
|
|
||||||
|
from app.services import _connection_pool, _get_device_lock
|
||||||
|
|
||||||
|
device_key = f"{cfg.host}:{cfg.tcp_port}"
|
||||||
|
|
||||||
|
# Wait briefly for any in-flight poll/command to finish (so its connection is
|
||||||
|
# back in the pool), then drop it. If a long-lived stream holds the lock we
|
||||||
|
# don't block forever — discard the pooled connection regardless.
|
||||||
|
lock = await _get_device_lock(device_key)
|
||||||
|
acquired = False
|
||||||
|
try:
|
||||||
|
await asyncio.wait_for(lock.acquire(), timeout=10.0)
|
||||||
|
acquired = True
|
||||||
|
except asyncio.TimeoutError:
|
||||||
|
acquired = False
|
||||||
|
try:
|
||||||
|
await _connection_pool.discard(device_key)
|
||||||
|
finally:
|
||||||
|
if acquired:
|
||||||
|
lock.release()
|
||||||
|
|
||||||
|
return {
|
||||||
|
"status": "ok",
|
||||||
|
"unit_id": unit_id,
|
||||||
|
"poll_enabled": False,
|
||||||
|
"message": "Polling disabled and connection closed for this unit",
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
@router.post("/{unit_id}/activate")
|
||||||
|
async def activate_device(unit_id: str, db: Session = Depends(get_db)):
|
||||||
|
"""Resume background polling for a unit previously deactivated."""
|
||||||
|
cfg = db.query(NL43Config).filter_by(unit_id=unit_id).first()
|
||||||
|
if not cfg:
|
||||||
|
raise HTTPException(status_code=404, detail="NL43 config not found")
|
||||||
|
|
||||||
|
cfg.poll_enabled = True
|
||||||
|
db.commit()
|
||||||
|
|
||||||
|
return {
|
||||||
|
"status": "ok",
|
||||||
|
"unit_id": unit_id,
|
||||||
|
"poll_enabled": True,
|
||||||
|
"message": "Polling enabled for this unit",
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
@router.get("/_system/status")
|
||||||
|
async def system_status():
|
||||||
|
"""Report whether this SLMM instance is actively polling or in standby."""
|
||||||
|
from app.background_poller import poller
|
||||||
|
from app.services import _connection_pool
|
||||||
|
return {
|
||||||
|
"status": "ok",
|
||||||
|
"mode": "active" if poller.is_active() else "standby",
|
||||||
|
"polling_active": poller.is_active(),
|
||||||
|
"active_connections": _connection_pool.get_stats().get("active_connections", 0),
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
@router.post("/_system/standby")
|
||||||
|
async def system_standby():
|
||||||
|
"""Put this SLMM instance into standby: stop polling ALL devices and release
|
||||||
|
every connection, so it stops occupying device slots (e.g. so a prod instance
|
||||||
|
can take over). Runtime-only — on restart the instance returns to its
|
||||||
|
SLMM_POLLING_ENABLED default.
|
||||||
|
"""
|
||||||
|
from app.background_poller import poller
|
||||||
|
await poller.set_active(False)
|
||||||
|
return {"status": "ok", "mode": "standby",
|
||||||
|
"message": "Polling stopped and all device connections released"}
|
||||||
|
|
||||||
|
|
||||||
|
@router.post("/_system/resume")
|
||||||
|
async def system_resume():
|
||||||
|
"""Resume polling after standby (global)."""
|
||||||
|
from app.background_poller import poller
|
||||||
|
await poller.set_active(True)
|
||||||
|
return {"status": "ok", "mode": "active", "message": "Polling resumed"}
|
||||||
|
|
||||||
|
|
||||||
# ============================================================================
|
# ============================================================================
|
||||||
# GLOBAL POLLING STATUS ENDPOINT (must be before /{unit_id} routes)
|
# GLOBAL POLLING STATUS ENDPOINT (must be before /{unit_id} routes)
|
||||||
# ============================================================================
|
# ============================================================================
|
||||||
|
|||||||
Reference in New Issue
Block a user