Migration Part2, now unified.

This commit is contained in:
serversdwn
2026-01-09 07:56:12 +00:00
parent 16eb9eb1fe
commit ff438c1197
15 changed files with 2392 additions and 785 deletions

View File

@@ -0,0 +1 @@
# SLMM addon package for NL43 integration.

View File

@@ -1,29 +1,20 @@
"""
Sound Level Meter feature module database connection
"""
from sqlalchemy import create_engine
from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy.orm import sessionmaker
import os
# Ensure data directory exists
# Ensure data directory exists for the SLMM addon
os.makedirs("data", exist_ok=True)
# For now, we'll use the shared database (seismo_fleet.db) until we migrate
# TODO: Migrate to slm.db
SQLALCHEMY_DATABASE_URL = "sqlite:///./data/seismo_fleet.db"
engine = create_engine(
SQLALCHEMY_DATABASE_URL, connect_args={"check_same_thread": False}
)
SQLALCHEMY_DATABASE_URL = "sqlite:///./data/slmm.db"
engine = create_engine(SQLALCHEMY_DATABASE_URL, connect_args={"check_same_thread": False})
SessionLocal = sessionmaker(autocommit=False, autoflush=False, bind=engine)
Base = declarative_base()
def get_db():
"""Dependency for database sessions"""
"""Dependency for database sessions."""
db = SessionLocal()
try:
yield db
@@ -32,5 +23,5 @@ def get_db():
def get_db_session():
"""Get a database session directly (not as a dependency)"""
"""Get a database session directly (not as a dependency)."""
return SessionLocal()

116
app/slm/main.py Normal file
View File

@@ -0,0 +1,116 @@
import os
import logging
from fastapi import FastAPI, Request
from fastapi.middleware.cors import CORSMiddleware
from fastapi.responses import HTMLResponse
from fastapi.templating import Jinja2Templates
from app.slm.database import Base, engine
from app.slm import routers
# Configure logging
logging.basicConfig(
level=logging.INFO,
format="%(asctime)s - %(name)s - %(levelname)s - %(message)s",
handlers=[
logging.StreamHandler(),
logging.FileHandler("data/slmm.log"),
],
)
logger = logging.getLogger(__name__)
# Ensure database tables exist for the addon
Base.metadata.create_all(bind=engine)
logger.info("Database tables initialized")
app = FastAPI(
title="SLMM NL43 Addon",
description="Standalone module for NL43 configuration and status APIs",
version="0.1.0",
)
# CORS configuration - use environment variable for allowed origins
# Default to "*" for development, but should be restricted in production
allowed_origins = os.getenv("CORS_ORIGINS", "*").split(",")
logger.info(f"CORS allowed origins: {allowed_origins}")
app.add_middleware(
CORSMiddleware,
allow_origins=allowed_origins,
allow_credentials=True,
allow_methods=["*"],
allow_headers=["*"],
)
templates = Jinja2Templates(directory="templates")
app.include_router(routers.router)
@app.get("/", response_class=HTMLResponse)
def index(request: Request):
return templates.TemplateResponse("index.html", {"request": request})
@app.get("/health")
async def health():
"""Basic health check endpoint."""
return {"status": "ok", "service": "slmm-nl43-addon"}
@app.get("/health/devices")
async def health_devices():
"""Enhanced health check that tests device connectivity."""
from sqlalchemy.orm import Session
from app.slm.database import SessionLocal
from app.slm.services import NL43Client
from app.slm.models import NL43Config
db: Session = SessionLocal()
device_status = []
try:
configs = db.query(NL43Config).filter_by(tcp_enabled=True).all()
for cfg in configs:
client = NL43Client(cfg.host, cfg.tcp_port, timeout=2.0, ftp_username=cfg.ftp_username, ftp_password=cfg.ftp_password)
status = {
"unit_id": cfg.unit_id,
"host": cfg.host,
"port": cfg.tcp_port,
"reachable": False,
"error": None,
}
try:
# Try to connect (don't send command to avoid rate limiting issues)
import asyncio
reader, writer = await asyncio.wait_for(
asyncio.open_connection(cfg.host, cfg.tcp_port), timeout=2.0
)
writer.close()
await writer.wait_closed()
status["reachable"] = True
except Exception as e:
status["error"] = str(type(e).__name__)
logger.warning(f"Device {cfg.unit_id} health check failed: {e}")
device_status.append(status)
finally:
db.close()
all_reachable = all(d["reachable"] for d in device_status) if device_status else True
return {
"status": "ok" if all_reachable else "degraded",
"devices": device_status,
"total_devices": len(device_status),
"reachable_devices": sum(1 for d in device_status if d["reachable"]),
}
if __name__ == "__main__":
import uvicorn
uvicorn.run("app.main:app", host="0.0.0.0", port=int(os.getenv("PORT", "8100")), reload=True)

View File

@@ -1,10 +1,43 @@
"""
Sound Level Meter feature module models
"""
from sqlalchemy import Column, String, Integer, Boolean, DateTime, Float
from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy import Column, String, DateTime, Boolean, Integer, Text, func
from app.slm.database import Base
Base = declarative_base()
# TODO: When we split databases, SLM-specific models will go here
# For now, SLM data is in the shared seismo_fleet.db database
class NL43Config(Base):
"""
NL43 connection/config metadata for the standalone SLMM addon.
"""
__tablename__ = "nl43_config"
unit_id = Column(String, primary_key=True, index=True)
host = Column(String, default="127.0.0.1")
tcp_port = Column(Integer, default=80) # NL43 TCP control port (via RX55)
tcp_enabled = Column(Boolean, default=True)
ftp_enabled = Column(Boolean, default=False)
ftp_username = Column(String, nullable=True) # FTP login username
ftp_password = Column(String, nullable=True) # FTP login password
web_enabled = Column(Boolean, default=False)
class NL43Status(Base):
"""
Latest NL43 status snapshot for quick dashboard/API access.
"""
__tablename__ = "nl43_status"
unit_id = Column(String, primary_key=True, index=True)
last_seen = Column(DateTime, default=func.now())
measurement_state = Column(String, default="unknown") # Measure/Stop
measurement_start_time = Column(DateTime, nullable=True) # When measurement started (UTC)
counter = Column(String, nullable=True) # d0: Measurement interval counter (1-600)
lp = Column(String, nullable=True) # Instantaneous sound pressure level
leq = Column(String, nullable=True) # Equivalent continuous sound level
lmax = Column(String, nullable=True) # Maximum level
lmin = Column(String, nullable=True) # Minimum level
lpeak = Column(String, nullable=True) # Peak level
battery_level = Column(String, nullable=True)
power_source = Column(String, nullable=True)
sd_remaining_mb = Column(String, nullable=True)
sd_free_ratio = Column(String, nullable=True)
raw_payload = Column(Text, nullable=True)

1333
app/slm/routers.py Normal file

File diff suppressed because it is too large Load Diff

View File

