feat(slm): wire unit live view to the /monitor fan-out feed
The SLM live view now consumes SLMM's shared DOD /monitor feed instead of
the per-client DRD /stream. This fixes the single-connection contention
(many viewers share one device feed) and finally puts L1/L10 in the live
chart (DRD couldn't carry percentiles).
- New WS proxy handler /api/slmm/{unit}/monitor -> SLMM /api/nl43/{unit}/monitor.
Uses asyncio.wait(FIRST_COMPLETED) + cancel-sibling instead of gather(), so
it doesn't leave a task sending into a closed socket ("Unexpected ASGI
message after close").
- Live view JS points at /monitor; onmessage reflects feed_status and ignores
heartbeat / unreachable frames so they don't blank the cards or zero-spike
the chart. Adds a small Live/Device-offline badge.
Still on the old /live (DRD): the dashboard live tile (sound_level_meters.html)
— next slice.
Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
This commit is contained in:
@@ -231,6 +231,71 @@ async def proxy_websocket_live(websocket: WebSocket, unit_id: str):
|
|||||||
logger.info(f"WebSocket proxy closed for {unit_id} (live)")
|
logger.info(f"WebSocket proxy closed for {unit_id} (live)")
|
||||||
|
|
||||||
|
|
||||||
|
@router.websocket("/{unit_id}/monitor")
|
||||||
|
async def proxy_websocket_monitor(websocket: WebSocket, unit_id: str):
|
||||||
|
"""
|
||||||
|
Proxy WebSocket connections to SLMM's /monitor (fan-out DOD feed).
|
||||||
|
|
||||||
|
This is the shared ~1Hz DOD feed: many clients subscribe to one device feed
|
||||||
|
(no single-connection contention) and it carries L1/L10 (which the DRD
|
||||||
|
/stream cannot). Preferred over /stream for the live view.
|
||||||
|
"""
|
||||||
|
await websocket.accept()
|
||||||
|
logger.info(f"WebSocket accepted for SLMM unit {unit_id} (monitor)")
|
||||||
|
|
||||||
|
target_ws_url = f"{SLMM_WS_BASE_URL}/api/nl43/{unit_id}/monitor"
|
||||||
|
backend_ws = None
|
||||||
|
|
||||||
|
try:
|
||||||
|
backend_ws = await websockets.connect(target_ws_url)
|
||||||
|
logger.info(f"Connected to SLMM monitor feed for {unit_id}")
|
||||||
|
|
||||||
|
async def forward_to_client():
|
||||||
|
"""Backend monitor frames -> browser."""
|
||||||
|
async for message in backend_ws:
|
||||||
|
await websocket.send_text(message)
|
||||||
|
|
||||||
|
async def watch_client():
|
||||||
|
"""Drain client frames; raises WebSocketDisconnect on close so we can
|
||||||
|
tear the pair down (the monitor feed is server->client only)."""
|
||||||
|
while True:
|
||||||
|
await websocket.receive_text()
|
||||||
|
|
||||||
|
# When EITHER side ends (browser disconnects or backend closes), cancel the
|
||||||
|
# other immediately — avoids sending into a closed socket (the
|
||||||
|
# "Unexpected ASGI message after close" race that asyncio.gather leaves open).
|
||||||
|
tasks = [asyncio.ensure_future(forward_to_client()),
|
||||||
|
asyncio.ensure_future(watch_client())]
|
||||||
|
done, pending = await asyncio.wait(tasks, return_when=asyncio.FIRST_COMPLETED)
|
||||||
|
for t in pending:
|
||||||
|
t.cancel()
|
||||||
|
for t in pending:
|
||||||
|
try:
|
||||||
|
await t
|
||||||
|
except Exception:
|
||||||
|
pass
|
||||||
|
|
||||||
|
except websockets.exceptions.WebSocketException as e:
|
||||||
|
logger.error(f"WebSocket error connecting to SLMM monitor for {unit_id}: {e}")
|
||||||
|
try:
|
||||||
|
await websocket.send_json({"error": "Failed to connect to SLMM monitor", "detail": str(e)})
|
||||||
|
except Exception:
|
||||||
|
pass
|
||||||
|
except Exception as e:
|
||||||
|
logger.error(f"Unexpected error in monitor proxy for {unit_id}: {e}")
|
||||||
|
finally:
|
||||||
|
if backend_ws:
|
||||||
|
try:
|
||||||
|
await backend_ws.close()
|
||||||
|
except Exception:
|
||||||
|
pass
|
||||||
|
try:
|
||||||
|
await websocket.close()
|
||||||
|
except Exception:
|
||||||
|
pass
|
||||||
|
logger.info(f"WebSocket monitor proxy closed for {unit_id}")
|
||||||
|
|
||||||
|
|
||||||
# HTTP catch-all route MUST come after specific routes (including WebSocket routes)
|
# HTTP catch-all route MUST come after specific routes (including WebSocket routes)
|
||||||
@router.api_route("/{path:path}", methods=["GET", "POST", "PUT", "DELETE", "PATCH"])
|
@router.api_route("/{path:path}", methods=["GET", "POST", "PUT", "DELETE", "PATCH"])
|
||||||
async def proxy_to_slmm(path: str, request: Request):
|
async def proxy_to_slmm(path: str, request: Request):
|
||||||
|
|||||||
@@ -143,6 +143,8 @@
|
|||||||
</svg>
|
</svg>
|
||||||
Stop Live Stream
|
Stop Live Stream
|
||||||
</button>
|
</button>
|
||||||
|
|
||||||
|
<span id="live-feed-status" class="ml-3 self-center" style="display: none;"></span>
|
||||||
</div>
|
</div>
|
||||||
</div>
|
</div>
|
||||||
|
|
||||||
@@ -512,9 +514,11 @@ function initLiveDataStream(unitId) {
|
|||||||
window.liveChart.update();
|
window.liveChart.update();
|
||||||
}
|
}
|
||||||
|
|
||||||
// WebSocket URL for SLMM backend via proxy
|
// WebSocket URL for SLMM backend via proxy.
|
||||||
|
// /monitor = the shared fan-out DOD feed (many viewers, one device connection,
|
||||||
|
// and it carries L1/L10 which the DRD /stream cannot).
|
||||||
const wsProtocol = window.location.protocol === 'https:' ? 'wss:' : 'ws:';
|
const wsProtocol = window.location.protocol === 'https:' ? 'wss:' : 'ws:';
|
||||||
const wsUrl = `${wsProtocol}//${window.location.host}/api/slmm/${unitId}/live`;
|
const wsUrl = `${wsProtocol}//${window.location.host}/api/slmm/${unitId}/monitor`;
|
||||||
|
|
||||||
window.currentWebSocket = new WebSocket(wsUrl);
|
window.currentWebSocket = new WebSocket(wsUrl);
|
||||||
|
|
||||||
@@ -530,7 +534,11 @@ function initLiveDataStream(unitId) {
|
|||||||
window.currentWebSocket.onmessage = function(event) {
|
window.currentWebSocket.onmessage = function(event) {
|
||||||
try {
|
try {
|
||||||
const data = JSON.parse(event.data);
|
const data = JSON.parse(event.data);
|
||||||
console.log('WebSocket data received:', data);
|
// The DOD monitor sends keepalive 'heartbeat' frames (no metrics) and a
|
||||||
|
// 'feed_status' on each frame. Reflect status, but don't let a heartbeat
|
||||||
|
// or an 'unreachable' frame blank the cards / spike the chart with zeros.
|
||||||
|
updateFeedStatus(data.feed_status);
|
||||||
|
if (data.heartbeat || data.feed_status === 'unreachable') return;
|
||||||
updateLiveMetrics(data);
|
updateLiveMetrics(data);
|
||||||
updateLiveChart(data);
|
updateLiveChart(data);
|
||||||
} catch (error) {
|
} catch (error) {
|
||||||
@@ -559,6 +567,21 @@ function stopLiveDataStream() {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Reflect device reachability from the monitor feed's feed_status. Safe no-op
|
||||||
|
// if the badge element isn't on the page.
|
||||||
|
function updateFeedStatus(status) {
|
||||||
|
const el = document.getElementById('live-feed-status');
|
||||||
|
if (!el || status == null) return;
|
||||||
|
if (status === 'unreachable') {
|
||||||
|
el.textContent = 'Device offline';
|
||||||
|
el.className = 'text-xs font-medium px-2 py-0.5 rounded bg-red-100 text-red-700 dark:bg-red-900/40 dark:text-red-300';
|
||||||
|
} else {
|
||||||
|
el.textContent = 'Live';
|
||||||
|
el.className = 'text-xs font-medium px-2 py-0.5 rounded bg-green-100 text-green-700 dark:bg-green-900/40 dark:text-green-300';
|
||||||
|
}
|
||||||
|
el.style.display = '';
|
||||||
|
}
|
||||||
|
|
||||||
// Update metrics display
|
// Update metrics display
|
||||||
function updateLiveMetrics(data) {
|
function updateLiveMetrics(data) {
|
||||||
if (document.getElementById('live-lp')) {
|
if (document.getElementById('live-lp')) {
|
||||||
|
|||||||
Reference in New Issue
Block a user