Files
terra-view/backend/routers/slmm.py
T
serversdown c56b7f6c99 feat(slm): wire unit live view to the /monitor fan-out feed
The SLM live view now consumes SLMM's shared DOD /monitor feed instead of
the per-client DRD /stream. This fixes the single-connection contention
(many viewers share one device feed) and finally puts L1/L10 in the live
chart (DRD couldn't carry percentiles).

- New WS proxy handler /api/slmm/{unit}/monitor -> SLMM /api/nl43/{unit}/monitor.
  Uses asyncio.wait(FIRST_COMPLETED) + cancel-sibling instead of gather(), so
  it doesn't leave a task sending into a closed socket ("Unexpected ASGI
  message after close").
- Live view JS points at /monitor; onmessage reflects feed_status and ignores
  heartbeat / unreachable frames so they don't blank the cards or zero-spike
  the chart. Adds a small Live/Device-offline badge.

Still on the old /live (DRD): the dashboard live tile (sound_level_meters.html)
— next slice.

Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
2026-06-09 18:13:17 +00:00

367 lines
13 KiB
Python

"""
SLMM (Sound Level Meter Manager) Proxy Router
Proxies requests from SFM to the standalone SLMM backend service.
SLMM runs on port 8100 and handles NL43/NL53 sound level meter communication.
"""
from fastapi import APIRouter, HTTPException, Request, Response, WebSocket, WebSocketDisconnect
from fastapi.responses import StreamingResponse
import httpx
import websockets
import asyncio
import logging
import os
logger = logging.getLogger(__name__)
router = APIRouter(prefix="/api/slmm", tags=["slmm"])
# SLMM backend URL - configurable via environment variable
SLMM_BASE_URL = os.getenv("SLMM_BASE_URL", "http://localhost:8100")
# WebSocket URL derived from HTTP URL
SLMM_WS_BASE_URL = SLMM_BASE_URL.replace("http://", "ws://").replace("https://", "wss://")
@router.get("/health")
async def check_slmm_health():
"""
Check if the SLMM backend service is reachable and healthy.
"""
try:
async with httpx.AsyncClient(timeout=5.0) as client:
response = await client.get(f"{SLMM_BASE_URL}/health")
if response.status_code == 200:
data = response.json()
return {
"status": "ok",
"slmm_status": "connected",
"slmm_url": SLMM_BASE_URL,
"slmm_version": data.get("version", "unknown"),
"slmm_response": data
}
else:
return {
"status": "degraded",
"slmm_status": "error",
"slmm_url": SLMM_BASE_URL,
"detail": f"SLMM returned status {response.status_code}"
}
except httpx.ConnectError:
return {
"status": "error",
"slmm_status": "unreachable",
"slmm_url": SLMM_BASE_URL,
"detail": "Cannot connect to SLMM backend. Is it running?"
}
except Exception as e:
return {
"status": "error",
"slmm_status": "error",
"slmm_url": SLMM_BASE_URL,
"detail": str(e)
}
# WebSocket routes MUST come before the catch-all route
@router.websocket("/{unit_id}/stream")
async def proxy_websocket_stream(websocket: WebSocket, unit_id: str):
"""
Proxy WebSocket connections to SLMM's /stream endpoint.
This allows real-time streaming of measurement data from NL43 devices
through the SFM unified interface.
"""
await websocket.accept()
logger.info(f"WebSocket connection accepted for SLMM unit {unit_id}")
# Build target WebSocket URL
target_ws_url = f"{SLMM_WS_BASE_URL}/api/nl43/{unit_id}/stream"
logger.info(f"Connecting to SLMM WebSocket: {target_ws_url}")
backend_ws = None
try:
# Connect to SLMM backend WebSocket
backend_ws = await websockets.connect(target_ws_url)
logger.info(f"Connected to SLMM backend WebSocket for {unit_id}")
# Create tasks for bidirectional communication
async def forward_to_backend():
"""Forward messages from client to SLMM backend"""
try:
while True:
data = await websocket.receive_text()
await backend_ws.send(data)
except WebSocketDisconnect:
logger.info(f"Client WebSocket disconnected for {unit_id}")
except Exception as e:
logger.error(f"Error forwarding to backend: {e}")
async def forward_to_client():
"""Forward messages from SLMM backend to client"""
try:
async for message in backend_ws:
await websocket.send_text(message)
except websockets.exceptions.ConnectionClosed:
logger.info(f"Backend WebSocket closed for {unit_id}")
except Exception as e:
logger.error(f"Error forwarding to client: {e}")
# Run both forwarding tasks concurrently
await asyncio.gather(
forward_to_backend(),
forward_to_client(),
return_exceptions=True
)
except websockets.exceptions.WebSocketException as e:
logger.error(f"WebSocket error connecting to SLMM backend: {e}")
try:
await websocket.send_json({
"error": "Failed to connect to SLMM backend",
"detail": str(e)
})
except Exception:
pass
except Exception as e:
logger.error(f"Unexpected error in WebSocket proxy for {unit_id}: {e}")
try:
await websocket.send_json({
"error": "Internal server error",
"detail": str(e)
})
except Exception:
pass
finally:
# Clean up connections
if backend_ws:
try:
await backend_ws.close()
except Exception:
pass
try:
await websocket.close()
except Exception:
pass
logger.info(f"WebSocket proxy closed for {unit_id}")
@router.websocket("/{unit_id}/live")
async def proxy_websocket_live(websocket: WebSocket, unit_id: str):
"""
Proxy WebSocket connections to SLMM's /live endpoint.
Alternative WebSocket endpoint that may be used by some frontend components.
"""
await websocket.accept()
logger.info(f"WebSocket connection accepted for SLMM unit {unit_id} (live endpoint)")
# Build target WebSocket URL - try /stream endpoint as SLMM uses that for WebSocket
target_ws_url = f"{SLMM_WS_BASE_URL}/api/nl43/{unit_id}/stream"
logger.info(f"Connecting to SLMM WebSocket: {target_ws_url}")
backend_ws = None
try:
# Connect to SLMM backend WebSocket
backend_ws = await websockets.connect(target_ws_url)
logger.info(f"Connected to SLMM backend WebSocket for {unit_id} (live endpoint)")
# Create tasks for bidirectional communication
async def forward_to_backend():
"""Forward messages from client to SLMM backend"""
try:
while True:
data = await websocket.receive_text()
await backend_ws.send(data)
except WebSocketDisconnect:
logger.info(f"Client WebSocket disconnected for {unit_id} (live)")
except Exception as e:
logger.error(f"Error forwarding to backend (live): {e}")
async def forward_to_client():
"""Forward messages from SLMM backend to client"""
try:
async for message in backend_ws:
await websocket.send_text(message)
except websockets.exceptions.ConnectionClosed:
logger.info(f"Backend WebSocket closed for {unit_id} (live)")
except Exception as e:
logger.error(f"Error forwarding to client (live): {e}")
# Run both forwarding tasks concurrently
await asyncio.gather(
forward_to_backend(),
forward_to_client(),
return_exceptions=True
)
except websockets.exceptions.WebSocketException as e:
logger.error(f"WebSocket error connecting to SLMM backend (live): {e}")
try:
await websocket.send_json({
"error": "Failed to connect to SLMM backend",
"detail": str(e)
})
except Exception:
pass
except Exception as e:
logger.error(f"Unexpected error in WebSocket proxy for {unit_id} (live): {e}")
try:
await websocket.send_json({
"error": "Internal server error",
"detail": str(e)
})
except Exception:
pass
finally:
# Clean up connections
if backend_ws:
try:
await backend_ws.close()
except Exception:
pass
try:
await websocket.close()
except Exception:
pass
logger.info(f"WebSocket proxy closed for {unit_id} (live)")
@router.websocket("/{unit_id}/monitor")
async def proxy_websocket_monitor(websocket: WebSocket, unit_id: str):
"""
Proxy WebSocket connections to SLMM's /monitor (fan-out DOD feed).
This is the shared ~1Hz DOD feed: many clients subscribe to one device feed
(no single-connection contention) and it carries L1/L10 (which the DRD
/stream cannot). Preferred over /stream for the live view.
"""
await websocket.accept()
logger.info(f"WebSocket accepted for SLMM unit {unit_id} (monitor)")
target_ws_url = f"{SLMM_WS_BASE_URL}/api/nl43/{unit_id}/monitor"
backend_ws = None
try:
backend_ws = await websockets.connect(target_ws_url)
logger.info(f"Connected to SLMM monitor feed for {unit_id}")
async def forward_to_client():
"""Backend monitor frames -> browser."""
async for message in backend_ws:
await websocket.send_text(message)
async def watch_client():
"""Drain client frames; raises WebSocketDisconnect on close so we can
tear the pair down (the monitor feed is server->client only)."""
while True:
await websocket.receive_text()
# When EITHER side ends (browser disconnects or backend closes), cancel the
# other immediately — avoids sending into a closed socket (the
# "Unexpected ASGI message after close" race that asyncio.gather leaves open).
tasks = [asyncio.ensure_future(forward_to_client()),
asyncio.ensure_future(watch_client())]
done, pending = await asyncio.wait(tasks, return_when=asyncio.FIRST_COMPLETED)
for t in pending:
t.cancel()
for t in pending:
try:
await t
except Exception:
pass
except websockets.exceptions.WebSocketException as e:
logger.error(f"WebSocket error connecting to SLMM monitor for {unit_id}: {e}")
try:
await websocket.send_json({"error": "Failed to connect to SLMM monitor", "detail": str(e)})
except Exception:
pass
except Exception as e:
logger.error(f"Unexpected error in monitor proxy for {unit_id}: {e}")
finally:
if backend_ws:
try:
await backend_ws.close()
except Exception:
pass
try:
await websocket.close()
except Exception:
pass
logger.info(f"WebSocket monitor proxy closed for {unit_id}")
# HTTP catch-all route MUST come after specific routes (including WebSocket routes)
@router.api_route("/{path:path}", methods=["GET", "POST", "PUT", "DELETE", "PATCH"])
async def proxy_to_slmm(path: str, request: Request):
"""
Proxy all requests to the SLMM backend service.
This allows SFM to act as a unified frontend for all device types,
while SLMM remains a standalone backend service.
"""
# Build target URL
target_url = f"{SLMM_BASE_URL}/api/nl43/{path}"
# Get query parameters
query_params = dict(request.query_params)
# Get request body if present
body = None
if request.method in ["POST", "PUT", "PATCH"]:
try:
body = await request.body()
except Exception as e:
logger.error(f"Failed to read request body: {e}")
body = None
# Get headers (exclude host and other proxy-specific headers)
headers = dict(request.headers)
headers_to_exclude = ["host", "content-length", "transfer-encoding", "connection"]
proxy_headers = {k: v for k, v in headers.items() if k.lower() not in headers_to_exclude}
logger.info(f"Proxying {request.method} request to SLMM: {target_url}")
try:
async with httpx.AsyncClient(timeout=30.0) as client:
# Forward the request to SLMM
response = await client.request(
method=request.method,
url=target_url,
params=query_params,
headers=proxy_headers,
content=body
)
# Return the response from SLMM
return Response(
content=response.content,
status_code=response.status_code,
headers=dict(response.headers),
media_type=response.headers.get("content-type")
)
except httpx.ConnectError:
logger.error(f"Failed to connect to SLMM backend at {SLMM_BASE_URL}")
raise HTTPException(
status_code=503,
detail=f"SLMM backend service unavailable. Is SLMM running on {SLMM_BASE_URL}?"
)
except httpx.TimeoutException:
logger.error(f"Timeout connecting to SLMM backend at {SLMM_BASE_URL}")
raise HTTPException(
status_code=504,
detail="SLMM backend timeout"
)
except Exception as e:
logger.error(f"Error proxying to SLMM: {e}")
raise HTTPException(
status_code=500,
detail=f"Failed to proxy request to SLMM: {str(e)}"
)