Merge branch 'dev' into feat/ftp-report-pipeline
pulled in the live slm stuff
This commit is contained in:
@@ -91,29 +91,43 @@ async def get_slm_units(
|
||||
|
||||
one_hour_ago = datetime.utcnow() - timedelta(hours=1)
|
||||
for unit in units:
|
||||
# Legacy default from the roster field; refined from SLMM's cached status below.
|
||||
unit.is_recent = bool(unit.slm_last_check and unit.slm_last_check > one_hour_ago)
|
||||
unit.measurement_state = None
|
||||
unit.cache_last_seen = None # SLMM cache last_seen (real monitoring freshness)
|
||||
|
||||
if include_measurement:
|
||||
async def fetch_measurement_state(client: httpx.AsyncClient, unit_id: str) -> str | None:
|
||||
try:
|
||||
response = await client.get(f"{SLMM_BASE_URL}/api/nl43/{unit_id}/measurement-state")
|
||||
if response.status_code == 200:
|
||||
return response.json().get("measurement_state")
|
||||
except Exception:
|
||||
return None
|
||||
return None
|
||||
|
||||
deployed_units = [unit for unit in units if unit.deployed and not unit.retired]
|
||||
if deployed_units:
|
||||
# SLMM's /roster carries each unit's CACHED status (last_seen,
|
||||
# measurement_state) from NL43Status — a DB read on SLMM's side, NOT a device
|
||||
# call. The live monitor refreshes that cache ~every 1.3s, so this reflects
|
||||
# real monitoring without sending Measure? to the device (which the old
|
||||
# /measurement-state did) and competing with DOD polling. One call covers all.
|
||||
slmm_status = {}
|
||||
try:
|
||||
async with httpx.AsyncClient(timeout=3.0) as client:
|
||||
tasks = [fetch_measurement_state(client, unit.id) for unit in deployed_units]
|
||||
results = await asyncio.gather(*tasks, return_exceptions=True)
|
||||
r = await client.get(f"{SLMM_BASE_URL}/api/nl43/roster")
|
||||
if r.status_code == 200:
|
||||
for dev in (r.json().get("devices") or []):
|
||||
slmm_status[dev.get("unit_id")] = dev.get("status") or {}
|
||||
except Exception:
|
||||
slmm_status = {}
|
||||
|
||||
for unit, state in zip(deployed_units, results):
|
||||
if isinstance(state, Exception):
|
||||
unit.measurement_state = None
|
||||
else:
|
||||
unit.measurement_state = state
|
||||
# "Recent" = the monitor has a fresh successful read. last_seen only advances
|
||||
# on a successful poll, so staleness == the device isn't being reached.
|
||||
recent_cutoff = datetime.utcnow() - timedelta(minutes=5)
|
||||
for unit in units:
|
||||
st = slmm_status.get(unit.id)
|
||||
if not st:
|
||||
continue
|
||||
unit.measurement_state = st.get("measurement_state")
|
||||
last_seen = st.get("last_seen")
|
||||
if last_seen:
|
||||
try:
|
||||
ls = datetime.fromisoformat(last_seen.replace("Z", ""))
|
||||
unit.is_recent = ls > recent_cutoff
|
||||
unit.cache_last_seen = ls # the real freshness the monitor updates
|
||||
except Exception:
|
||||
pass
|
||||
|
||||
return templates.TemplateResponse("partials/slm_device_list.html", {
|
||||
"request": request,
|
||||
@@ -157,25 +171,18 @@ async def get_live_view(request: Request, unit_id: str, db: Session = Depends(ge
|
||||
is_measuring = False
|
||||
|
||||
try:
|
||||
async with httpx.AsyncClient(timeout=10.0) as client:
|
||||
# Get measurement state
|
||||
state_response = await client.get(
|
||||
f"{SLMM_BASE_URL}/api/nl43/{unit_id}/measurement-state"
|
||||
)
|
||||
if state_response.status_code == 200:
|
||||
state_data = state_response.json()
|
||||
measurement_state = state_data.get("measurement_state", "Unknown")
|
||||
is_measuring = state_data.get("is_measuring", False)
|
||||
|
||||
# Get live status (measurement_start_time is already stored in SLMM database)
|
||||
status_response = await client.get(
|
||||
f"{SLMM_BASE_URL}/api/nl43/{unit_id}/live"
|
||||
)
|
||||
if status_response.status_code == 200:
|
||||
status_data = status_response.json()
|
||||
current_status = status_data.get("data", {})
|
||||
# Read SLMM's CACHED status (NL43Status) — no device call. The live monitor
|
||||
# keeps it fresh (~1.3s) and the live-stream WS provides ongoing updates, so we
|
||||
# no longer fire Measure? + a fresh DOD read at the device on every command-
|
||||
# center load (which competed with DOD polling for the single connection).
|
||||
async with httpx.AsyncClient(timeout=5.0) as client:
|
||||
r = await client.get(f"{SLMM_BASE_URL}/api/nl43/{unit_id}/status")
|
||||
if r.status_code == 200:
|
||||
current_status = r.json().get("data", {})
|
||||
measurement_state = current_status.get("measurement_state")
|
||||
is_measuring = measurement_state in ("Start", "Measure")
|
||||
except Exception as e:
|
||||
logger.error(f"Failed to get status for {unit_id}: {e}")
|
||||
logger.error(f"Failed to get cached status for {unit_id}: {e}")
|
||||
|
||||
return templates.TemplateResponse("partials/slm_live_view.html", {
|
||||
"request": request,
|
||||
|
||||
@@ -231,6 +231,76 @@ async def proxy_websocket_live(websocket: WebSocket, unit_id: str):
|
||||
logger.info(f"WebSocket proxy closed for {unit_id} (live)")
|
||||
|
||||
|
||||
@router.websocket("/{unit_id}/monitor")
|
||||
async def proxy_websocket_monitor(websocket: WebSocket, unit_id: str):
|
||||
"""
|
||||
Proxy WebSocket connections to SLMM's /monitor (fan-out DOD feed).
|
||||
|
||||
This is the shared ~1Hz DOD feed: many clients subscribe to one device feed
|
||||
(no single-connection contention) and it carries L1/L10 (which the DRD
|
||||
/stream cannot). Preferred over /stream for the live view.
|
||||
"""
|
||||
await websocket.accept()
|
||||
logger.info(f"WebSocket accepted for SLMM unit {unit_id} (monitor)")
|
||||
|
||||
target_ws_url = f"{SLMM_WS_BASE_URL}/api/nl43/{unit_id}/monitor"
|
||||
backend_ws = None
|
||||
|
||||
try:
|
||||
backend_ws = await websockets.connect(target_ws_url)
|
||||
logger.info(f"Connected to SLMM monitor feed for {unit_id}")
|
||||
|
||||
async def forward_to_client():
|
||||
"""Backend monitor frames -> browser."""
|
||||
async for message in backend_ws:
|
||||
await websocket.send_text(message)
|
||||
|
||||
async def watch_client():
|
||||
"""Drain client frames; raises WebSocketDisconnect on close so we can
|
||||
tear the pair down (the monitor feed is server->client only)."""
|
||||
while True:
|
||||
await websocket.receive_text()
|
||||
|
||||
# When EITHER side ends (browser disconnects or backend closes), cancel the
|
||||
# other immediately — avoids sending into a closed socket (the
|
||||
# "Unexpected ASGI message after close" race that asyncio.gather leaves open).
|
||||
tasks = [asyncio.ensure_future(forward_to_client()),
|
||||
asyncio.ensure_future(watch_client())]
|
||||
done, pending = await asyncio.wait(tasks, return_when=asyncio.FIRST_COMPLETED)
|
||||
for t in pending:
|
||||
t.cancel()
|
||||
# Await ALL tasks (the done one AND the cancelled one) and swallow both
|
||||
# the expected WebSocketDisconnect and CancelledError. CancelledError is a
|
||||
# BaseException, so a bare `except Exception` misses it — that's what leaked
|
||||
# the traceback on stop; and awaiting only `pending` left the done task's
|
||||
# exception unretrieved.
|
||||
for t in tasks:
|
||||
try:
|
||||
await t
|
||||
except (asyncio.CancelledError, Exception):
|
||||
pass
|
||||
|
||||
except websockets.exceptions.WebSocketException as e:
|
||||
logger.error(f"WebSocket error connecting to SLMM monitor for {unit_id}: {e}")
|
||||
try:
|
||||
await websocket.send_json({"error": "Failed to connect to SLMM monitor", "detail": str(e)})
|
||||
except Exception:
|
||||
pass
|
||||
except Exception as e:
|
||||
logger.error(f"Unexpected error in monitor proxy for {unit_id}: {e}")
|
||||
finally:
|
||||
if backend_ws:
|
||||
try:
|
||||
await backend_ws.close()
|
||||
except Exception:
|
||||
pass
|
||||
try:
|
||||
await websocket.close()
|
||||
except Exception:
|
||||
pass
|
||||
logger.info(f"WebSocket monitor proxy closed for {unit_id}")
|
||||
|
||||
|
||||
# HTTP catch-all route MUST come after specific routes (including WebSocket routes)
|
||||
@router.api_route("/{path:path}", methods=["GET", "POST", "PUT", "DELETE", "PATCH"])
|
||||
async def proxy_to_slmm(path: str, request: Request):
|
||||
|
||||
Reference in New Issue
Block a user