@@ -1,328 +0,0 @@
"""
SLM Dashboard Router
Provides API endpoints for the Sound Level Meters dashboard page.
"""
from fastapi import APIRouter, Request, Depends, Query
from fastapi.templating import Jinja2Templates
from fastapi.responses import HTMLResponse
from sqlalchemy.orm import Session
from sqlalchemy import func
from datetime import datetime, timedelta
import httpx
import logging
import os
from app.seismo.database import get_db
from app.seismo.models import RosterUnit
from app.seismo.routers.roster_edit import sync_slm_to_slmm_cache
logger = logging.getLogger(__name__)
router = APIRouter(prefix="/api/slm-dashboard", tags=["slm-dashboard"])
templates = Jinja2Templates(directory="templates")
# SLMM backend URL - configurable via environment variable
SLMM_BASE_URL = os.getenv("SLMM_BASE_URL", "http://localhost:8100")
@router.get("/stats", response_class=HTMLResponse)
async def get_slm_stats(request: Request, db: Session = Depends(get_db)):
"""
Get summary statistics for SLM dashboard.
Returns HTML partial with stat cards.
"""
# Query all SLMs
all_slms = db.query(RosterUnit).filter_by(device_type="sound_level_meter").all()
# Count deployed vs benched
deployed_count = sum(1 for slm in all_slms if slm.deployed and not slm.retired)
benched_count = sum(1 for slm in all_slms if not slm.deployed and not slm.retired)
retired_count = sum(1 for slm in all_slms if slm.retired)
# Count recently active (checked in last hour)
one_hour_ago = datetime.utcnow() - timedelta(hours=1)
active_count = sum(1 for slm in all_slms
if slm.slm_last_check and slm.slm_last_check > one_hour_ago)
return templates.TemplateResponse("partials/slm_stats.html", {
"request": request,
"total_count": len(all_slms),
"deployed_count": deployed_count,
"benched_count": benched_count,
"active_count": active_count,
"retired_count": retired_count
})
@router.get("/units", response_class=HTMLResponse)
async def get_slm_units(
request: Request,
db: Session = Depends(get_db),
search: str = Query(None)
):
"""
Get list of SLM units for the sidebar.
Returns HTML partial with unit cards.
"""
query = db.query(RosterUnit).filter_by(device_type="sound_level_meter")
# Filter by search term if provided
if search:
search_term = f"%{search}%"
query = query.filter(
(RosterUnit.id.like(search_term)) |
(RosterUnit.slm_model.like(search_term)) |
(RosterUnit.address.like(search_term))
)
# Only show deployed units by default
units = query.filter_by(deployed=True, retired=False).order_by(RosterUnit.id).all()
return templates.TemplateResponse("partials/slm_unit_list.html", {
"request": request,
"units": units
})
@router.get("/live-view/{unit_id}", response_class=HTMLResponse)
async def get_live_view(request: Request, unit_id: str, db: Session = Depends(get_db)):
"""
Get live view panel for a specific SLM unit.
Returns HTML partial with live metrics and chart.
"""
# Get unit from database
unit = db.query(RosterUnit).filter_by(id=unit_id, device_type="sound_level_meter").first()
if not unit:
return templates.TemplateResponse("partials/slm_live_view_error.html", {
"request": request,
"error": f"Unit {unit_id} not found"
})
# Get modem information if assigned
modem = None
modem_ip = None
if unit.deployed_with_modem_id:
modem = db.query(RosterUnit).filter_by(id=unit.deployed_with_modem_id, device_type="modem").first()
if modem:
modem_ip = modem.ip_address
else:
logger.warning(f"SLM {unit_id} is assigned to modem {unit.deployed_with_modem_id} but modem not found")
# Fallback to direct slm_host if no modem assigned (backward compatibility)
if not modem_ip and unit.slm_host:
modem_ip = unit.slm_host
logger.info(f"Using legacy slm_host for {unit_id}: {modem_ip}")
# Try to get current status from SLMM
current_status = None
measurement_state = None
is_measuring = False
try:
async with httpx.AsyncClient(timeout=5.0) as client:
# Get measurement state
state_response = await client.get(
f"{SLMM_BASE_URL}/api/nl43/{unit_id}/measurement-state"
)
if state_response.status_code == 200:
state_data = state_response.json()
measurement_state = state_data.get("measurement_state", "Unknown")
is_measuring = state_data.get("is_measuring", False)
# Get live status
status_response = await client.get(
f"{SLMM_BASE_URL}/api/nl43/{unit_id}/live"
)
if status_response.status_code == 200:
status_data = status_response.json()
current_status = status_data.get("data", {})
except Exception as e:
logger.error(f"Failed to get status for {unit_id}: {e}")
return templates.TemplateResponse("partials/slm_live_view.html", {
"request": request,
"unit": unit,
"modem": modem,
"modem_ip": modem_ip,
"current_status": current_status,
"measurement_state": measurement_state,
"is_measuring": is_measuring
})
@router.post("/control/{unit_id}/{action}")
async def control_slm(unit_id: str, action: str):
"""
Send control commands to SLM (start, stop, pause, resume, reset).
Proxies to SLMM backend.
"""
valid_actions = ["start", "stop", "pause", "resume", "reset"]
if action not in valid_actions:
return {"status": "error", "detail": f"Invalid action. Must be one of: {valid_actions}"}
try:
async with httpx.AsyncClient(timeout=10.0) as client:
response = await client.post(
f"{SLMM_BASE_URL}/api/nl43/{unit_id}/{action}"
)
if response.status_code == 200:
return response.json()
else:
return {
"status": "error",
"detail": f"SLMM returned status {response.status_code}"
}
except Exception as e:
logger.error(f"Failed to control {unit_id}: {e}")
return {
"status": "error",
"detail": str(e)
}
@router.get("/config/{unit_id}", response_class=HTMLResponse)
async def get_slm_config(request: Request, unit_id: str, db: Session = Depends(get_db)):
"""
Get configuration form for a specific SLM unit.
Returns HTML partial with configuration form.
"""
unit = db.query(RosterUnit).filter_by(id=unit_id, device_type="sound_level_meter").first()
if not unit:
return HTMLResponse(
content=f'<div class="text-red-500">Unit {unit_id} not found</div>',
status_code=404
)
return templates.TemplateResponse("partials/slm_config_form.html", {
"request": request,
"unit": unit
})
@router.post("/config/{unit_id}")
async def save_slm_config(request: Request, unit_id: str, db: Session = Depends(get_db)):
"""
Save SLM configuration.
Updates unit parameters in the database.
"""
unit = db.query(RosterUnit).filter_by(id=unit_id, device_type="sound_level_meter").first()
if not unit:
return {"status": "error", "detail": f"Unit {unit_id} not found"}
try:
# Get form data
form_data = await request.form()
# Update SLM-specific fields
unit.slm_model = form_data.get("slm_model") or None
unit.slm_serial_number = form_data.get("slm_serial_number") or None
unit.slm_frequency_weighting = form_data.get("slm_frequency_weighting") or None
unit.slm_time_weighting = form_data.get("slm_time_weighting") or None
unit.slm_measurement_range = form_data.get("slm_measurement_range") or None
# Update network configuration
modem_id = form_data.get("deployed_with_modem_id")
unit.deployed_with_modem_id = modem_id if modem_id else None
# Always update TCP and FTP ports (used regardless of modem assignment)
unit.slm_tcp_port = int(form_data.get("slm_tcp_port")) if form_data.get("slm_tcp_port") else None
unit.slm_ftp_port = int(form_data.get("slm_ftp_port")) if form_data.get("slm_ftp_port") else None
# Only update direct IP if no modem is assigned
if not modem_id:
unit.slm_host = form_data.get("slm_host") or None
else:
# Clear legacy direct IP field when modem is assigned
unit.slm_host = None
db.commit()
logger.info(f"Updated configuration for SLM {unit_id}")
# Sync updated configuration to SLMM cache
logger.info(f"Syncing SLM {unit_id} config changes to SLMM cache...")
result = await sync_slm_to_slmm_cache(
unit_id=unit_id,
host=unit.slm_host, # Use the updated host from Terra-View
tcp_port=unit.slm_tcp_port,
ftp_port=unit.slm_ftp_port,
deployed_with_modem_id=unit.deployed_with_modem_id, # Resolve modem IP if assigned
db=db
)
if not result["success"]:
logger.warning(f"SLMM cache sync warning for {unit_id}: {result['message']}")
# Config still saved in Terra-View (source of truth)
return {"status": "success", "unit_id": unit_id}
except Exception as e:
db.rollback()
logger.error(f"Failed to save config for {unit_id}: {e}")
return {"status": "error", "detail": str(e)}
@router.get("/test-modem/{modem_id}")
async def test_modem_connection(modem_id: str, db: Session = Depends(get_db)):
"""
Test modem connectivity with a simple ping/health check.
Returns response time and connection status.
"""
import subprocess
import time
# Get modem from database
modem = db.query(RosterUnit).filter_by(id=modem_id, device_type="modem").first()
if not modem:
return {"status": "error", "detail": f"Modem {modem_id} not found"}
if not modem.ip_address:
return {"status": "error", "detail": f"Modem {modem_id} has no IP address configured"}
try:
# Ping the modem (1 packet, 2 second timeout)
start_time = time.time()
result = subprocess.run(
["ping", "-c", "1", "-W", "2", modem.ip_address],
capture_output=True,
text=True,
timeout=3
)
response_time = int((time.time() - start_time) * 1000) # Convert to milliseconds
if result.returncode == 0:
return {
"status": "success",
"modem_id": modem_id,
"ip_address": modem.ip_address,
"response_time": response_time,
"message": "Modem is responding to ping"
}
else:
return {
"status": "error",
"modem_id": modem_id,
"ip_address": modem.ip_address,
"detail": "Modem not responding to ping"
}
except subprocess.TimeoutExpired:
return {
"status": "error",
"modem_id": modem_id,
"ip_address": modem.ip_address,
"detail": "Ping timeout (> 2 seconds)"
}
except Exception as e:
logger.error(f"Failed to ping modem {modem_id}: {e}")
return {
"status": "error",
"modem_id": modem_id,
"detail": str(e)
}

