SLM config now sync to SLMM, SLMM caches configs for speed
This commit is contained in:
140
backend/main.py
140
backend/main.py
@@ -1,13 +1,22 @@
|
||||
import os
|
||||
import logging
|
||||
from fastapi import FastAPI, Request, Depends
|
||||
from fastapi.middleware.cors import CORSMiddleware
|
||||
from fastapi.staticfiles import StaticFiles
|
||||
from fastapi.templating import Jinja2Templates
|
||||
from fastapi.responses import HTMLResponse, FileResponse, JSONResponse
|
||||
from fastapi.exceptions import RequestValidationError
|
||||
from sqlalchemy.orm import Session
|
||||
from typing import List, Dict
|
||||
from pydantic import BaseModel
|
||||
|
||||
# Configure logging
|
||||
logging.basicConfig(
|
||||
level=logging.INFO,
|
||||
format='%(asctime)s - %(name)s - %(levelname)s - %(message)s'
|
||||
)
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
from backend.database import engine, Base, get_db
|
||||
from backend.routers import roster, units, photos, roster_edit, dashboard, dashboard_tabs, activity, slmm, slm_ui, slm_dashboard, seismo_dashboard
|
||||
from backend.services.snapshot import emit_status_snapshot
|
||||
@@ -27,6 +36,16 @@ app = FastAPI(
|
||||
version=VERSION
|
||||
)
|
||||
|
||||
# Add validation error handler to log details
|
||||
@app.exception_handler(RequestValidationError)
|
||||
async def validation_exception_handler(request: Request, exc: RequestValidationError):
|
||||
logger.error(f"Validation error on {request.url}: {exc.errors()}")
|
||||
logger.error(f"Body: {await request.body()}")
|
||||
return JSONResponse(
|
||||
status_code=400,
|
||||
content={"detail": exc.errors()}
|
||||
)
|
||||
|
||||
# Configure CORS
|
||||
app.add_middleware(
|
||||
CORSMiddleware,
|
||||
@@ -372,6 +391,127 @@ async def unknown_emitters_partial(request: Request):
|
||||
})
|
||||
|
||||
|
||||
@app.get("/partials/devices-all", response_class=HTMLResponse)
|
||||
async def devices_all_partial(request: Request):
|
||||
"""Unified partial template for ALL devices with comprehensive filtering support"""
|
||||
from datetime import datetime
|
||||
snapshot = emit_status_snapshot()
|
||||
|
||||
units_list = []
|
||||
|
||||
# Add deployed/active units
|
||||
for unit_id, unit_data in snapshot["active"].items():
|
||||
units_list.append({
|
||||
"id": unit_id,
|
||||
"status": unit_data.get("status", "Unknown"),
|
||||
"age": unit_data.get("age", "N/A"),
|
||||
"last_seen": unit_data.get("last", "Never"),
|
||||
"deployed": True,
|
||||
"retired": False,
|
||||
"ignored": False,
|
||||
"note": unit_data.get("note", ""),
|
||||
"device_type": unit_data.get("device_type", "seismograph"),
|
||||
"address": unit_data.get("address", ""),
|
||||
"coordinates": unit_data.get("coordinates", ""),
|
||||
"project_id": unit_data.get("project_id", ""),
|
||||
"last_calibrated": unit_data.get("last_calibrated"),
|
||||
"next_calibration_due": unit_data.get("next_calibration_due"),
|
||||
"deployed_with_modem_id": unit_data.get("deployed_with_modem_id"),
|
||||
"ip_address": unit_data.get("ip_address"),
|
||||
"phone_number": unit_data.get("phone_number"),
|
||||
"hardware_model": unit_data.get("hardware_model"),
|
||||
})
|
||||
|
||||
# Add benched units
|
||||
for unit_id, unit_data in snapshot["benched"].items():
|
||||
units_list.append({
|
||||
"id": unit_id,
|
||||
"status": unit_data.get("status", "N/A"),
|
||||
"age": unit_data.get("age", "N/A"),
|
||||
"last_seen": unit_data.get("last", "Never"),
|
||||
"deployed": False,
|
||||
"retired": False,
|
||||
"ignored": False,
|
||||
"note": unit_data.get("note", ""),
|
||||
"device_type": unit_data.get("device_type", "seismograph"),
|
||||
"address": unit_data.get("address", ""),
|
||||
"coordinates": unit_data.get("coordinates", ""),
|
||||
"project_id": unit_data.get("project_id", ""),
|
||||
"last_calibrated": unit_data.get("last_calibrated"),
|
||||
"next_calibration_due": unit_data.get("next_calibration_due"),
|
||||
"deployed_with_modem_id": unit_data.get("deployed_with_modem_id"),
|
||||
"ip_address": unit_data.get("ip_address"),
|
||||
"phone_number": unit_data.get("phone_number"),
|
||||
"hardware_model": unit_data.get("hardware_model"),
|
||||
})
|
||||
|
||||
# Add retired units
|
||||
for unit_id, unit_data in snapshot["retired"].items():
|
||||
units_list.append({
|
||||
"id": unit_id,
|
||||
"status": "Retired",
|
||||
"age": "N/A",
|
||||
"last_seen": "N/A",
|
||||
"deployed": False,
|
||||
"retired": True,
|
||||
"ignored": False,
|
||||
"note": unit_data.get("note", ""),
|
||||
"device_type": unit_data.get("device_type", "seismograph"),
|
||||
"address": unit_data.get("address", ""),
|
||||
"coordinates": unit_data.get("coordinates", ""),
|
||||
"project_id": unit_data.get("project_id", ""),
|
||||
"last_calibrated": unit_data.get("last_calibrated"),
|
||||
"next_calibration_due": unit_data.get("next_calibration_due"),
|
||||
"deployed_with_modem_id": unit_data.get("deployed_with_modem_id"),
|
||||
"ip_address": unit_data.get("ip_address"),
|
||||
"phone_number": unit_data.get("phone_number"),
|
||||
"hardware_model": unit_data.get("hardware_model"),
|
||||
})
|
||||
|
||||
# Add ignored units
|
||||
for unit_id, unit_data in snapshot.get("ignored", {}).items():
|
||||
units_list.append({
|
||||
"id": unit_id,
|
||||
"status": "Ignored",
|
||||
"age": "N/A",
|
||||
"last_seen": "N/A",
|
||||
"deployed": False,
|
||||
"retired": False,
|
||||
"ignored": True,
|
||||
"note": unit_data.get("note", unit_data.get("reason", "")),
|
||||
"device_type": unit_data.get("device_type", "unknown"),
|
||||
"address": "",
|
||||
"coordinates": "",
|
||||
"project_id": "",
|
||||
"last_calibrated": None,
|
||||
"next_calibration_due": None,
|
||||
"deployed_with_modem_id": None,
|
||||
"ip_address": None,
|
||||
"phone_number": None,
|
||||
"hardware_model": None,
|
||||
})
|
||||
|
||||
# Sort by status category, then by ID
|
||||
def sort_key(unit):
|
||||
# Priority: deployed (active) -> benched -> retired -> ignored
|
||||
if unit["deployed"]:
|
||||
return (0, unit["id"])
|
||||
elif not unit["retired"] and not unit["ignored"]:
|
||||
return (1, unit["id"])
|
||||
elif unit["retired"]:
|
||||
return (2, unit["id"])
|
||||
else:
|
||||
return (3, unit["id"])
|
||||
|
||||
units_list.sort(key=sort_key)
|
||||
|
||||
return templates.TemplateResponse("partials/devices_table.html", {
|
||||
"request": request,
|
||||
"units": units_list,
|
||||
"timestamp": datetime.now().strftime("%H:%M:%S")
|
||||
})
|
||||
|
||||
|
||||
@app.get("/health")
|
||||
def health_check():
|
||||
"""Health check endpoint"""
|
||||
|
||||
@@ -1,13 +1,21 @@
|
||||
from fastapi import APIRouter, Depends, HTTPException, Form, UploadFile, File
|
||||
from fastapi import APIRouter, Depends, HTTPException, Form, UploadFile, File, Request
|
||||
from fastapi.exceptions import RequestValidationError
|
||||
from sqlalchemy.orm import Session
|
||||
from datetime import datetime, date
|
||||
import csv
|
||||
import io
|
||||
import logging
|
||||
import httpx
|
||||
import os
|
||||
|
||||
from backend.database import get_db
|
||||
from backend.models import RosterUnit, IgnoredUnit, Emitter, UnitHistory
|
||||
|
||||
router = APIRouter(prefix="/api/roster", tags=["roster-edit"])
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
# SLMM backend URL for syncing device configs to cache
|
||||
SLMM_BASE_URL = os.getenv("SLMM_BASE_URL", "http://localhost:8100")
|
||||
|
||||
|
||||
def record_history(db: Session, unit_id: str, change_type: str, field_name: str = None,
|
||||
@@ -37,13 +45,98 @@ def get_or_create_roster_unit(db: Session, unit_id: str):
|
||||
return unit
|
||||
|
||||
|
||||
async def sync_slm_to_slmm_cache(
|
||||
unit_id: str,
|
||||
host: str = None,
|
||||
tcp_port: int = None,
|
||||
ftp_port: int = None,
|
||||
ftp_username: str = None,
|
||||
ftp_password: str = None,
|
||||
deployed_with_modem_id: str = None,
|
||||
db: Session = None
|
||||
) -> dict:
|
||||
"""
|
||||
Sync SLM device configuration to SLMM backend cache.
|
||||
|
||||
Terra-View is the source of truth for device configs. This function updates
|
||||
SLMM's config cache (NL43Config table) so SLMM can look up device connection
|
||||
info by unit_id without Terra-View passing host:port with every request.
|
||||
|
||||
Args:
|
||||
unit_id: Unique identifier for the SLM device
|
||||
host: Direct IP address/hostname OR will be resolved from modem
|
||||
tcp_port: TCP control port (default: 2255)
|
||||
ftp_port: FTP port (default: 21)
|
||||
ftp_username: FTP username (optional)
|
||||
ftp_password: FTP password (optional)
|
||||
deployed_with_modem_id: If set, resolve modem IP as host
|
||||
db: Database session for modem lookup
|
||||
|
||||
Returns:
|
||||
dict: {"success": bool, "message": str}
|
||||
"""
|
||||
# Resolve host from modem if assigned
|
||||
if deployed_with_modem_id and db:
|
||||
modem = db.query(RosterUnit).filter_by(
|
||||
id=deployed_with_modem_id,
|
||||
device_type="modem"
|
||||
).first()
|
||||
if modem and modem.ip_address:
|
||||
host = modem.ip_address
|
||||
logger.info(f"Resolved host from modem {deployed_with_modem_id}: {host}")
|
||||
|
||||
# Validate required fields
|
||||
if not host:
|
||||
logger.warning(f"Cannot sync SLM {unit_id} to SLMM: no host/IP address provided")
|
||||
return {"success": False, "message": "No host IP address available"}
|
||||
|
||||
# Set defaults
|
||||
tcp_port = tcp_port or 2255
|
||||
ftp_port = ftp_port or 21
|
||||
|
||||
# Build SLMM cache payload
|
||||
config_payload = {
|
||||
"host": host,
|
||||
"tcp_port": tcp_port,
|
||||
"tcp_enabled": True,
|
||||
"ftp_enabled": bool(ftp_username and ftp_password),
|
||||
"web_enabled": False
|
||||
}
|
||||
|
||||
if ftp_username and ftp_password:
|
||||
config_payload["ftp_username"] = ftp_username
|
||||
config_payload["ftp_password"] = ftp_password
|
||||
|
||||
# Call SLMM cache update API
|
||||
slmm_url = f"{SLMM_BASE_URL}/api/nl43/{unit_id}/config"
|
||||
|
||||
try:
|
||||
async with httpx.AsyncClient(timeout=10.0) as client:
|
||||
response = await client.put(slmm_url, json=config_payload)
|
||||
|
||||
if response.status_code in [200, 201]:
|
||||
logger.info(f"Successfully synced SLM {unit_id} to SLMM cache")
|
||||
return {"success": True, "message": "Device config cached in SLMM"}
|
||||
else:
|
||||
logger.error(f"SLMM cache sync failed for {unit_id}: HTTP {response.status_code}")
|
||||
return {"success": False, "message": f"SLMM returned status {response.status_code}"}
|
||||
|
||||
except httpx.ConnectError:
|
||||
logger.error(f"Cannot connect to SLMM service at {SLMM_BASE_URL}")
|
||||
return {"success": False, "message": "SLMM service unavailable"}
|
||||
|
||||
except Exception as e:
|
||||
logger.error(f"Error syncing SLM {unit_id} to SLMM: {e}")
|
||||
return {"success": False, "message": str(e)}
|
||||
|
||||
|
||||
@router.post("/add")
|
||||
def add_roster_unit(
|
||||
async def add_roster_unit(
|
||||
id: str = Form(...),
|
||||
device_type: str = Form("seismograph"),
|
||||
unit_type: str = Form("series3"),
|
||||
deployed: bool = Form(False),
|
||||
retired: bool = Form(False),
|
||||
deployed: str = Form(None),
|
||||
retired: str = Form(None),
|
||||
note: str = Form(""),
|
||||
project_id: str = Form(None),
|
||||
location: str = Form(None),
|
||||
@@ -68,9 +161,11 @@ def add_roster_unit(
|
||||
slm_measurement_range: str = Form(None),
|
||||
db: Session = Depends(get_db)
|
||||
):
|
||||
import logging
|
||||
logger = logging.getLogger(__name__)
|
||||
logger.info(f"Adding unit: id={id}, device_type={device_type}, slm_tcp_port={slm_tcp_port}, slm_ftp_port={slm_ftp_port}")
|
||||
logger.info(f"Adding unit: id={id}, device_type={device_type}, deployed={deployed}, retired={retired}")
|
||||
|
||||
# Convert boolean strings to actual booleans
|
||||
deployed_bool = deployed in ['true', 'True', '1', 'yes'] if deployed else False
|
||||
retired_bool = retired in ['true', 'True', '1', 'yes'] if retired else False
|
||||
|
||||
# Convert port strings to integers
|
||||
slm_tcp_port_int = int(slm_tcp_port) if slm_tcp_port and slm_tcp_port.strip() else None
|
||||
@@ -98,8 +193,8 @@ def add_roster_unit(
|
||||
id=id,
|
||||
device_type=device_type,
|
||||
unit_type=unit_type,
|
||||
deployed=deployed,
|
||||
retired=retired,
|
||||
deployed=deployed_bool,
|
||||
retired=retired_bool,
|
||||
note=note,
|
||||
project_id=project_id,
|
||||
location=location,
|
||||
@@ -126,6 +221,24 @@ def add_roster_unit(
|
||||
)
|
||||
db.add(unit)
|
||||
db.commit()
|
||||
|
||||
# If sound level meter, sync config to SLMM cache
|
||||
if device_type == "sound_level_meter":
|
||||
logger.info(f"Syncing SLM {id} config to SLMM cache...")
|
||||
result = await sync_slm_to_slmm_cache(
|
||||
unit_id=id,
|
||||
host=slm_host,
|
||||
tcp_port=slm_tcp_port_int,
|
||||
ftp_port=slm_ftp_port_int,
|
||||
deployed_with_modem_id=deployed_with_modem_id,
|
||||
db=db
|
||||
)
|
||||
|
||||
if not result["success"]:
|
||||
logger.warning(f"SLMM cache sync warning for {id}: {result['message']}")
|
||||
# Don't fail the operation - device is still added to Terra-View roster
|
||||
# User can manually sync later or SLMM will be synced on next config update
|
||||
|
||||
return {"message": "Unit added", "id": id, "device_type": device_type}
|
||||
|
||||
|
||||
@@ -186,8 +299,8 @@ def edit_roster_unit(
|
||||
unit_id: str,
|
||||
device_type: str = Form("seismograph"),
|
||||
unit_type: str = Form("series3"),
|
||||
deployed: bool = Form(False),
|
||||
retired: bool = Form(False),
|
||||
deployed: str = Form(None),
|
||||
retired: str = Form(None),
|
||||
note: str = Form(""),
|
||||
project_id: str = Form(None),
|
||||
location: str = Form(None),
|
||||
@@ -216,6 +329,10 @@ def edit_roster_unit(
|
||||
if not unit:
|
||||
raise HTTPException(status_code=404, detail="Unit not found")
|
||||
|
||||
# Convert boolean strings to actual booleans
|
||||
deployed_bool = deployed in ['true', 'True', '1', 'yes'] if deployed else False
|
||||
retired_bool = retired in ['true', 'True', '1', 'yes'] if retired else False
|
||||
|
||||
# Convert port strings to integers
|
||||
slm_tcp_port_int = int(slm_tcp_port) if slm_tcp_port and slm_tcp_port.strip() else None
|
||||
slm_ftp_port_int = int(slm_ftp_port) if slm_ftp_port and slm_ftp_port.strip() else None
|
||||
@@ -243,8 +360,8 @@ def edit_roster_unit(
|
||||
# Update all fields
|
||||
unit.device_type = device_type
|
||||
unit.unit_type = unit_type
|
||||
unit.deployed = deployed
|
||||
unit.retired = retired
|
||||
unit.deployed = deployed_bool
|
||||
unit.retired = retired_bool
|
||||
unit.note = note
|
||||
unit.project_id = project_id
|
||||
unit.location = location
|
||||
|
||||
@@ -12,15 +12,20 @@ from sqlalchemy import func
|
||||
from datetime import datetime, timedelta
|
||||
import httpx
|
||||
import logging
|
||||
import os
|
||||
|
||||
from backend.database import get_db
|
||||
from backend.models import RosterUnit
|
||||
from backend.routers.roster_edit import sync_slm_to_slmm_cache
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
router = APIRouter(prefix="/api/slm-dashboard", tags=["slm-dashboard"])
|
||||
templates = Jinja2Templates(directory="templates")
|
||||
|
||||
# SLMM backend URL - configurable via environment variable
|
||||
SLMM_BASE_URL = os.getenv("SLMM_BASE_URL", "http://localhost:8100")
|
||||
|
||||
|
||||
@router.get("/stats", response_class=HTMLResponse)
|
||||
async def get_slm_stats(request: Request, db: Session = Depends(get_db)):
|
||||
@@ -120,7 +125,7 @@ async def get_live_view(request: Request, unit_id: str, db: Session = Depends(ge
|
||||
async with httpx.AsyncClient(timeout=5.0) as client:
|
||||
# Get measurement state
|
||||
state_response = await client.get(
|
||||
f"http://localhost:8100/api/nl43/{unit_id}/measurement-state"
|
||||
f"{SLMM_BASE_URL}/api/nl43/{unit_id}/measurement-state"
|
||||
)
|
||||
if state_response.status_code == 200:
|
||||
state_data = state_response.json()
|
||||
@@ -129,7 +134,7 @@ async def get_live_view(request: Request, unit_id: str, db: Session = Depends(ge
|
||||
|
||||
# Get live status
|
||||
status_response = await client.get(
|
||||
f"http://localhost:8100/api/nl43/{unit_id}/live"
|
||||
f"{SLMM_BASE_URL}/api/nl43/{unit_id}/live"
|
||||
)
|
||||
if status_response.status_code == 200:
|
||||
status_data = status_response.json()
|
||||
@@ -162,7 +167,7 @@ async def control_slm(unit_id: str, action: str):
|
||||
try:
|
||||
async with httpx.AsyncClient(timeout=10.0) as client:
|
||||
response = await client.post(
|
||||
f"http://localhost:8100/api/nl43/{unit_id}/{action}"
|
||||
f"{SLMM_BASE_URL}/api/nl43/{unit_id}/{action}"
|
||||
)
|
||||
|
||||
if response.status_code == 200:
|
||||
@@ -239,6 +244,21 @@ async def save_slm_config(request: Request, unit_id: str, db: Session = Depends(
|
||||
db.commit()
|
||||
logger.info(f"Updated configuration for SLM {unit_id}")
|
||||
|
||||
# Sync updated configuration to SLMM cache
|
||||
logger.info(f"Syncing SLM {unit_id} config changes to SLMM cache...")
|
||||
result = await sync_slm_to_slmm_cache(
|
||||
unit_id=unit_id,
|
||||
host=unit.slm_host, # Use the updated host from Terra-View
|
||||
tcp_port=unit.slm_tcp_port,
|
||||
ftp_port=unit.slm_ftp_port,
|
||||
deployed_with_modem_id=unit.deployed_with_modem_id, # Resolve modem IP if assigned
|
||||
db=db
|
||||
)
|
||||
|
||||
if not result["success"]:
|
||||
logger.warning(f"SLMM cache sync warning for {unit_id}: {result['message']}")
|
||||
# Config still saved in Terra-View (source of truth)
|
||||
|
||||
return {"status": "success", "unit_id": unit_id}
|
||||
|
||||
except Exception as e:
|
||||
|
||||
@@ -5,9 +5,11 @@ Proxies requests from SFM to the standalone SLMM backend service.
|
||||
SLMM runs on port 8100 and handles NL43/NL53 sound level meter communication.
|
||||
"""
|
||||
|
||||
from fastapi import APIRouter, HTTPException, Request, Response
|
||||
from fastapi import APIRouter, HTTPException, Request, Response, WebSocket, WebSocketDisconnect
|
||||
from fastapi.responses import StreamingResponse
|
||||
import httpx
|
||||
import websockets
|
||||
import asyncio
|
||||
import logging
|
||||
import os
|
||||
|
||||
@@ -17,6 +19,8 @@ router = APIRouter(prefix="/api/slmm", tags=["slmm"])
|
||||
|
||||
# SLMM backend URL - configurable via environment variable
|
||||
SLMM_BASE_URL = os.getenv("SLMM_BASE_URL", "http://localhost:8100")
|
||||
# WebSocket URL derived from HTTP URL
|
||||
SLMM_WS_BASE_URL = SLMM_BASE_URL.replace("http://", "ws://").replace("https://", "wss://")
|
||||
|
||||
|
||||
@router.get("/health")
|
||||
@@ -61,6 +65,173 @@ async def check_slmm_health():
|
||||
}
|
||||
|
||||
|
||||
# WebSocket routes MUST come before the catch-all route
|
||||
@router.websocket("/{unit_id}/stream")
|
||||
async def proxy_websocket_stream(websocket: WebSocket, unit_id: str):
|
||||
"""
|
||||
Proxy WebSocket connections to SLMM's /stream endpoint.
|
||||
|
||||
This allows real-time streaming of measurement data from NL43 devices
|
||||
through the SFM unified interface.
|
||||
"""
|
||||
await websocket.accept()
|
||||
logger.info(f"WebSocket connection accepted for SLMM unit {unit_id}")
|
||||
|
||||
# Build target WebSocket URL
|
||||
target_ws_url = f"{SLMM_WS_BASE_URL}/api/nl43/{unit_id}/stream"
|
||||
logger.info(f"Connecting to SLMM WebSocket: {target_ws_url}")
|
||||
|
||||
backend_ws = None
|
||||
|
||||
try:
|
||||
# Connect to SLMM backend WebSocket
|
||||
backend_ws = await websockets.connect(target_ws_url)
|
||||
logger.info(f"Connected to SLMM backend WebSocket for {unit_id}")
|
||||
|
||||
# Create tasks for bidirectional communication
|
||||
async def forward_to_backend():
|
||||
"""Forward messages from client to SLMM backend"""
|
||||
try:
|
||||
while True:
|
||||
data = await websocket.receive_text()
|
||||
await backend_ws.send(data)
|
||||
except WebSocketDisconnect:
|
||||
logger.info(f"Client WebSocket disconnected for {unit_id}")
|
||||
except Exception as e:
|
||||
logger.error(f"Error forwarding to backend: {e}")
|
||||
|
||||
async def forward_to_client():
|
||||
"""Forward messages from SLMM backend to client"""
|
||||
try:
|
||||
async for message in backend_ws:
|
||||
await websocket.send_text(message)
|
||||
except websockets.exceptions.ConnectionClosed:
|
||||
logger.info(f"Backend WebSocket closed for {unit_id}")
|
||||
except Exception as e:
|
||||
logger.error(f"Error forwarding to client: {e}")
|
||||
|
||||
# Run both forwarding tasks concurrently
|
||||
await asyncio.gather(
|
||||
forward_to_backend(),
|
||||
forward_to_client(),
|
||||
return_exceptions=True
|
||||
)
|
||||
|
||||
except websockets.exceptions.WebSocketException as e:
|
||||
logger.error(f"WebSocket error connecting to SLMM backend: {e}")
|
||||
try:
|
||||
await websocket.send_json({
|
||||
"error": "Failed to connect to SLMM backend",
|
||||
"detail": str(e)
|
||||
})
|
||||
except Exception:
|
||||
pass
|
||||
except Exception as e:
|
||||
logger.error(f"Unexpected error in WebSocket proxy for {unit_id}: {e}")
|
||||
try:
|
||||
await websocket.send_json({
|
||||
"error": "Internal server error",
|
||||
"detail": str(e)
|
||||
})
|
||||
except Exception:
|
||||
pass
|
||||
finally:
|
||||
# Clean up connections
|
||||
if backend_ws:
|
||||
try:
|
||||
await backend_ws.close()
|
||||
except Exception:
|
||||
pass
|
||||
try:
|
||||
await websocket.close()
|
||||
except Exception:
|
||||
pass
|
||||
logger.info(f"WebSocket proxy closed for {unit_id}")
|
||||
|
||||
|
||||
@router.websocket("/{unit_id}/live")
|
||||
async def proxy_websocket_live(websocket: WebSocket, unit_id: str):
|
||||
"""
|
||||
Proxy WebSocket connections to SLMM's /live endpoint.
|
||||
|
||||
Alternative WebSocket endpoint that may be used by some frontend components.
|
||||
"""
|
||||
await websocket.accept()
|
||||
logger.info(f"WebSocket connection accepted for SLMM unit {unit_id} (live endpoint)")
|
||||
|
||||
# Build target WebSocket URL - try /stream endpoint as SLMM uses that for WebSocket
|
||||
target_ws_url = f"{SLMM_WS_BASE_URL}/api/nl43/{unit_id}/stream"
|
||||
logger.info(f"Connecting to SLMM WebSocket: {target_ws_url}")
|
||||
|
||||
backend_ws = None
|
||||
|
||||
try:
|
||||
# Connect to SLMM backend WebSocket
|
||||
backend_ws = await websockets.connect(target_ws_url)
|
||||
logger.info(f"Connected to SLMM backend WebSocket for {unit_id} (live endpoint)")
|
||||
|
||||
# Create tasks for bidirectional communication
|
||||
async def forward_to_backend():
|
||||
"""Forward messages from client to SLMM backend"""
|
||||
try:
|
||||
while True:
|
||||
data = await websocket.receive_text()
|
||||
await backend_ws.send(data)
|
||||
except WebSocketDisconnect:
|
||||
logger.info(f"Client WebSocket disconnected for {unit_id} (live)")
|
||||
except Exception as e:
|
||||
logger.error(f"Error forwarding to backend (live): {e}")
|
||||
|
||||
async def forward_to_client():
|
||||
"""Forward messages from SLMM backend to client"""
|
||||
try:
|
||||
async for message in backend_ws:
|
||||
await websocket.send_text(message)
|
||||
except websockets.exceptions.ConnectionClosed:
|
||||
logger.info(f"Backend WebSocket closed for {unit_id} (live)")
|
||||
except Exception as e:
|
||||
logger.error(f"Error forwarding to client (live): {e}")
|
||||
|
||||
# Run both forwarding tasks concurrently
|
||||
await asyncio.gather(
|
||||
forward_to_backend(),
|
||||
forward_to_client(),
|
||||
return_exceptions=True
|
||||
)
|
||||
|
||||
except websockets.exceptions.WebSocketException as e:
|
||||
logger.error(f"WebSocket error connecting to SLMM backend (live): {e}")
|
||||
try:
|
||||
await websocket.send_json({
|
||||
"error": "Failed to connect to SLMM backend",
|
||||
"detail": str(e)
|
||||
})
|
||||
except Exception:
|
||||
pass
|
||||
except Exception as e:
|
||||
logger.error(f"Unexpected error in WebSocket proxy for {unit_id} (live): {e}")
|
||||
try:
|
||||
await websocket.send_json({
|
||||
"error": "Internal server error",
|
||||
"detail": str(e)
|
||||
})
|
||||
except Exception:
|
||||
pass
|
||||
finally:
|
||||
# Clean up connections
|
||||
if backend_ws:
|
||||
try:
|
||||
await backend_ws.close()
|
||||
except Exception:
|
||||
pass
|
||||
try:
|
||||
await websocket.close()
|
||||
except Exception:
|
||||
pass
|
||||
logger.info(f"WebSocket proxy closed for {unit_id} (live)")
|
||||
|
||||
|
||||
# HTTP catch-all route MUST come after specific routes (including WebSocket routes)
|
||||
@router.api_route("/{path:path}", methods=["GET", "POST", "PUT", "DELETE", "PATCH"])
|
||||
async def proxy_to_slmm(path: str, request: Request):
|
||||
"""
|
||||
|
||||
Reference in New Issue
Block a user