- pair_devices.html template for device pairing interface - SLMM device control lock prevents flooding nl43. Fix: - Polling intervals for SLMM. - modem view now list - device pairing much improved. - various other tweaks through out UI. - SLMM Scheduled downloads fixed.
782 lines
24 KiB
Python
782 lines
24 KiB
Python
"""
|
|
SLMM API Client Wrapper
|
|
|
|
Provides a clean interface for Terra-View to interact with the SLMM backend.
|
|
All SLM operations should go through this client instead of direct HTTP calls.
|
|
|
|
SLMM (Sound Level Meter Manager) is a separate service running on port 8100
|
|
that handles TCP/FTP communication with Rion NL-43/NL-53 devices.
|
|
"""
|
|
|
|
import httpx
|
|
import os
|
|
from typing import Optional, Dict, Any, List
|
|
from datetime import datetime
|
|
import json
|
|
|
|
|
|
# SLMM backend base URLs - use environment variable if set (for Docker)
|
|
SLMM_BASE_URL = os.environ.get("SLMM_BASE_URL", "http://localhost:8100")
|
|
SLMM_API_BASE = f"{SLMM_BASE_URL}/api/nl43"
|
|
|
|
|
|
class SLMMClientError(Exception):
|
|
"""Base exception for SLMM client errors."""
|
|
pass
|
|
|
|
|
|
class SLMMConnectionError(SLMMClientError):
|
|
"""Raised when cannot connect to SLMM backend."""
|
|
pass
|
|
|
|
|
|
class SLMMDeviceError(SLMMClientError):
|
|
"""Raised when device operation fails."""
|
|
pass
|
|
|
|
|
|
class SLMMClient:
|
|
"""
|
|
Client for interacting with SLMM backend.
|
|
|
|
Usage:
|
|
client = SLMMClient()
|
|
units = await client.get_all_units()
|
|
status = await client.get_unit_status("nl43-001")
|
|
await client.start_recording("nl43-001", config={...})
|
|
"""
|
|
|
|
def __init__(self, base_url: str = SLMM_BASE_URL, timeout: float = 30.0):
|
|
self.base_url = base_url
|
|
self.api_base = f"{base_url}/api/nl43"
|
|
self.timeout = timeout
|
|
|
|
async def _request(
|
|
self,
|
|
method: str,
|
|
endpoint: str,
|
|
data: Optional[Dict] = None,
|
|
params: Optional[Dict] = None,
|
|
) -> Dict[str, Any]:
|
|
"""
|
|
Make an HTTP request to SLMM backend.
|
|
|
|
Args:
|
|
method: HTTP method (GET, POST, PUT, DELETE)
|
|
endpoint: API endpoint (e.g., "/units", "/{unit_id}/status")
|
|
data: JSON body for POST/PUT requests
|
|
params: Query parameters
|
|
|
|
Returns:
|
|
Response JSON as dict
|
|
|
|
Raises:
|
|
SLMMConnectionError: Cannot reach SLMM
|
|
SLMMDeviceError: Device operation failed
|
|
"""
|
|
url = f"{self.api_base}{endpoint}"
|
|
|
|
try:
|
|
async with httpx.AsyncClient(timeout=self.timeout) as client:
|
|
response = await client.request(
|
|
method=method,
|
|
url=url,
|
|
json=data,
|
|
params=params,
|
|
)
|
|
response.raise_for_status()
|
|
|
|
# Handle empty responses
|
|
if not response.content:
|
|
return {}
|
|
|
|
return response.json()
|
|
|
|
except httpx.ConnectError as e:
|
|
raise SLMMConnectionError(
|
|
f"Cannot connect to SLMM backend at {self.base_url}. "
|
|
f"Is SLMM running? Error: {str(e)}"
|
|
)
|
|
except httpx.HTTPStatusError as e:
|
|
error_detail = "Unknown error"
|
|
try:
|
|
error_data = e.response.json()
|
|
error_detail = error_data.get("detail", str(error_data))
|
|
except:
|
|
error_detail = e.response.text or str(e)
|
|
|
|
raise SLMMDeviceError(
|
|
f"SLMM operation failed: {error_detail}"
|
|
)
|
|
except Exception as e:
|
|
raise SLMMClientError(f"Unexpected error: {str(e)}")
|
|
|
|
# ========================================================================
|
|
# Unit Management
|
|
# ========================================================================
|
|
|
|
async def get_all_units(self) -> List[Dict[str, Any]]:
|
|
"""
|
|
Get all configured SLM units from SLMM.
|
|
|
|
Returns:
|
|
List of unit dicts with id, config, and status
|
|
"""
|
|
# SLMM doesn't have a /units endpoint yet, so we'll need to add this
|
|
# For now, return empty list or implement when SLMM endpoint is ready
|
|
try:
|
|
response = await self._request("GET", "/units")
|
|
return response.get("units", [])
|
|
except SLMMClientError:
|
|
# Endpoint may not exist yet
|
|
return []
|
|
|
|
async def get_unit_config(self, unit_id: str) -> Dict[str, Any]:
|
|
"""
|
|
Get unit configuration from SLMM cache.
|
|
|
|
Args:
|
|
unit_id: Unit identifier (e.g., "nl43-001")
|
|
|
|
Returns:
|
|
Config dict with host, tcp_port, ftp_port, etc.
|
|
"""
|
|
return await self._request("GET", f"/{unit_id}/config")
|
|
|
|
async def update_unit_config(
|
|
self,
|
|
unit_id: str,
|
|
host: Optional[str] = None,
|
|
tcp_port: Optional[int] = None,
|
|
ftp_port: Optional[int] = None,
|
|
ftp_username: Optional[str] = None,
|
|
ftp_password: Optional[str] = None,
|
|
) -> Dict[str, Any]:
|
|
"""
|
|
Update unit configuration in SLMM cache.
|
|
|
|
Args:
|
|
unit_id: Unit identifier
|
|
host: Device IP address
|
|
tcp_port: TCP control port (default: 2255)
|
|
ftp_port: FTP data port (default: 21)
|
|
ftp_username: FTP username
|
|
ftp_password: FTP password
|
|
|
|
Returns:
|
|
Updated config
|
|
"""
|
|
config = {}
|
|
if host is not None:
|
|
config["host"] = host
|
|
if tcp_port is not None:
|
|
config["tcp_port"] = tcp_port
|
|
if ftp_port is not None:
|
|
config["ftp_port"] = ftp_port
|
|
if ftp_username is not None:
|
|
config["ftp_username"] = ftp_username
|
|
if ftp_password is not None:
|
|
config["ftp_password"] = ftp_password
|
|
|
|
return await self._request("PUT", f"/{unit_id}/config", data=config)
|
|
|
|
# ========================================================================
|
|
# Status & Monitoring
|
|
# ========================================================================
|
|
|
|
async def get_unit_status(self, unit_id: str) -> Dict[str, Any]:
|
|
"""
|
|
Get cached status snapshot from SLMM.
|
|
|
|
Args:
|
|
unit_id: Unit identifier
|
|
|
|
Returns:
|
|
Status dict with measurement_state, lp, leq, battery, etc.
|
|
"""
|
|
return await self._request("GET", f"/{unit_id}/status")
|
|
|
|
async def get_live_data(self, unit_id: str) -> Dict[str, Any]:
|
|
"""
|
|
Request fresh data from device (DOD command).
|
|
|
|
Args:
|
|
unit_id: Unit identifier
|
|
|
|
Returns:
|
|
Live data snapshot
|
|
"""
|
|
return await self._request("GET", f"/{unit_id}/live")
|
|
|
|
# ========================================================================
|
|
# Recording Control
|
|
# ========================================================================
|
|
|
|
async def start_recording(
|
|
self,
|
|
unit_id: str,
|
|
config: Optional[Dict[str, Any]] = None,
|
|
) -> Dict[str, Any]:
|
|
"""
|
|
Start recording on a unit.
|
|
|
|
Args:
|
|
unit_id: Unit identifier
|
|
config: Optional recording config (interval, settings, etc.)
|
|
|
|
Returns:
|
|
Response from SLMM with success status
|
|
"""
|
|
return await self._request("POST", f"/{unit_id}/start", data=config or {})
|
|
|
|
async def stop_recording(self, unit_id: str) -> Dict[str, Any]:
|
|
"""
|
|
Stop recording on a unit.
|
|
|
|
Args:
|
|
unit_id: Unit identifier
|
|
|
|
Returns:
|
|
Response from SLMM
|
|
"""
|
|
return await self._request("POST", f"/{unit_id}/stop")
|
|
|
|
async def pause_recording(self, unit_id: str) -> Dict[str, Any]:
|
|
"""
|
|
Pause recording on a unit.
|
|
|
|
Args:
|
|
unit_id: Unit identifier
|
|
|
|
Returns:
|
|
Response from SLMM
|
|
"""
|
|
return await self._request("POST", f"/{unit_id}/pause")
|
|
|
|
async def resume_recording(self, unit_id: str) -> Dict[str, Any]:
|
|
"""
|
|
Resume paused recording on a unit.
|
|
|
|
Args:
|
|
unit_id: Unit identifier
|
|
|
|
Returns:
|
|
Response from SLMM
|
|
"""
|
|
return await self._request("POST", f"/{unit_id}/resume")
|
|
|
|
async def reset_data(self, unit_id: str) -> Dict[str, Any]:
|
|
"""
|
|
Reset measurement data on a unit.
|
|
|
|
Args:
|
|
unit_id: Unit identifier
|
|
|
|
Returns:
|
|
Response from SLMM
|
|
"""
|
|
return await self._request("POST", f"/{unit_id}/reset")
|
|
|
|
# ========================================================================
|
|
# Store/Index Management
|
|
# ========================================================================
|
|
|
|
async def get_index_number(self, unit_id: str) -> Dict[str, Any]:
|
|
"""
|
|
Get current store/index number from device.
|
|
|
|
Args:
|
|
unit_id: Unit identifier
|
|
|
|
Returns:
|
|
Dict with current index_number (store name)
|
|
"""
|
|
return await self._request("GET", f"/{unit_id}/index-number")
|
|
|
|
async def set_index_number(
|
|
self,
|
|
unit_id: str,
|
|
index_number: int,
|
|
) -> Dict[str, Any]:
|
|
"""
|
|
Set store/index number on device.
|
|
|
|
Args:
|
|
unit_id: Unit identifier
|
|
index_number: New index number to set
|
|
|
|
Returns:
|
|
Confirmation response
|
|
"""
|
|
return await self._request(
|
|
"PUT",
|
|
f"/{unit_id}/index-number",
|
|
data={"index_number": index_number},
|
|
)
|
|
|
|
async def check_overwrite_status(self, unit_id: str) -> Dict[str, Any]:
|
|
"""
|
|
Check if data exists at the current store index.
|
|
|
|
Args:
|
|
unit_id: Unit identifier
|
|
|
|
Returns:
|
|
Dict with:
|
|
- overwrite_status: "None" (safe) or "Exist" (would overwrite)
|
|
- will_overwrite: bool
|
|
- safe_to_store: bool
|
|
"""
|
|
return await self._request("GET", f"/{unit_id}/overwrite-check")
|
|
|
|
async def increment_index(self, unit_id: str, max_attempts: int = 100) -> Dict[str, Any]:
|
|
"""
|
|
Find and set the next available (unused) store/index number.
|
|
|
|
Checks the current index - if it would overwrite existing data,
|
|
increments until finding an unused index number.
|
|
|
|
Args:
|
|
unit_id: Unit identifier
|
|
max_attempts: Maximum number of indices to try before giving up
|
|
|
|
Returns:
|
|
Dict with old_index, new_index, and attempts_made
|
|
"""
|
|
# Get current index
|
|
current = await self.get_index_number(unit_id)
|
|
old_index = current.get("index_number", 0)
|
|
|
|
# Check if current index is safe
|
|
overwrite_check = await self.check_overwrite_status(unit_id)
|
|
if overwrite_check.get("safe_to_store", False):
|
|
# Current index is safe, no need to increment
|
|
return {
|
|
"success": True,
|
|
"old_index": old_index,
|
|
"new_index": old_index,
|
|
"unit_id": unit_id,
|
|
"already_safe": True,
|
|
"attempts_made": 0,
|
|
}
|
|
|
|
# Need to find an unused index
|
|
attempts = 0
|
|
test_index = old_index + 1
|
|
|
|
while attempts < max_attempts:
|
|
# Set the new index
|
|
await self.set_index_number(unit_id, test_index)
|
|
|
|
# Check if this index is safe
|
|
overwrite_check = await self.check_overwrite_status(unit_id)
|
|
attempts += 1
|
|
|
|
if overwrite_check.get("safe_to_store", False):
|
|
return {
|
|
"success": True,
|
|
"old_index": old_index,
|
|
"new_index": test_index,
|
|
"unit_id": unit_id,
|
|
"already_safe": False,
|
|
"attempts_made": attempts,
|
|
}
|
|
|
|
# Try next index (wrap around at 9999)
|
|
test_index = (test_index + 1) % 10000
|
|
|
|
# Avoid infinite loops if we've wrapped around
|
|
if test_index == old_index:
|
|
break
|
|
|
|
# Could not find a safe index
|
|
raise SLMMDeviceError(
|
|
f"Could not find unused store index for {unit_id} after {attempts} attempts. "
|
|
f"Consider downloading and clearing data from the device."
|
|
)
|
|
|
|
# ========================================================================
|
|
# Device Settings
|
|
# ========================================================================
|
|
|
|
async def get_frequency_weighting(self, unit_id: str) -> Dict[str, Any]:
|
|
"""
|
|
Get frequency weighting setting (A, C, or Z).
|
|
|
|
Args:
|
|
unit_id: Unit identifier
|
|
|
|
Returns:
|
|
Dict with current weighting
|
|
"""
|
|
return await self._request("GET", f"/{unit_id}/frequency-weighting")
|
|
|
|
async def set_frequency_weighting(
|
|
self,
|
|
unit_id: str,
|
|
weighting: str,
|
|
) -> Dict[str, Any]:
|
|
"""
|
|
Set frequency weighting (A, C, or Z).
|
|
|
|
Args:
|
|
unit_id: Unit identifier
|
|
weighting: "A", "C", or "Z"
|
|
|
|
Returns:
|
|
Confirmation response
|
|
"""
|
|
return await self._request(
|
|
"PUT",
|
|
f"/{unit_id}/frequency-weighting",
|
|
data={"weighting": weighting},
|
|
)
|
|
|
|
async def get_time_weighting(self, unit_id: str) -> Dict[str, Any]:
|
|
"""
|
|
Get time weighting setting (F, S, or I).
|
|
|
|
Args:
|
|
unit_id: Unit identifier
|
|
|
|
Returns:
|
|
Dict with current time weighting
|
|
"""
|
|
return await self._request("GET", f"/{unit_id}/time-weighting")
|
|
|
|
async def set_time_weighting(
|
|
self,
|
|
unit_id: str,
|
|
weighting: str,
|
|
) -> Dict[str, Any]:
|
|
"""
|
|
Set time weighting (F=Fast, S=Slow, I=Impulse).
|
|
|
|
Args:
|
|
unit_id: Unit identifier
|
|
weighting: "F", "S", or "I"
|
|
|
|
Returns:
|
|
Confirmation response
|
|
"""
|
|
return await self._request(
|
|
"PUT",
|
|
f"/{unit_id}/time-weighting",
|
|
data={"weighting": weighting},
|
|
)
|
|
|
|
async def get_all_settings(self, unit_id: str) -> Dict[str, Any]:
|
|
"""
|
|
Get all device settings.
|
|
|
|
Args:
|
|
unit_id: Unit identifier
|
|
|
|
Returns:
|
|
Dict with all settings
|
|
"""
|
|
return await self._request("GET", f"/{unit_id}/settings")
|
|
|
|
# ========================================================================
|
|
# FTP Control
|
|
# ========================================================================
|
|
|
|
async def enable_ftp(self, unit_id: str) -> Dict[str, Any]:
|
|
"""
|
|
Enable FTP server on device.
|
|
|
|
Must be called before downloading files. FTP and TCP can work in tandem.
|
|
|
|
Args:
|
|
unit_id: Unit identifier
|
|
|
|
Returns:
|
|
Dict with status message
|
|
"""
|
|
return await self._request("POST", f"/{unit_id}/ftp/enable")
|
|
|
|
async def disable_ftp(self, unit_id: str) -> Dict[str, Any]:
|
|
"""
|
|
Disable FTP server on device.
|
|
|
|
Args:
|
|
unit_id: Unit identifier
|
|
|
|
Returns:
|
|
Dict with status message
|
|
"""
|
|
return await self._request("POST", f"/{unit_id}/ftp/disable")
|
|
|
|
async def get_ftp_status(self, unit_id: str) -> Dict[str, Any]:
|
|
"""
|
|
Get FTP server status on device.
|
|
|
|
Args:
|
|
unit_id: Unit identifier
|
|
|
|
Returns:
|
|
Dict with ftp_enabled status
|
|
"""
|
|
return await self._request("GET", f"/{unit_id}/ftp/status")
|
|
|
|
# ========================================================================
|
|
# Data Download
|
|
# ========================================================================
|
|
|
|
async def download_file(
|
|
self,
|
|
unit_id: str,
|
|
remote_path: str,
|
|
) -> Dict[str, Any]:
|
|
"""
|
|
Download a single file from unit via FTP.
|
|
|
|
Args:
|
|
unit_id: Unit identifier
|
|
remote_path: Path on device to download (e.g., "/NL43_DATA/measurement.wav")
|
|
|
|
Returns:
|
|
Binary file content (as response)
|
|
"""
|
|
data = {"remote_path": remote_path}
|
|
return await self._request("POST", f"/{unit_id}/ftp/download", data=data)
|
|
|
|
async def download_folder(
|
|
self,
|
|
unit_id: str,
|
|
remote_path: str,
|
|
) -> Dict[str, Any]:
|
|
"""
|
|
Download an entire folder from unit via FTP as a ZIP archive.
|
|
|
|
Useful for downloading complete measurement sessions (e.g., Auto_0000 folders).
|
|
|
|
Args:
|
|
unit_id: Unit identifier
|
|
remote_path: Folder path on device to download (e.g., "/NL43_DATA/Auto_0000")
|
|
|
|
Returns:
|
|
Dict with local_path, folder_name, file_count, zip_size_bytes
|
|
"""
|
|
data = {"remote_path": remote_path}
|
|
return await self._request("POST", f"/{unit_id}/ftp/download-folder", data=data)
|
|
|
|
async def download_current_measurement(
|
|
self,
|
|
unit_id: str,
|
|
) -> Dict[str, Any]:
|
|
"""
|
|
Download the current measurement folder based on device's index number.
|
|
|
|
This is the recommended method for scheduled downloads - it automatically
|
|
determines which folder to download based on the device's current store index.
|
|
|
|
Args:
|
|
unit_id: Unit identifier
|
|
|
|
Returns:
|
|
Dict with local_path, folder_name, file_count, zip_size_bytes, index_number
|
|
"""
|
|
# Get current index number from device
|
|
index_info = await self.get_index_number(unit_id)
|
|
index_number = index_info.get("index_number", 0)
|
|
|
|
# Format as Auto_XXXX folder name
|
|
folder_name = f"Auto_{index_number:04d}"
|
|
remote_path = f"/NL43_DATA/{folder_name}"
|
|
|
|
# Download the folder
|
|
result = await self.download_folder(unit_id, remote_path)
|
|
result["index_number"] = index_number
|
|
return result
|
|
|
|
async def download_files(
|
|
self,
|
|
unit_id: str,
|
|
destination_path: str,
|
|
files: Optional[List[str]] = None,
|
|
) -> Dict[str, Any]:
|
|
"""
|
|
Download measurement files from unit via FTP.
|
|
|
|
This method automatically determines the current measurement folder and downloads it.
|
|
The destination_path parameter is logged for reference but actual download location
|
|
is managed by SLMM (data/downloads/{unit_id}/).
|
|
|
|
Args:
|
|
unit_id: Unit identifier
|
|
destination_path: Reference path (for logging/metadata, not used by SLMM)
|
|
files: Ignored - always downloads the current measurement folder
|
|
|
|
Returns:
|
|
Dict with download result including local_path, folder_name, etc.
|
|
"""
|
|
# Use the new method that automatically determines what to download
|
|
result = await self.download_current_measurement(unit_id)
|
|
result["requested_destination"] = destination_path
|
|
return result
|
|
|
|
# ========================================================================
|
|
# Cycle Commands (for scheduled automation)
|
|
# ========================================================================
|
|
|
|
async def start_cycle(
|
|
self,
|
|
unit_id: str,
|
|
sync_clock: bool = True,
|
|
) -> Dict[str, Any]:
|
|
"""
|
|
Execute complete start cycle on device via SLMM.
|
|
|
|
This handles the full pre-recording workflow:
|
|
1. Sync device clock to server time
|
|
2. Find next safe index (with overwrite protection)
|
|
3. Start measurement
|
|
|
|
Args:
|
|
unit_id: Unit identifier
|
|
sync_clock: Whether to sync device clock to server time
|
|
|
|
Returns:
|
|
Dict with clock_synced, old_index, new_index, started, etc.
|
|
"""
|
|
return await self._request(
|
|
"POST",
|
|
f"/{unit_id}/start-cycle",
|
|
data={"sync_clock": sync_clock},
|
|
)
|
|
|
|
async def stop_cycle(
|
|
self,
|
|
unit_id: str,
|
|
download: bool = True,
|
|
download_path: Optional[str] = None,
|
|
) -> Dict[str, Any]:
|
|
"""
|
|
Execute complete stop cycle on device via SLMM.
|
|
|
|
This handles the full post-recording workflow:
|
|
1. Stop measurement
|
|
2. Enable FTP
|
|
3. Download measurement folder (if download=True)
|
|
4. Verify download
|
|
|
|
Args:
|
|
unit_id: Unit identifier
|
|
download: Whether to download measurement data
|
|
download_path: Custom path for downloaded ZIP (optional)
|
|
|
|
Returns:
|
|
Dict with stopped, ftp_enabled, download_success, local_path, etc.
|
|
"""
|
|
data = {"download": download}
|
|
if download_path:
|
|
data["download_path"] = download_path
|
|
return await self._request(
|
|
"POST",
|
|
f"/{unit_id}/stop-cycle",
|
|
data=data,
|
|
)
|
|
|
|
# ========================================================================
|
|
# Polling Status (for device monitoring/alerts)
|
|
# ========================================================================
|
|
|
|
async def get_polling_status(self) -> Dict[str, Any]:
|
|
"""
|
|
Get global polling status from SLMM.
|
|
|
|
Returns device reachability information for all polled devices.
|
|
Used by DeviceStatusMonitor to detect offline/online transitions.
|
|
|
|
Returns:
|
|
Dict with devices list containing:
|
|
- unit_id
|
|
- is_reachable
|
|
- consecutive_failures
|
|
- last_poll_attempt
|
|
- last_success
|
|
- last_error
|
|
"""
|
|
try:
|
|
async with httpx.AsyncClient(timeout=self.timeout) as client:
|
|
response = await client.get(f"{self.base_url}/api/nl43/_polling/status")
|
|
response.raise_for_status()
|
|
return response.json()
|
|
except httpx.ConnectError:
|
|
raise SLMMConnectionError("Cannot connect to SLMM for polling status")
|
|
except Exception as e:
|
|
raise SLMMClientError(f"Failed to get polling status: {str(e)}")
|
|
|
|
async def get_device_polling_config(self, unit_id: str) -> Dict[str, Any]:
|
|
"""
|
|
Get polling configuration for a specific device.
|
|
|
|
Args:
|
|
unit_id: Unit identifier
|
|
|
|
Returns:
|
|
Dict with poll_enabled and poll_interval_seconds
|
|
"""
|
|
return await self._request("GET", f"/{unit_id}/polling/config")
|
|
|
|
async def update_device_polling_config(
|
|
self,
|
|
unit_id: str,
|
|
poll_enabled: Optional[bool] = None,
|
|
poll_interval_seconds: Optional[int] = None,
|
|
) -> Dict[str, Any]:
|
|
"""
|
|
Update polling configuration for a device.
|
|
|
|
Args:
|
|
unit_id: Unit identifier
|
|
poll_enabled: Enable/disable polling
|
|
poll_interval_seconds: Polling interval (10-3600)
|
|
|
|
Returns:
|
|
Updated config
|
|
"""
|
|
config = {}
|
|
if poll_enabled is not None:
|
|
config["poll_enabled"] = poll_enabled
|
|
if poll_interval_seconds is not None:
|
|
config["poll_interval_seconds"] = poll_interval_seconds
|
|
|
|
return await self._request("PUT", f"/{unit_id}/polling/config", data=config)
|
|
|
|
# ========================================================================
|
|
# Health Check
|
|
# ========================================================================
|
|
|
|
async def health_check(self) -> bool:
|
|
"""
|
|
Check if SLMM backend is reachable.
|
|
|
|
Returns:
|
|
True if SLMM is responding, False otherwise
|
|
"""
|
|
try:
|
|
async with httpx.AsyncClient(timeout=5.0) as client:
|
|
response = await client.get(f"{self.base_url}/health")
|
|
return response.status_code == 200
|
|
except:
|
|
return False
|
|
|
|
|
|
# Singleton instance for convenience
|
|
_default_client: Optional[SLMMClient] = None
|
|
|
|
|
|
def get_slmm_client() -> SLMMClient:
|
|
"""
|
|
Get the default SLMM client instance.
|
|
|
|
Returns:
|
|
SLMMClient instance
|
|
"""
|
|
global _default_client
|
|
if _default_client is None:
|
|
_default_client = SLMMClient()
|
|
return _default_client
|