View File

@@ -1,301 +0,0 @@
"""
SLMM (Sound Level Meter Manager) Proxy Router
Proxies requests from SFM to the standalone SLMM backend service.
SLMM runs on port 8100 and handles NL43/NL53 sound level meter communication.
"""
from fastapi import APIRouter, HTTPException, Request, Response, WebSocket, WebSocketDisconnect
from fastapi.responses import StreamingResponse
import httpx
import websockets
import asyncio
import logging
import os
logger = logging.getLogger(__name__)
router = APIRouter(prefix="/api/slmm", tags=["slmm"])
# SLMM backend URL - configurable via environment variable
SLMM_BASE_URL = os.getenv("SLMM_BASE_URL", "http://localhost:8100")
# WebSocket URL derived from HTTP URL
SLMM_WS_BASE_URL = SLMM_BASE_URL.replace("http://", "ws://").replace("https://", "wss://")
@router.get("/health")
async def check_slmm_health():
"""
Check if the SLMM backend service is reachable and healthy.
"""
try:
async with httpx.AsyncClient(timeout=5.0) as client:
response = await client.get(f"{SLMM_BASE_URL}/health")
if response.status_code == 200:
data = response.json()
return {
"status": "ok",
"slmm_status": "connected",
"slmm_url": SLMM_BASE_URL,
"slmm_version": data.get("version", "unknown"),
"slmm_response": data
}
else:
return {
"status": "degraded",
"slmm_status": "error",
"slmm_url": SLMM_BASE_URL,
"detail": f"SLMM returned status {response.status_code}"
}
except httpx.ConnectError:
return {
"status": "error",
"slmm_status": "unreachable",
"slmm_url": SLMM_BASE_URL,
"detail": "Cannot connect to SLMM backend. Is it running?"
}
except Exception as e:
return {
"status": "error",
"slmm_status": "error",
"slmm_url": SLMM_BASE_URL,
"detail": str(e)
}
# WebSocket routes MUST come before the catch-all route
@router.websocket("/{unit_id}/stream")
async def proxy_websocket_stream(websocket: WebSocket, unit_id: str):
"""
Proxy WebSocket connections to SLMM's /stream endpoint.
This allows real-time streaming of measurement data from NL43 devices
through the SFM unified interface.
"""
await websocket.accept()
logger.info(f"WebSocket connection accepted for SLMM unit {unit_id}")
# Build target WebSocket URL
target_ws_url = f"{SLMM_WS_BASE_URL}/api/nl43/{unit_id}/stream"
logger.info(f"Connecting to SLMM WebSocket: {target_ws_url}")
backend_ws = None
try:
# Connect to SLMM backend WebSocket
backend_ws = await websockets.connect(target_ws_url)
logger.info(f"Connected to SLMM backend WebSocket for {unit_id}")
# Create tasks for bidirectional communication
async def forward_to_backend():
"""Forward messages from client to SLMM backend"""
try:
while True:
data = await websocket.receive_text()
await backend_ws.send(data)
except WebSocketDisconnect:
logger.info(f"Client WebSocket disconnected for {unit_id}")
except Exception as e:
logger.error(f"Error forwarding to backend: {e}")
async def forward_to_client():
"""Forward messages from SLMM backend to client"""
try:
async for message in backend_ws:
await websocket.send_text(message)
except websockets.exceptions.ConnectionClosed:
logger.info(f"Backend WebSocket closed for {unit_id}")
except Exception as e:
logger.error(f"Error forwarding to client: {e}")
# Run both forwarding tasks concurrently
await asyncio.gather(
forward_to_backend(),
forward_to_client(),
return_exceptions=True
)
except websockets.exceptions.WebSocketException as e:
logger.error(f"WebSocket error connecting to SLMM backend: {e}")
try:
await websocket.send_json({
"error": "Failed to connect to SLMM backend",
"detail": str(e)
})
except Exception:
pass
except Exception as e:
logger.error(f"Unexpected error in WebSocket proxy for {unit_id}: {e}")
try:
await websocket.send_json({
"error": "Internal server error",
"detail": str(e)
})
except Exception:
pass
finally:
# Clean up connections
if backend_ws:
try:
await backend_ws.close()
except Exception:
pass
try:
await websocket.close()
except Exception:
pass
logger.info(f"WebSocket proxy closed for {unit_id}")
@router.websocket("/{unit_id}/live")
async def proxy_websocket_live(websocket: WebSocket, unit_id: str):
"""
Proxy WebSocket connections to SLMM's /live endpoint.
Alternative WebSocket endpoint that may be used by some frontend components.
"""
await websocket.accept()
logger.info(f"WebSocket connection accepted for SLMM unit {unit_id} (live endpoint)")
# Build target WebSocket URL - try /stream endpoint as SLMM uses that for WebSocket
target_ws_url = f"{SLMM_WS_BASE_URL}/api/nl43/{unit_id}/stream"
logger.info(f"Connecting to SLMM WebSocket: {target_ws_url}")
backend_ws = None
try:
# Connect to SLMM backend WebSocket
backend_ws = await websockets.connect(target_ws_url)
logger.info(f"Connected to SLMM backend WebSocket for {unit_id} (live endpoint)")
# Create tasks for bidirectional communication
async def forward_to_backend():
"""Forward messages from client to SLMM backend"""
try:
while True:
data = await websocket.receive_text()
await backend_ws.send(data)
except WebSocketDisconnect:
logger.info(f"Client WebSocket disconnected for {unit_id} (live)")
except Exception as e:
logger.error(f"Error forwarding to backend (live): {e}")
async def forward_to_client():
"""Forward messages from SLMM backend to client"""
try:
async for message in backend_ws:
await websocket.send_text(message)
except websockets.exceptions.ConnectionClosed:
logger.info(f"Backend WebSocket closed for {unit_id} (live)")
except Exception as e:
logger.error(f"Error forwarding to client (live): {e}")
# Run both forwarding tasks concurrently
await asyncio.gather(
forward_to_backend(),
forward_to_client(),
return_exceptions=True
)
except websockets.exceptions.WebSocketException as e:
logger.error(f"WebSocket error connecting to SLMM backend (live): {e}")
try:
await websocket.send_json({
"error": "Failed to connect to SLMM backend",
"detail": str(e)
})
except Exception:
pass
except Exception as e:
logger.error(f"Unexpected error in WebSocket proxy for {unit_id} (live): {e}")
try:
await websocket.send_json({
"error": "Internal server error",
"detail": str(e)
})
except Exception:
pass
finally:
# Clean up connections
if backend_ws:
try:
await backend_ws.close()
except Exception:
pass
try:
await websocket.close()
except Exception:
pass
logger.info(f"WebSocket proxy closed for {unit_id} (live)")
# HTTP catch-all route MUST come after specific routes (including WebSocket routes)
@router.api_route("/{path:path}", methods=["GET", "POST", "PUT", "DELETE", "PATCH"])
async def proxy_to_slmm(path: str, request: Request):
"""
Proxy all requests to the SLMM backend service.
This allows SFM to act as a unified frontend for all device types,
while SLMM remains a standalone backend service.
"""
# Build target URL
target_url = f"{SLMM_BASE_URL}/api/nl43/{path}"
# Get query parameters
query_params = dict(request.query_params)
# Get request body if present
body = None
if request.method in ["POST", "PUT", "PATCH"]:
try:
body = await request.body()
except Exception as e:
logger.error(f"Failed to read request body: {e}")
body = None
# Get headers (exclude host and other proxy-specific headers)
headers = dict(request.headers)
headers_to_exclude = ["host", "content-length", "transfer-encoding", "connection"]
proxy_headers = {k: v for k, v in headers.items() if k.lower() not in headers_to_exclude}
logger.info(f"Proxying {request.method} request to SLMM: {target_url}")
try:
async with httpx.AsyncClient(timeout=30.0) as client:
# Forward the request to SLMM
response = await client.request(
method=request.method,
url=target_url,
params=query_params,
headers=proxy_headers,
content=body
)
# Return the response from SLMM
return Response(
content=response.content,
status_code=response.status_code,
headers=dict(response.headers),
media_type=response.headers.get("content-type")
)
except httpx.ConnectError:
logger.error(f"Failed to connect to SLMM backend at {SLMM_BASE_URL}")
raise HTTPException(
status_code=503,
detail=f"SLMM backend service unavailable. Is SLMM running on {SLMM_BASE_URL}?"
)
except httpx.TimeoutException:
logger.error(f"Timeout connecting to SLMM backend at {SLMM_BASE_URL}")
raise HTTPException(
status_code=504,
detail="SLMM backend timeout"
)
except Exception as e:
logger.error(f"Error proxying to SLMM: {e}")
raise HTTPException(
status_code=500,
detail=f"Failed to proxy request to SLMM: {str(e)}"
)

