""" Device Controller Service Routes device operations to the appropriate backend module: - SLMM for sound level meters - SFM for seismographs (future implementation) This abstraction allows Projects system to work with any device type without knowing the underlying communication protocol. """ from typing import Dict, Any, Optional, List from backend.services.slmm_client import get_slmm_client, SLMMClientError class DeviceControllerError(Exception): """Base exception for device controller errors.""" pass class UnsupportedDeviceTypeError(DeviceControllerError): """Raised when device type is not supported.""" pass class DeviceController: """ Unified interface for controlling all device types. Routes commands to appropriate backend module based on device_type. Usage: controller = DeviceController() await controller.start_recording("nl43-001", "slm", config={}) await controller.stop_recording("seismo-042", "seismograph") """ def __init__(self): self.slmm_client = get_slmm_client() # ======================================================================== # Recording Control # ======================================================================== async def start_recording( self, unit_id: str, device_type: str, config: Optional[Dict[str, Any]] = None, ) -> Dict[str, Any]: """ Start recording on a device. Args: unit_id: Unit identifier device_type: "slm" | "seismograph" config: Device-specific recording configuration Returns: Response dict from device module Raises: UnsupportedDeviceTypeError: Device type not supported DeviceControllerError: Operation failed """ if device_type == "slm": try: return await self.slmm_client.start_recording(unit_id, config) except SLMMClientError as e: raise DeviceControllerError(f"SLMM error: {str(e)}") elif device_type == "seismograph": # TODO: Implement SFM client for seismograph control # For now, return a placeholder response return { "status": "not_implemented", "message": "Seismograph recording control not yet implemented", "unit_id": unit_id, } else: raise UnsupportedDeviceTypeError( f"Device type '{device_type}' is not supported. " f"Supported types: slm, seismograph" ) async def stop_recording( self, unit_id: str, device_type: str, ) -> Dict[str, Any]: """ Stop recording on a device. Args: unit_id: Unit identifier device_type: "slm" | "seismograph" Returns: Response dict from device module """ if device_type == "slm": try: return await self.slmm_client.stop_recording(unit_id) except SLMMClientError as e: raise DeviceControllerError(f"SLMM error: {str(e)}") elif device_type == "seismograph": # TODO: Implement SFM client return { "status": "not_implemented", "message": "Seismograph recording control not yet implemented", "unit_id": unit_id, } else: raise UnsupportedDeviceTypeError(f"Unsupported device type: {device_type}") async def pause_recording( self, unit_id: str, device_type: str, ) -> Dict[str, Any]: """ Pause recording on a device. Args: unit_id: Unit identifier device_type: "slm" | "seismograph" Returns: Response dict from device module """ if device_type == "slm": try: return await self.slmm_client.pause_recording(unit_id) except SLMMClientError as e: raise DeviceControllerError(f"SLMM error: {str(e)}") elif device_type == "seismograph": return { "status": "not_implemented", "message": "Seismograph pause not yet implemented", "unit_id": unit_id, } else: raise UnsupportedDeviceTypeError(f"Unsupported device type: {device_type}") async def resume_recording( self, unit_id: str, device_type: str, ) -> Dict[str, Any]: """ Resume paused recording on a device. Args: unit_id: Unit identifier device_type: "slm" | "seismograph" Returns: Response dict from device module """ if device_type == "slm": try: return await self.slmm_client.resume_recording(unit_id) except SLMMClientError as e: raise DeviceControllerError(f"SLMM error: {str(e)}") elif device_type == "seismograph": return { "status": "not_implemented", "message": "Seismograph resume not yet implemented", "unit_id": unit_id, } else: raise UnsupportedDeviceTypeError(f"Unsupported device type: {device_type}") # ======================================================================== # Status & Monitoring # ======================================================================== async def get_device_status( self, unit_id: str, device_type: str, ) -> Dict[str, Any]: """ Get current device status. Args: unit_id: Unit identifier device_type: "slm" | "seismograph" Returns: Status dict from device module """ if device_type == "slm": try: return await self.slmm_client.get_unit_status(unit_id) except SLMMClientError as e: raise DeviceControllerError(f"SLMM error: {str(e)}") elif device_type == "seismograph": # TODO: Implement SFM status check return { "status": "not_implemented", "message": "Seismograph status not yet implemented", "unit_id": unit_id, } else: raise UnsupportedDeviceTypeError(f"Unsupported device type: {device_type}") async def get_live_data( self, unit_id: str, device_type: str, ) -> Dict[str, Any]: """ Get live data from device. Args: unit_id: Unit identifier device_type: "slm" | "seismograph" Returns: Live data dict from device module """ if device_type == "slm": try: return await self.slmm_client.get_live_data(unit_id) except SLMMClientError as e: raise DeviceControllerError(f"SLMM error: {str(e)}") elif device_type == "seismograph": return { "status": "not_implemented", "message": "Seismograph live data not yet implemented", "unit_id": unit_id, } else: raise UnsupportedDeviceTypeError(f"Unsupported device type: {device_type}") # ======================================================================== # Data Download # ======================================================================== async def download_files( self, unit_id: str, device_type: str, destination_path: str, files: Optional[List[str]] = None, ) -> Dict[str, Any]: """ Download data files from device. Args: unit_id: Unit identifier device_type: "slm" | "seismograph" destination_path: Local path to save files files: List of filenames, or None for all Returns: Download result with file list """ if device_type == "slm": try: return await self.slmm_client.download_files( unit_id, destination_path, files, ) except SLMMClientError as e: raise DeviceControllerError(f"SLMM error: {str(e)}") elif device_type == "seismograph": # TODO: Implement SFM file download return { "status": "not_implemented", "message": "Seismograph file download not yet implemented", "unit_id": unit_id, } else: raise UnsupportedDeviceTypeError(f"Unsupported device type: {device_type}") # ======================================================================== # FTP Control # ======================================================================== async def enable_ftp( self, unit_id: str, device_type: str, ) -> Dict[str, Any]: """ Enable FTP server on device. Must be called before downloading files. Args: unit_id: Unit identifier device_type: "slm" | "seismograph" Returns: Response dict with status """ if device_type == "slm": try: return await self.slmm_client.enable_ftp(unit_id) except SLMMClientError as e: raise DeviceControllerError(f"SLMM error: {str(e)}") elif device_type == "seismograph": return { "status": "not_implemented", "message": "Seismograph FTP not yet implemented", "unit_id": unit_id, } else: raise UnsupportedDeviceTypeError(f"Unsupported device type: {device_type}") async def disable_ftp( self, unit_id: str, device_type: str, ) -> Dict[str, Any]: """ Disable FTP server on device. Args: unit_id: Unit identifier device_type: "slm" | "seismograph" Returns: Response dict with status """ if device_type == "slm": try: return await self.slmm_client.disable_ftp(unit_id) except SLMMClientError as e: raise DeviceControllerError(f"SLMM error: {str(e)}") elif device_type == "seismograph": return { "status": "not_implemented", "message": "Seismograph FTP not yet implemented", "unit_id": unit_id, } else: raise UnsupportedDeviceTypeError(f"Unsupported device type: {device_type}") # ======================================================================== # Device Configuration # ======================================================================== async def update_device_config( self, unit_id: str, device_type: str, config: Dict[str, Any], ) -> Dict[str, Any]: """ Update device configuration. Args: unit_id: Unit identifier device_type: "slm" | "seismograph" config: Configuration parameters Returns: Updated config from device module """ if device_type == "slm": try: return await self.slmm_client.update_unit_config( unit_id, host=config.get("host"), tcp_port=config.get("tcp_port"), ftp_port=config.get("ftp_port"), ftp_username=config.get("ftp_username"), ftp_password=config.get("ftp_password"), ) except SLMMClientError as e: raise DeviceControllerError(f"SLMM error: {str(e)}") elif device_type == "seismograph": return { "status": "not_implemented", "message": "Seismograph config update not yet implemented", "unit_id": unit_id, } else: raise UnsupportedDeviceTypeError(f"Unsupported device type: {device_type}") # ======================================================================== # Store/Index Management # ======================================================================== async def increment_index( self, unit_id: str, device_type: str, ) -> Dict[str, Any]: """ Increment the store/index number on a device. For SLMs, this increments the store name to prevent "overwrite data?" prompts. Should be called before starting a new measurement if auto_increment_index is enabled. Args: unit_id: Unit identifier device_type: "slm" | "seismograph" Returns: Response dict with old_index and new_index """ if device_type == "slm": try: return await self.slmm_client.increment_index(unit_id) except SLMMClientError as e: raise DeviceControllerError(f"SLMM error: {str(e)}") elif device_type == "seismograph": # Seismographs may not have the same concept of store index return { "status": "not_applicable", "message": "Index increment not applicable for seismographs", "unit_id": unit_id, } else: raise UnsupportedDeviceTypeError(f"Unsupported device type: {device_type}") async def get_index_number( self, unit_id: str, device_type: str, ) -> Dict[str, Any]: """ Get current store/index number from device. Args: unit_id: Unit identifier device_type: "slm" | "seismograph" Returns: Response dict with current index_number """ if device_type == "slm": try: return await self.slmm_client.get_index_number(unit_id) except SLMMClientError as e: raise DeviceControllerError(f"SLMM error: {str(e)}") elif device_type == "seismograph": return { "status": "not_applicable", "message": "Index number not applicable for seismographs", "unit_id": unit_id, } else: raise UnsupportedDeviceTypeError(f"Unsupported device type: {device_type}") # ======================================================================== # Cycle Commands (for scheduled automation) # ======================================================================== async def start_cycle( self, unit_id: str, device_type: str, sync_clock: bool = True, ) -> Dict[str, Any]: """ Execute complete start cycle for scheduled automation. This handles the full pre-recording workflow: 1. Sync device clock to server time 2. Find next safe index (with overwrite protection) 3. Start measurement Args: unit_id: Unit identifier device_type: "slm" | "seismograph" sync_clock: Whether to sync device clock to server time Returns: Response dict from device module """ if device_type == "slm": try: return await self.slmm_client.start_cycle(unit_id, sync_clock) except SLMMClientError as e: raise DeviceControllerError(f"SLMM error: {str(e)}") elif device_type == "seismograph": return { "status": "not_implemented", "message": "Seismograph start cycle not yet implemented", "unit_id": unit_id, } else: raise UnsupportedDeviceTypeError(f"Unsupported device type: {device_type}") async def stop_cycle( self, unit_id: str, device_type: str, download: bool = True, ) -> Dict[str, Any]: """ Execute complete stop cycle for scheduled automation. This handles the full post-recording workflow: 1. Stop measurement 2. Enable FTP 3. Download measurement folder 4. Verify download Args: unit_id: Unit identifier device_type: "slm" | "seismograph" download: Whether to download measurement data Returns: Response dict from device module """ if device_type == "slm": try: return await self.slmm_client.stop_cycle(unit_id, download) except SLMMClientError as e: raise DeviceControllerError(f"SLMM error: {str(e)}") elif device_type == "seismograph": return { "status": "not_implemented", "message": "Seismograph stop cycle not yet implemented", "unit_id": unit_id, } else: raise UnsupportedDeviceTypeError(f"Unsupported device type: {device_type}") # ======================================================================== # Health Check # ======================================================================== async def check_device_connectivity( self, unit_id: str, device_type: str, ) -> bool: """ Check if device is reachable. Args: unit_id: Unit identifier device_type: "slm" | "seismograph" Returns: True if device is reachable, False otherwise """ if device_type == "slm": try: status = await self.slmm_client.get_unit_status(unit_id) return status.get("last_seen") is not None except: return False elif device_type == "seismograph": # TODO: Implement SFM connectivity check return False else: return False # Singleton instance _default_controller: Optional[DeviceController] = None def get_device_controller() -> DeviceController: """ Get the default device controller instance. Returns: DeviceController instance """ global _default_controller if _default_controller is None: _default_controller = DeviceController() return _default_controller