"""
Device Status Monitor

Background task that monitors device reachability via SLMM polling status
and triggers alerts when devices go offline or come back online.

This service bridges SLMM's device polling with Terra-View's alert system.
"""

import asyncio
import logging
from datetime import datetime
from typing import Optional, Dict

from backend.database import SessionLocal
from backend.services.slmm_client import get_slmm_client, SLMMClientError
from backend.services.alert_service import get_alert_service

logger = logging.getLogger(__name__)


class DeviceStatusMonitor:
    """
    Monitors device reachability via SLMM's polling status endpoint.

    Detects state transitions (online→offline, offline→online) and
    triggers AlertService to create/resolve alerts.

    The first observation of a device only records its state; alerts are
    created or resolved only when a later observation differs from the
    recorded one.

    Usage:
        monitor = DeviceStatusMonitor()
        await monitor.start()  # Start background monitoring
        monitor.stop()  # Stop monitoring
    """

    def __init__(self, check_interval: int = 60):
        """
        Initialize the monitor.

        Args:
            check_interval: Seconds between status checks (default: 60)
        """
        self.check_interval = check_interval
        self.running = False
        self.task: Optional[asyncio.Task] = None
        self.slmm_client = get_slmm_client()

        # Track previous device states to detect transitions.
        # Maps unit_id -> last observed reachability (always a real bool).
        self._device_states: Dict[str, bool] = {}

    async def start(self):
        """
        Start the monitoring background task.

        Does nothing except log a warning if the monitor is already running.
        """
        if self.running:
            logger.warning("DeviceStatusMonitor is already running")
            return

        self.running = True
        self.task = asyncio.create_task(self._monitor_loop())
        logger.info(f"DeviceStatusMonitor started (checking every {self.check_interval}s)")

    def stop(self):
        """
        Stop the monitoring background task.

        Clears the running flag and requests cancellation of the background
        task if one was created.
        """
        self.running = False
        if self.task:
            self.task.cancel()
        logger.info("DeviceStatusMonitor stopped")

    async def _monitor_loop(self):
        """
        Main monitoring loop.

        Runs one status check per iteration while ``self.running`` is set.
        Any exception raised by a check is logged and the loop continues.
        """
        while self.running:
            try:
                await self._check_all_devices()
            except Exception as e:
                logger.error(f"Error in device status monitor: {e}", exc_info=True)

            # Sleep in small intervals for graceful shutdown: the running
            # flag is re-checked every second instead of once per interval.
            for _ in range(self.check_interval):
                if not self.running:
                    break
                await asyncio.sleep(1)

        logger.info("DeviceStatusMonitor loop exited")

    async def _check_all_devices(self):
        """
        Fetch polling status from SLMM and detect state transitions.

        Uses GET /api/slmm/_polling/status (proxied to SLMM)

        Errors are logged rather than raised: SLMMClientError is logged as a
        warning, any other exception as an error with traceback.
        """
        try:
            # Get status from SLMM
            status_response = await self.slmm_client.get_polling_status()
            devices = status_response.get("devices", [])

            if not devices:
                logger.debug("No devices in polling status response")
                return

            db = SessionLocal()
            try:
                alert_service = get_alert_service(db)

                for device in devices:
                    self._process_device(alert_service, device)

                # Cleanup expired alerts while we're here
                alert_service.cleanup_expired_alerts()

            finally:
                # Always release the session, even if alert handling failed.
                db.close()

        except SLMMClientError as e:
            logger.warning(f"Could not reach SLMM for status check: {e}")
        except Exception as e:
            logger.error(f"Error checking device status: {e}", exc_info=True)

    def _process_device(self, alert_service, device) -> None:
        """
        Compare one device's reported reachability with its tracked state.

        Creates an offline alert on an online→offline transition and resolves
        it on an offline→online transition. Entries without a ``unit_id`` are
        ignored.

        Args:
            alert_service: Alert service used to create/resolve alerts.
            device: One entry of the ``devices`` list from the polling status
                response; read via ``.get()`` for ``unit_id``,
                ``is_reachable``, ``consecutive_failures`` and ``last_error``.
        """
        unit_id = device.get("unit_id")
        if not unit_id:
            return

        # Normalize to a real bool. A missing key still defaults to reachable,
        # and a falsy value (including an explicit null) still counts as
        # unreachable. Storing the raw value would let None be mistaken for
        # "never seen" on the next check, so the recovery transition would be
        # missed and the offline alert never resolved.
        is_reachable = bool(device.get("is_reachable", True))
        previous_reachable = self._device_states.get(unit_id)

        # Skip if this is the first check (no previous state)
        if previous_reachable is None:
            self._device_states[unit_id] = is_reachable
            logger.debug(f"Initial state for {unit_id}: reachable={is_reachable}")
            return

        # Detect offline transition (was online, now offline)
        if previous_reachable and not is_reachable:
            logger.warning(f"Device {unit_id} went OFFLINE")
            alert_service.create_device_offline_alert(
                unit_id=unit_id,
                consecutive_failures=device.get("consecutive_failures", 0),
                last_error=device.get("last_error"),
            )

        # Detect online transition (was offline, now online)
        elif not previous_reachable and is_reachable:
            logger.info(f"Device {unit_id} came back ONLINE")
            alert_service.resolve_device_offline_alert(unit_id)

        # Update tracked state only after the alert call returned, so a
        # failed alert call leaves the old state in place and the same
        # transition is detected again on the next check.
        self._device_states[unit_id] = is_reachable

    def get_tracked_devices(self) -> Dict[str, bool]:
        """
        Get the current tracked device states.

        Returns:
            Dict mapping unit_id to is_reachable status (a copy; mutating it
            does not affect the monitor)
        """
        return dict(self._device_states)

    def clear_tracked_devices(self):
        """Clear all tracked device states (useful for testing)."""
        self._device_states.clear()


# Singleton instance
_monitor_instance: Optional[DeviceStatusMonitor] = None


def get_device_status_monitor() -> DeviceStatusMonitor:
    """
    Get the device status monitor singleton instance.

    The instance is created with default settings on first call.

    Returns:
        DeviceStatusMonitor instance
    """
    global _monitor_instance
    if _monitor_instance is None:
        _monitor_instance = DeviceStatusMonitor()
    return _monitor_instance


async def start_device_status_monitor():
    """Start the global device status monitor."""
    monitor = get_device_status_monitor()
    await monitor.start()


def stop_device_status_monitor():
    """Stop the global device status monitor."""
    monitor = get_device_status_monitor()
    monitor.stop()