View File

@@ -1,123 +0,0 @@
"""
Sound Level Meter UI Router
Provides endpoints for SLM dashboard cards, detail pages, and real-time data.
"""
from fastapi import APIRouter, Depends, HTTPException, Request
from fastapi.responses import HTMLResponse
from fastapi.templating import Jinja2Templates
from sqlalchemy.orm import Session
from datetime import datetime
import httpx
import logging
import os
from app.seismo.database import get_db
from app.seismo.models import RosterUnit
logger = logging.getLogger(__name__)
router = APIRouter(prefix="/slm", tags=["slm-ui"])
templates = Jinja2Templates(directory="templates")
SLMM_BASE_URL = os.getenv("SLMM_BASE_URL", "http://172.19.0.1:8100")
@router.get("/{unit_id}", response_class=HTMLResponse)
async def slm_detail_page(request: Request, unit_id: str, db: Session = Depends(get_db)):
"""Sound level meter detail page with controls."""
# Get roster unit
unit = db.query(RosterUnit).filter_by(id=unit_id).first()
if not unit or unit.device_type != "sound_level_meter":
raise HTTPException(status_code=404, detail="Sound level meter not found")
return templates.TemplateResponse("slm_detail.html", {
"request": request,
"unit": unit,
"unit_id": unit_id
})
@router.get("/api/{unit_id}/summary")
async def get_slm_summary(unit_id: str, db: Session = Depends(get_db)):
"""Get SLM summary data for dashboard card."""
# Get roster unit
unit = db.query(RosterUnit).filter_by(id=unit_id).first()
if not unit or unit.device_type != "sound_level_meter":
raise HTTPException(status_code=404, detail="Sound level meter not found")
# Try to get live status from SLMM
status_data = None
try:
async with httpx.AsyncClient(timeout=3.0) as client:
response = await client.get(f"{SLMM_BASE_URL}/api/nl43/{unit_id}/status")
if response.status_code == 200:
status_data = response.json().get("data")
except Exception as e:
logger.warning(f"Failed to get SLM status for {unit_id}: {e}")
return {
"unit_id": unit_id,
"device_type": "sound_level_meter",
"deployed": unit.deployed,
"model": unit.slm_model or "NL-43",
"location": unit.address or unit.location,
"coordinates": unit.coordinates,
"note": unit.note,
"status": status_data,
"last_check": unit.slm_last_check.isoformat() if unit.slm_last_check else None,
}
@router.get("/partials/{unit_id}/card", response_class=HTMLResponse)
async def slm_dashboard_card(request: Request, unit_id: str, db: Session = Depends(get_db)):
"""Render SLM dashboard card partial."""
summary = await get_slm_summary(unit_id, db)
return templates.TemplateResponse("partials/slm_card.html", {
"request": request,
"slm": summary
})
@router.get("/partials/{unit_id}/controls", response_class=HTMLResponse)
async def slm_controls_partial(request: Request, unit_id: str, db: Session = Depends(get_db)):
"""Render SLM control panel partial."""
unit = db.query(RosterUnit).filter_by(id=unit_id).first()
if not unit or unit.device_type != "sound_level_meter":
raise HTTPException(status_code=404, detail="Sound level meter not found")
# Get current status from SLMM
measurement_state = None
battery_level = None
try:
async with httpx.AsyncClient(timeout=3.0) as client:
# Get measurement state
state_response = await client.get(
f"{SLMM_BASE_URL}/api/nl43/{unit_id}/measurement-state"
)
if state_response.status_code == 200:
measurement_state = state_response.json().get("measurement_state")
# Get battery level
battery_response = await client.get(
f"{SLMM_BASE_URL}/api/nl43/{unit_id}/battery"
)
if battery_response.status_code == 200:
battery_level = battery_response.json().get("battery_level")
except Exception as e:
logger.warning(f"Failed to get SLM control data for {unit_id}: {e}")
return templates.TemplateResponse("partials/slm_controls.html", {
"request": request,
"unit_id": unit_id,
"unit": unit,
"measurement_state": measurement_state,
"battery_level": battery_level,
"is_measuring": measurement_state == "Start"
})

