Files
terra-view/backend/services/slmm_client.py
2026-01-12 18:07:26 +00:00

424 lines
12 KiB
Python

"""
SLMM API Client Wrapper
Provides a clean interface for Terra-View to interact with the SLMM backend.
All SLM operations should go through this client instead of direct HTTP calls.
SLMM (Sound Level Meter Manager) is a separate service running on port 8100
that handles TCP/FTP communication with Rion NL-43/NL-53 devices.
"""
import httpx
from typing import Optional, Dict, Any, List
from datetime import datetime
import json
# Default location of the SLMM service, and the root of its NL-43 API
# underneath that location.
SLMM_BASE_URL = "http://localhost:8100"
SLMM_API_BASE = SLMM_BASE_URL + "/api/nl43"
class SLMMClientError(Exception):
    """Root of the SLMM client exception hierarchy.

    Catch this type to handle any failure reported by the SLMM client.
    """
class SLMMConnectionError(SLMMClientError):
    """Signals that a connection to the SLMM backend could not be made."""
class SLMMDeviceError(SLMMClientError):
    """Signals that SLMM reported a failed device operation."""
class SLMMClient:
    """
    Client for interacting with the SLMM backend.

    Every call opens its own short-lived ``httpx.AsyncClient``, so an
    instance keeps no connection open between calls.

    Usage:
        client = SLMMClient()
        units = await client.get_all_units()
        status = await client.get_unit_status("nl43-001")
        await client.start_recording("nl43-001", config={...})
    """

    def __init__(self, base_url: str = SLMM_BASE_URL, timeout: float = 30.0):
        """
        Create a client bound to one SLMM backend.

        Args:
            base_url: Root URL of the SLMM service. Trailing slashes are
                removed so joined URLs never contain "//".
            timeout: Per-request timeout in seconds passed to httpx.
        """
        self.base_url = base_url.rstrip("/")
        self.api_base = f"{self.base_url}/api/nl43"
        self.timeout = timeout

    async def _request(
        self,
        method: str,
        endpoint: str,
        data: Optional[Dict] = None,
        params: Optional[Dict] = None,
    ) -> Dict[str, Any]:
        """
        Make an HTTP request to the SLMM backend.

        Args:
            method: HTTP method (GET, POST, PUT, DELETE)
            endpoint: API endpoint relative to the NL-43 API root
                (e.g., "/units", "/{unit_id}/status")
            data: JSON body for POST/PUT requests
            params: Query parameters

        Returns:
            Decoded JSON response, or an empty dict when the response has
            no body.

        Raises:
            SLMMConnectionError: httpx could not connect to SLMM
            SLMMDeviceError: SLMM answered with an error status code
            SLMMClientError: Any other failure while sending the request
                or decoding the response
        """
        url = f"{self.api_base}{endpoint}"
        try:
            async with httpx.AsyncClient(timeout=self.timeout) as client:
                response = await client.request(
                    method=method,
                    url=url,
                    json=data,
                    params=params,
                )
                response.raise_for_status()
                # Handle empty responses
                if not response.content:
                    return {}
                return response.json()
        except httpx.ConnectError as e:
            raise SLMMConnectionError(
                f"Cannot connect to SLMM backend at {self.base_url}. "
                f"Is SLMM running? Error: {str(e)}"
            ) from e
        except httpx.HTTPStatusError as e:
            # Prefer the "detail" field of a JSON error body. A body that
            # is not JSON, or not a JSON object, falls back to the raw text.
            try:
                error_data = e.response.json()
                error_detail = error_data.get("detail", str(error_data))
            except Exception:
                error_detail = e.response.text or str(e)
            raise SLMMDeviceError(
                f"SLMM operation failed: {error_detail}"
            ) from e
        except Exception as e:
            raise SLMMClientError(f"Unexpected error: {str(e)}") from e

    # ========================================================================
    # Unit Management
    # ========================================================================
    async def get_all_units(self) -> List[Dict[str, Any]]:
        """
        Get all configured SLM units from SLMM.

        Best-effort: any SLMMClientError (including SLMM being
        unreachable) yields an empty list instead of raising, because the
        /units endpoint may not exist in SLMM yet.

        Returns:
            List of unit dicts with id, config, and status
        """
        try:
            response = await self._request("GET", "/units")
            return response.get("units", [])
        except SLMMClientError:
            # Endpoint may not exist yet
            return []

    async def get_unit_config(self, unit_id: str) -> Dict[str, Any]:
        """
        Get unit configuration from SLMM cache.

        Args:
            unit_id: Unit identifier (e.g., "nl43-001")

        Returns:
            Config dict with host, tcp_port, ftp_port, etc.
        """
        return await self._request("GET", f"/{unit_id}/config")

    async def update_unit_config(
        self,
        unit_id: str,
        host: Optional[str] = None,
        tcp_port: Optional[int] = None,
        ftp_port: Optional[int] = None,
        ftp_username: Optional[str] = None,
        ftp_password: Optional[str] = None,
    ) -> Dict[str, Any]:
        """
        Update unit configuration in SLMM cache.

        Only the arguments that are not None are sent, so omitted fields
        are left out of the request body.

        Args:
            unit_id: Unit identifier
            host: Device IP address
            tcp_port: TCP control port (default: 2255)
            ftp_port: FTP data port (default: 21)
            ftp_username: FTP username
            ftp_password: FTP password

        Returns:
            Updated config
        """
        candidates = {
            "host": host,
            "tcp_port": tcp_port,
            "ftp_port": ftp_port,
            "ftp_username": ftp_username,
            "ftp_password": ftp_password,
        }
        config = {
            key: value for key, value in candidates.items() if value is not None
        }
        return await self._request("PUT", f"/{unit_id}/config", data=config)

    # ========================================================================
    # Status & Monitoring
    # ========================================================================
    async def get_unit_status(self, unit_id: str) -> Dict[str, Any]:
        """
        Get cached status snapshot from SLMM.

        Args:
            unit_id: Unit identifier

        Returns:
            Status dict with measurement_state, lp, leq, battery, etc.
        """
        return await self._request("GET", f"/{unit_id}/status")

    async def get_live_data(self, unit_id: str) -> Dict[str, Any]:
        """
        Request fresh data from device (DOD command).

        Args:
            unit_id: Unit identifier

        Returns:
            Live data snapshot
        """
        return await self._request("GET", f"/{unit_id}/live")

    # ========================================================================
    # Recording Control
    # ========================================================================
    async def start_recording(
        self,
        unit_id: str,
        config: Optional[Dict[str, Any]] = None,
    ) -> Dict[str, Any]:
        """
        Start recording on a unit.

        Args:
            unit_id: Unit identifier
            config: Optional recording config (interval, settings, etc.).
                None or an empty dict is sent as an empty JSON object.

        Returns:
            Response from SLMM with success status
        """
        return await self._request("POST", f"/{unit_id}/start", data=config or {})

    async def stop_recording(self, unit_id: str) -> Dict[str, Any]:
        """
        Stop recording on a unit.

        Args:
            unit_id: Unit identifier

        Returns:
            Response from SLMM
        """
        return await self._request("POST", f"/{unit_id}/stop")

    async def pause_recording(self, unit_id: str) -> Dict[str, Any]:
        """
        Pause recording on a unit.

        Args:
            unit_id: Unit identifier

        Returns:
            Response from SLMM
        """
        return await self._request("POST", f"/{unit_id}/pause")

    async def resume_recording(self, unit_id: str) -> Dict[str, Any]:
        """
        Resume paused recording on a unit.

        Args:
            unit_id: Unit identifier

        Returns:
            Response from SLMM
        """
        return await self._request("POST", f"/{unit_id}/resume")

    async def reset_data(self, unit_id: str) -> Dict[str, Any]:
        """
        Reset measurement data on a unit.

        Args:
            unit_id: Unit identifier

        Returns:
            Response from SLMM
        """
        return await self._request("POST", f"/{unit_id}/reset")

    # ========================================================================
    # Device Settings
    # ========================================================================
    async def get_frequency_weighting(self, unit_id: str) -> Dict[str, Any]:
        """
        Get frequency weighting setting (A, C, or Z).

        Args:
            unit_id: Unit identifier

        Returns:
            Dict with current weighting
        """
        return await self._request("GET", f"/{unit_id}/frequency-weighting")

    async def set_frequency_weighting(
        self,
        unit_id: str,
        weighting: str,
    ) -> Dict[str, Any]:
        """
        Set frequency weighting (A, C, or Z).

        The value is forwarded as-is; no validation happens in this client.

        Args:
            unit_id: Unit identifier
            weighting: "A", "C", or "Z"

        Returns:
            Confirmation response
        """
        return await self._request(
            "PUT",
            f"/{unit_id}/frequency-weighting",
            data={"weighting": weighting},
        )

    async def get_time_weighting(self, unit_id: str) -> Dict[str, Any]:
        """
        Get time weighting setting (F, S, or I).

        Args:
            unit_id: Unit identifier

        Returns:
            Dict with current time weighting
        """
        return await self._request("GET", f"/{unit_id}/time-weighting")

    async def set_time_weighting(
        self,
        unit_id: str,
        weighting: str,
    ) -> Dict[str, Any]:
        """
        Set time weighting (F=Fast, S=Slow, I=Impulse).

        The value is forwarded as-is; no validation happens in this client.

        Args:
            unit_id: Unit identifier
            weighting: "F", "S", or "I"

        Returns:
            Confirmation response
        """
        return await self._request(
            "PUT",
            f"/{unit_id}/time-weighting",
            data={"weighting": weighting},
        )

    async def get_all_settings(self, unit_id: str) -> Dict[str, Any]:
        """
        Get all device settings.

        Args:
            unit_id: Unit identifier

        Returns:
            Dict with all settings
        """
        return await self._request("GET", f"/{unit_id}/settings")

    # ========================================================================
    # Data Download (Future)
    # ========================================================================
    async def download_files(
        self,
        unit_id: str,
        destination_path: str,
        files: Optional[List[str]] = None,
    ) -> Dict[str, Any]:
        """
        Download files from unit via FTP.

        NOTE: This endpoint doesn't exist in SLMM yet. Will need to implement.

        Args:
            unit_id: Unit identifier
            destination_path: Local path to save files
            files: List of filenames to download, or None for all. An
                empty list is also sent as "all".

        Returns:
            Dict with downloaded files list and metadata
        """
        data = {
            "destination_path": destination_path,
            "files": files or "all",
        }
        return await self._request("POST", f"/{unit_id}/ftp/download", data=data)

    # ========================================================================
    # Health Check
    # ========================================================================
    async def health_check(self) -> bool:
        """
        Check if SLMM backend is reachable.

        Uses a fixed 5 second timeout rather than the instance timeout.

        Returns:
            True if SLMM answers /health with status 200, False on any
            other status or on any request error
        """
        try:
            async with httpx.AsyncClient(timeout=5.0) as client:
                response = await client.get(f"{self.base_url}/health")
                return response.status_code == 200
        except Exception:
            # Deliberately not a bare except: task cancellation and
            # KeyboardInterrupt/SystemExit must propagate, not read as
            # "SLMM is down".
            return False
# Module-level SLMMClient shared by get_slmm_client(); stays None until the
# first call builds it.
_default_client: Optional[SLMMClient] = None


def get_slmm_client() -> SLMMClient:
    """
    Return the shared default SLMM client, creating it on first use.

    Returns:
        The same SLMMClient instance on every call, built with the
        default base URL and timeout.
    """
    global _default_client
    if _default_client is not None:
        return _default_client
    _default_client = SLMMClient()
    return _default_client