828
app/slm/services.py Normal file
View File

@@ -0,0 +1,828 @@
"""
NL43 TCP connector and snapshot persistence.
Implements simple per-request TCP calls to avoid long-lived socket complexity.
Extend to pooled connections/DRD streaming later.
"""
import asyncio
import contextlib
import logging
import time
from dataclasses import dataclass
from datetime import datetime
from typing import Optional, List
from sqlalchemy.orm import Session
from ftplib import FTP
from pathlib import Path
from app.slm.models import NL43Status
logger = logging.getLogger(__name__)
@dataclass
class NL43Snapshot:
unit_id: str
measurement_state: str = "unknown"
counter: Optional[str] = None # d0: Measurement interval counter (1-600)
lp: Optional[str] = None # Instantaneous sound pressure level
leq: Optional[str] = None # Equivalent continuous sound level
lmax: Optional[str] = None # Maximum level
lmin: Optional[str] = None # Minimum level
lpeak: Optional[str] = None # Peak level
battery_level: Optional[str] = None
power_source: Optional[str] = None
sd_remaining_mb: Optional[str] = None
sd_free_ratio: Optional[str] = None
raw_payload: Optional[str] = None
def persist_snapshot(s: NL43Snapshot, db: Session):
"""Persist the latest snapshot for API/dashboard use."""
try:
row = db.query(NL43Status).filter_by(unit_id=s.unit_id).first()
if not row:
row = NL43Status(unit_id=s.unit_id)
db.add(row)
row.last_seen = datetime.utcnow()
# Track measurement start time by detecting state transition
previous_state = row.measurement_state
new_state = s.measurement_state
logger.info(f"State transition check for {s.unit_id}: '{previous_state}' -> '{new_state}'")
# Device returns "Start" when measuring, "Stop" when stopped
# Normalize to previous behavior for backward compatibility
is_measuring = new_state == "Start"
was_measuring = previous_state == "Start"
if not was_measuring and is_measuring:
# Measurement just started - record the start time
row.measurement_start_time = datetime.utcnow()
logger.info(f"✓ Measurement started on {s.unit_id} at {row.measurement_start_time}")
elif was_measuring and not is_measuring:
# Measurement stopped - clear the start time
row.measurement_start_time = None
logger.info(f"✓ Measurement stopped on {s.unit_id}")
row.measurement_state = new_state
row.counter = s.counter
row.lp = s.lp
row.leq = s.leq
row.lmax = s.lmax
row.lmin = s.lmin
row.lpeak = s.lpeak
row.battery_level = s.battery_level
row.power_source = s.power_source
row.sd_remaining_mb = s.sd_remaining_mb
row.sd_free_ratio = s.sd_free_ratio
row.raw_payload = s.raw_payload
db.commit()
except Exception as e:
db.rollback()
logger.error(f"Failed to persist snapshot for unit {s.unit_id}: {e}")
raise
# Rate limiting: NL43 requires ≥1 second between commands
_last_command_time = {}
_rate_limit_lock = asyncio.Lock()
class NL43Client:
def __init__(self, host: str, port: int, timeout: float = 5.0, ftp_username: str = None, ftp_password: str = None):
self.host = host
self.port = port
self.timeout = timeout
self.ftp_username = ftp_username or "anonymous"
self.ftp_password = ftp_password or ""
self.device_key = f"{host}:{port}"
async def _enforce_rate_limit(self):
"""Ensure ≥1 second between commands to the same device."""
async with _rate_limit_lock:
last_time = _last_command_time.get(self.device_key, 0)
elapsed = time.time() - last_time
if elapsed < 1.0:
wait_time = 1.0 - elapsed
logger.debug(f"Rate limiting: waiting {wait_time:.2f}s for {self.device_key}")
await asyncio.sleep(wait_time)
_last_command_time[self.device_key] = time.time()
async def _send_command(self, cmd: str) -> str:
"""Send ASCII command to NL43 device via TCP.
NL43 protocol returns two lines for query commands:
Line 1: Result code (R+0000 for success, error codes otherwise)
Line 2: Actual data (for query commands ending with '?')
"""
await self._enforce_rate_limit()
logger.info(f"Sending command to {self.device_key}: {cmd.strip()}")
try:
reader, writer = await asyncio.wait_for(
asyncio.open_connection(self.host, self.port), timeout=self.timeout
)
except asyncio.TimeoutError:
logger.error(f"Connection timeout to {self.device_key}")
raise ConnectionError(f"Failed to connect to device at {self.host}:{self.port}")
except Exception as e:
logger.error(f"Connection failed to {self.device_key}: {e}")
raise ConnectionError(f"Failed to connect to device: {str(e)}")
try:
writer.write(cmd.encode("ascii"))
await writer.drain()
# Read first line (result code)
first_line_data = await asyncio.wait_for(reader.readuntil(b"\n"), timeout=self.timeout)
result_code = first_line_data.decode(errors="ignore").strip()
# Remove leading $ prompt if present
if result_code.startswith("$"):
result_code = result_code[1:].strip()
logger.info(f"Result code from {self.device_key}: {result_code}")
# Check result code
if result_code == "R+0000":
# Success - for query commands, read the second line with actual data
is_query = cmd.strip().endswith("?")
if is_query:
data_line = await asyncio.wait_for(reader.readuntil(b"\n"), timeout=self.timeout)
response = data_line.decode(errors="ignore").strip()
logger.debug(f"Data line from {self.device_key}: {response}")
return response
else:
# Setting command - return success code
return result_code
elif result_code == "R+0001":
raise ValueError("Command error - device did not recognize command")
elif result_code == "R+0002":
raise ValueError("Parameter error - invalid parameter value")
elif result_code == "R+0003":
raise ValueError("Spec/type error - command not supported by this device model")
elif result_code == "R+0004":
raise ValueError("Status error - device is in wrong state for this command")
else:
raise ValueError(f"Unknown result code: {result_code}")
except asyncio.TimeoutError:
logger.error(f"Response timeout from {self.device_key}")
raise TimeoutError(f"Device did not respond within {self.timeout}s")
except Exception as e:
logger.error(f"Communication error with {self.device_key}: {e}")
raise
finally:
writer.close()
with contextlib.suppress(Exception):
await writer.wait_closed()
async def request_dod(self) -> NL43Snapshot:
"""Request DOD (Data Output Display) snapshot from device.
Returns parsed measurement data from the device display.
"""
# _send_command now handles result code validation and returns the data line
resp = await self._send_command("DOD?\r\n")
# Validate response format
if not resp:
logger.warning(f"Empty data response from DOD command on {self.device_key}")
raise ValueError("Device returned empty data for DOD? command")
# Remove leading $ prompt if present (shouldn't be there after _send_command, but be safe)
if resp.startswith("$"):
resp = resp[1:].strip()
parts = [p.strip() for p in resp.split(",") if p.strip() != ""]
# DOD should return at least some data points
if len(parts) < 2:
logger.error(f"Malformed DOD data from {self.device_key}: {resp}")
raise ValueError(f"Malformed DOD data: expected comma-separated values, got: {resp}")
logger.info(f"Parsed {len(parts)} data points from DOD response")
# Query actual measurement state (DOD doesn't include this information)
try:
measurement_state = await self.get_measurement_state()
except Exception as e:
logger.warning(f"Failed to get measurement state, defaulting to 'Measure': {e}")
measurement_state = "Measure"
snap = NL43Snapshot(unit_id="", raw_payload=resp, measurement_state=measurement_state)
# Parse known positions (based on NL43 communication guide - DRD format)
# DRD format: d0=counter, d1=Lp, d2=Leq, d3=Lmax, d4=Lmin, d5=Lpeak, d6=LIeq, ...
try:
# Capture d0 (counter) for timer synchronization
if len(parts) >= 1:
snap.counter = parts[0] # d0: Measurement interval counter (1-600)
if len(parts) >= 2:
snap.lp = parts[1] # d1: Instantaneous sound pressure level
if len(parts) >= 3:
snap.leq = parts[2] # d2: Equivalent continuous sound level
if len(parts) >= 4:
snap.lmax = parts[3] # d3: Maximum level
if len(parts) >= 5:
snap.lmin = parts[4] # d4: Minimum level
if len(parts) >= 6:
snap.lpeak = parts[5] # d5: Peak level
except (IndexError, ValueError) as e:
logger.warning(f"Error parsing DOD data points: {e}")
return snap
async def start(self):
"""Start measurement on the device.
According to NL43 protocol: Measure,Start (no $ prefix, capitalized param)
"""
await self._send_command("Measure,Start\r\n")
async def stop(self):
"""Stop measurement on the device.
According to NL43 protocol: Measure,Stop (no $ prefix, capitalized param)
"""
await self._send_command("Measure,Stop\r\n")
async def set_store_mode_manual(self):
"""Set the device to Manual Store mode.
According to NL43 protocol: Store Mode,Manual sets manual storage mode
"""
await self._send_command("Store Mode,Manual\r\n")
logger.info(f"Store mode set to Manual on {self.device_key}")
async def manual_store(self):
"""Manually store the current measurement data.
According to NL43 protocol: Manual Store,Start executes storing
Parameter p1="Start" executes the storage operation
Device must be in Manual Store mode first
"""
await self._send_command("Manual Store,Start\r\n")
logger.info(f"Manual store executed on {self.device_key}")
async def pause(self):
"""Pause the current measurement."""
await self._send_command("Pause,On\r\n")
logger.info(f"Measurement paused on {self.device_key}")
async def resume(self):
"""Resume a paused measurement."""
await self._send_command("Pause,Off\r\n")
logger.info(f"Measurement resumed on {self.device_key}")
async def reset(self):
"""Reset the measurement data."""
await self._send_command("Reset\r\n")
logger.info(f"Measurement data reset on {self.device_key}")
async def get_measurement_state(self) -> str:
"""Get the current measurement state.
Returns: "Start" if measuring, "Stop" if stopped
"""
resp = await self._send_command("Measure?\r\n")
state = resp.strip()
logger.info(f"Measurement state on {self.device_key}: {state}")
return state
async def get_battery_level(self) -> str:
"""Get the battery level."""
resp = await self._send_command("Battery Level?\r\n")
logger.info(f"Battery level on {self.device_key}: {resp}")
return resp.strip()
async def get_clock(self) -> str:
"""Get the device clock time."""
resp = await self._send_command("Clock?\r\n")
logger.info(f"Clock on {self.device_key}: {resp}")
return resp.strip()
async def set_clock(self, datetime_str: str):
"""Set the device clock time.
Args:
datetime_str: Time in format YYYY/MM/DD,HH:MM:SS or YYYY/MM/DD HH:MM:SS
"""
# Device expects format: Clock,YYYY/MM/DD HH:MM:SS (space between date and time)
# Replace comma with space if present to normalize format
normalized = datetime_str.replace(',', ' ', 1)
await self._send_command(f"Clock,{normalized}\r\n")
logger.info(f"Clock set on {self.device_key} to {normalized}")
async def get_frequency_weighting(self, channel: str = "Main") -> str:
"""Get frequency weighting (A, C, Z, etc.).
Args:
channel: Main, Sub1, Sub2, or Sub3
"""
resp = await self._send_command(f"Frequency Weighting ({channel})?\r\n")
logger.info(f"Frequency weighting ({channel}) on {self.device_key}: {resp}")
return resp.strip()
async def set_frequency_weighting(self, weighting: str, channel: str = "Main"):
"""Set frequency weighting.
Args:
weighting: A, C, or Z
channel: Main, Sub1, Sub2, or Sub3
"""
await self._send_command(f"Frequency Weighting ({channel}),{weighting}\r\n")
logger.info(f"Frequency weighting ({channel}) set to {weighting} on {self.device_key}")
async def get_time_weighting(self, channel: str = "Main") -> str:
"""Get time weighting (F, S, I).
Args:
channel: Main, Sub1, Sub2, or Sub3
"""
resp = await self._send_command(f"Time Weighting ({channel})?\r\n")
logger.info(f"Time weighting ({channel}) on {self.device_key}: {resp}")
return resp.strip()
async def set_time_weighting(self, weighting: str, channel: str = "Main"):
"""Set time weighting.
Args:
weighting: F (Fast), S (Slow), or I (Impulse)
channel: Main, Sub1, Sub2, or Sub3
"""
await self._send_command(f"Time Weighting ({channel}),{weighting}\r\n")
logger.info(f"Time weighting ({channel}) set to {weighting} on {self.device_key}")
async def request_dlc(self) -> dict:
"""Request DLC (Data Last Calculation) - final stored measurement results.
This retrieves the complete calculation results from the last/current measurement,
including all statistical data. Similar to DOD but for final results.
Returns:
Dict with parsed DLC data
"""
resp = await self._send_command("DLC?\r\n")
logger.info(f"DLC data received from {self.device_key}: {resp[:100]}...")
# Parse DLC response - similar format to DOD
# The exact format depends on device configuration
# For now, return raw data - can be enhanced based on actual response format
return {
"raw_data": resp.strip(),
"device_key": self.device_key,
}
async def sleep(self):
"""Put the device into sleep mode to conserve battery.
Sleep mode is useful for battery conservation between scheduled measurements.
Device can be woken up remotely via TCP command or by pressing a button.
"""
await self._send_command("Sleep Mode,On\r\n")
logger.info(f"Device {self.device_key} entering sleep mode")
async def wake(self):
"""Wake the device from sleep mode.
Note: This may not work if the device is in deep sleep.
Physical button press might be required in some cases.
"""
await self._send_command("Sleep Mode,Off\r\n")
logger.info(f"Device {self.device_key} waking from sleep mode")
async def get_sleep_status(self) -> str:
"""Get the current sleep mode status."""
resp = await self._send_command("Sleep Mode?\r\n")
logger.info(f"Sleep mode status on {self.device_key}: {resp}")
return resp.strip()
async def stream_drd(self, callback):
"""Stream continuous DRD output from the device.
Opens a persistent connection and streams DRD data lines.
Calls the provided callback function with each parsed snapshot.
Args:
callback: Async function that receives NL43Snapshot objects
The stream continues until an exception occurs or the connection is closed.
Send SUB character (0x1A) to stop the stream.
"""
await self._enforce_rate_limit()
logger.info(f"Starting DRD stream for {self.device_key}")
try:
reader, writer = await asyncio.wait_for(
asyncio.open_connection(self.host, self.port), timeout=self.timeout
)
except asyncio.TimeoutError:
logger.error(f"DRD stream connection timeout to {self.device_key}")
raise ConnectionError(f"Failed to connect to device at {self.host}:{self.port}")
except Exception as e:
logger.error(f"DRD stream connection failed to {self.device_key}: {e}")
raise ConnectionError(f"Failed to connect to device: {str(e)}")
try:
# Start DRD streaming
writer.write(b"DRD?\r\n")
await writer.drain()
# Read initial result code
first_line_data = await asyncio.wait_for(reader.readuntil(b"\n"), timeout=self.timeout)
result_code = first_line_data.decode(errors="ignore").strip()
if result_code.startswith("$"):
result_code = result_code[1:].strip()
logger.debug(f"DRD stream result code from {self.device_key}: {result_code}")
if result_code != "R+0000":
raise ValueError(f"DRD stream failed to start: {result_code}")
logger.info(f"DRD stream started successfully for {self.device_key}")
# Continuously read data lines
while True:
try:
line_data = await asyncio.wait_for(reader.readuntil(b"\n"), timeout=30.0)
line = line_data.decode(errors="ignore").strip()
if not line:
continue
# Remove leading $ if present
if line.startswith("$"):
line = line[1:].strip()
# Parse the DRD data (same format as DOD)
parts = [p.strip() for p in line.split(",") if p.strip() != ""]
if len(parts) < 2:
logger.warning(f"Malformed DRD data from {self.device_key}: {line}")
continue
snap = NL43Snapshot(unit_id="", raw_payload=line, measurement_state="Measure")
# Parse known positions (DRD format - same as DOD)
# DRD format: d0=counter, d1=Lp, d2=Leq, d3=Lmax, d4=Lmin, d5=Lpeak, d6=LIeq, ...
try:
# Capture d0 (counter) for timer synchronization
if len(parts) >= 1:
snap.counter = parts[0] # d0: Measurement interval counter (1-600)
if len(parts) >= 2:
snap.lp = parts[1] # d1: Instantaneous sound pressure level
if len(parts) >= 3:
snap.leq = parts[2] # d2: Equivalent continuous sound level
if len(parts) >= 4:
snap.lmax = parts[3] # d3: Maximum level
if len(parts) >= 5:
snap.lmin = parts[4] # d4: Minimum level
if len(parts) >= 6:
snap.lpeak = parts[5] # d5: Peak level
except (IndexError, ValueError) as e:
logger.warning(f"Error parsing DRD data points: {e}")
# Call the callback with the snapshot
await callback(snap)
except asyncio.TimeoutError:
logger.warning(f"DRD stream timeout (no data for 30s) from {self.device_key}")
break
except asyncio.IncompleteReadError:
logger.info(f"DRD stream closed by device {self.device_key}")
break
finally:
# Send SUB character to stop streaming
try:
writer.write(b"\x1A")
await writer.drain()
except Exception:
pass
writer.close()
with contextlib.suppress(Exception):
await writer.wait_closed()
logger.info(f"DRD stream ended for {self.device_key}")
async def set_measurement_time(self, preset: str):
"""Set measurement time preset.
Args:
preset: Time preset (10s, 1m, 5m, 10m, 15m, 30m, 1h, 8h, 24h, or custom like "00:05:30")
"""
await self._send_command(f"Measurement Time Preset Manual,{preset}\r\n")
logger.info(f"Set measurement time to {preset} on {self.device_key}")
async def get_measurement_time(self) -> str:
"""Get current measurement time preset.
Returns: Current time preset setting
"""
resp = await self._send_command("Measurement Time Preset Manual?\r\n")
return resp.strip()
async def set_leq_interval(self, preset: str):
"""Set Leq calculation interval preset.
Args:
preset: Interval preset (Off, 10s, 1m, 5m, 10m, 15m, 30m, 1h, 8h, 24h, or custom like "00:05:30")
"""
await self._send_command(f"Leq Calculation Interval Preset,{preset}\r\n")
logger.info(f"Set Leq interval to {preset} on {self.device_key}")
async def get_leq_interval(self) -> str:
"""Get current Leq calculation interval preset.
Returns: Current interval preset setting
"""
resp = await self._send_command("Leq Calculation Interval Preset?\r\n")
return resp.strip()
async def set_lp_interval(self, preset: str):
"""Set Lp store interval.
Args:
preset: Store interval (Off, 10ms, 25ms, 100ms, 200ms, 1s)
"""
await self._send_command(f"Lp Store Interval,{preset}\r\n")
logger.info(f"Set Lp interval to {preset} on {self.device_key}")
async def get_lp_interval(self) -> str:
"""Get current Lp store interval.
Returns: Current store interval setting
"""
resp = await self._send_command("Lp Store Interval?\r\n")
return resp.strip()
async def set_index_number(self, index: int):
"""Set index number for file numbering (Store Name).
Args:
index: Index number (0000-9999)
"""
if not 0 <= index <= 9999:
raise ValueError("Index must be between 0000 and 9999")
await self._send_command(f"Store Name,{index:04d}\r\n")
logger.info(f"Set store name (index) to {index:04d} on {self.device_key}")
async def get_index_number(self) -> str:
"""Get current index number (Store Name).
Returns: Current index number
"""
resp = await self._send_command("Store Name?\r\n")
return resp.strip()
async def get_overwrite_status(self) -> str:
"""Check if saved data exists at current store target.
This command checks whether saved data exists in the set store target
(store mode / store name / store address). Use this before storing
to prevent accidentally overwriting data.
Returns:
"None" - No data exists (safe to store)
"Exist" - Data exists (would overwrite)
"""
resp = await self._send_command("Overwrite?\r\n")
return resp.strip()
async def get_all_settings(self) -> dict:
"""Query all device settings for verification.
Returns: Dictionary with all current device settings
"""
settings = {}
# Measurement settings
try:
settings["measurement_state"] = await self.get_measurement_state()
except Exception as e:
settings["measurement_state"] = f"Error: {e}"
try:
settings["frequency_weighting"] = await self.get_frequency_weighting()
except Exception as e:
settings["frequency_weighting"] = f"Error: {e}"
try:
settings["time_weighting"] = await self.get_time_weighting()
except Exception as e:
settings["time_weighting"] = f"Error: {e}"
# Timing/interval settings
try:
settings["measurement_time"] = await self.get_measurement_time()
except Exception as e:
settings["measurement_time"] = f"Error: {e}"
try:
settings["leq_interval"] = await self.get_leq_interval()
except Exception as e:
settings["leq_interval"] = f"Error: {e}"
try:
settings["lp_interval"] = await self.get_lp_interval()
except Exception as e:
settings["lp_interval"] = f"Error: {e}"
try:
settings["index_number"] = await self.get_index_number()
except Exception as e:
settings["index_number"] = f"Error: {e}"
# Device info
try:
settings["battery_level"] = await self.get_battery_level()
except Exception as e:
settings["battery_level"] = f"Error: {e}"
try:
settings["clock"] = await self.get_clock()
except Exception as e:
settings["clock"] = f"Error: {e}"
# Sleep mode
try:
settings["sleep_mode"] = await self.get_sleep_status()
except Exception as e:
settings["sleep_mode"] = f"Error: {e}"
# FTP status
try:
settings["ftp_status"] = await self.get_ftp_status()
except Exception as e:
settings["ftp_status"] = f"Error: {e}"
logger.info(f"Retrieved all settings for {self.device_key}")
return settings
async def enable_ftp(self):
"""Enable FTP server on the device.
According to NL43 protocol: FTP,On enables the FTP server
"""
await self._send_command("FTP,On\r\n")
logger.info(f"FTP enabled on {self.device_key}")
async def disable_ftp(self):
"""Disable FTP server on the device.
According to NL43 protocol: FTP,Off disables the FTP server
"""
await self._send_command("FTP,Off\r\n")
logger.info(f"FTP disabled on {self.device_key}")
async def get_ftp_status(self) -> str:
"""Query FTP server status on the device.
Returns: "On" or "Off"
"""
resp = await self._send_command("FTP?\r\n")
logger.info(f"FTP status on {self.device_key}: {resp}")
return resp.strip()
async def list_ftp_files(self, remote_path: str = "/") -> List[dict]:
"""List files on the device via FTP.
Args:
remote_path: Directory path on the device (default: root)
Returns:
List of file info dicts with 'name', 'size', 'modified', 'is_dir'
"""
logger.info(f"Listing FTP files on {self.device_key} at {remote_path}")
def _list_ftp_sync():
"""Synchronous FTP listing using ftplib (supports active mode)."""
ftp = FTP()
ftp.set_debuglevel(0)
try:
# Connect and login
ftp.connect(self.host, 21, timeout=10)
ftp.login(self.ftp_username, self.ftp_password)
ftp.set_pasv(False) # Force active mode
# Change to target directory
if remote_path != "/":
ftp.cwd(remote_path)
# Get directory listing with details
files = []
lines = []
ftp.retrlines('LIST', lines.append)
for line in lines:
# Parse Unix-style ls output
parts = line.split(None, 8)
if len(parts) < 9:
continue
is_dir = parts[0].startswith('d')
size = int(parts[4]) if not is_dir else 0
name = parts[8]
# Skip . and ..
if name in ('.', '..'):
continue
# Parse modification time
# Format: "Jan 07 14:23" or "Dec 25 2025"
modified_str = f"{parts[5]} {parts[6]} {parts[7]}"
modified_timestamp = None
try:
from datetime import datetime
# Try parsing with time (recent files: "Jan 07 14:23")
try:
dt = datetime.strptime(modified_str, "%b %d %H:%M")
# Add current year since it's not in the format
dt = dt.replace(year=datetime.now().year)
# If the resulting date is in the future, it's actually from last year
if dt > datetime.now():
dt = dt.replace(year=dt.year - 1)
modified_timestamp = dt.isoformat()
except ValueError:
# Try parsing with year (older files: "Dec 25 2025")
dt = datetime.strptime(modified_str, "%b %d %Y")
modified_timestamp = dt.isoformat()
except Exception as e:
logger.warning(f"Failed to parse timestamp '{modified_str}': {e}")
file_info = {
"name": name,
"path": f"{remote_path.rstrip('/')}/{name}",
"size": size,
"modified": modified_str, # Keep original string
"modified_timestamp": modified_timestamp, # Add parsed timestamp
"is_dir": is_dir,
}
files.append(file_info)
logger.debug(f"Found file: {file_info}")
logger.info(f"Found {len(files)} files/directories on {self.device_key}")
return files
finally:
try:
ftp.quit()
except:
pass
try:
# Run synchronous FTP in thread pool
return await asyncio.to_thread(_list_ftp_sync)
except Exception as e:
logger.error(f"Failed to list FTP files on {self.device_key}: {e}")
raise ConnectionError(f"FTP connection failed: {str(e)}")
async def download_ftp_file(self, remote_path: str, local_path: str):
"""Download a file from the device via FTP.
Args:
remote_path: Full path to file on the device
local_path: Local path where file will be saved
"""
logger.info(f"Downloading {remote_path} from {self.device_key} to {local_path}")
def _download_ftp_sync():
"""Synchronous FTP download using ftplib (supports active mode)."""
ftp = FTP()
ftp.set_debuglevel(0)
try:
# Connect and login
ftp.connect(self.host, 21, timeout=10)
ftp.login(self.ftp_username, self.ftp_password)
ftp.set_pasv(False) # Force active mode
# Download file
with open(local_path, 'wb') as f:
ftp.retrbinary(f'RETR {remote_path}', f.write)
logger.info(f"Successfully downloaded {remote_path} to {local_path}")
finally:
try:
ftp.quit()
except:
pass
try:
# Run synchronous FTP in thread pool
await asyncio.to_thread(_download_ftp_sync)
except Exception as e:
logger.error(f"Failed to download {remote_path} from {self.device_key}: {e}")
raise ConnectionError(f"FTP download failed: {str(e)}")