Add FTP credentials management and UI enhancements

- Implement migration script to add ftp_username and ftp_password columns to nl43_config table.
- Create set_ftp_credentials.py script for updating FTP credentials in the database.
- Update requirements.txt to include aioftp for FTP functionality.
- Enhance index.html with FTP controls including enable, disable, check status, and list files features.
- Add JavaScript functions for handling FTP operations and displaying file lists.
This commit is contained in:
serversdwn
2025-12-24 02:03:03 +00:00
parent db6fd56673
commit 316cfa84f8
15 changed files with 1095 additions and 7 deletions

Binary file not shown.

View File

@@ -73,7 +73,7 @@ async def health_devices():
configs = db.query(NL43Config).filter_by(tcp_enabled=True).all()
for cfg in configs:
client = NL43Client(cfg.host, cfg.tcp_port, timeout=2.0)
client = NL43Client(cfg.host, cfg.tcp_port, timeout=2.0, ftp_username=cfg.ftp_username, ftp_password=cfg.ftp_password)
status = {
"unit_id": cfg.unit_id,
"host": cfg.host,

View File

@@ -14,6 +14,8 @@ class NL43Config(Base):
tcp_port = Column(Integer, default=80) # NL43 TCP control port (via RX55)
tcp_enabled = Column(Boolean, default=True)
ftp_enabled = Column(Boolean, default=False)
ftp_username = Column(String, nullable=True) # FTP login username
ftp_password = Column(String, nullable=True) # FTP login password
web_enabled = Column(Boolean, default=False)

View File

@@ -1,9 +1,12 @@
from fastapi import APIRouter, Depends, HTTPException
from fastapi import APIRouter, Depends, HTTPException, WebSocket, WebSocketDisconnect
from fastapi.responses import FileResponse
from sqlalchemy.orm import Session
from datetime import datetime
from pydantic import BaseModel, field_validator
import logging
import ipaddress
import json
import os
from app.database import get_db
from app.models import NL43Config, NL43Status
@@ -19,6 +22,8 @@ class ConfigPayload(BaseModel):
tcp_port: int | None = None
tcp_enabled: bool | None = None
ftp_enabled: bool | None = None
ftp_username: str | None = None
ftp_password: str | None = None
web_enabled: bool | None = None
@field_validator("host")
@@ -81,6 +86,10 @@ def upsert_config(unit_id: str, payload: ConfigPayload, db: Session = Depends(ge
cfg.tcp_enabled = payload.tcp_enabled
if payload.ftp_enabled is not None:
cfg.ftp_enabled = payload.ftp_enabled
if payload.ftp_username is not None:
cfg.ftp_username = payload.ftp_username
if payload.ftp_password is not None:
cfg.ftp_password = payload.ftp_password
if payload.web_enabled is not None:
cfg.web_enabled = payload.web_enabled
@@ -182,7 +191,7 @@ async def start_measurement(unit_id: str, db: Session = Depends(get_db)):
if not cfg.tcp_enabled:
raise HTTPException(status_code=403, detail="TCP communication is disabled for this device")
client = NL43Client(cfg.host, cfg.tcp_port)
client = NL43Client(cfg.host, cfg.tcp_port, ftp_username=cfg.ftp_username, ftp_password=cfg.ftp_password)
try:
await client.start()
logger.info(f"Started measurement on unit {unit_id}")
@@ -207,7 +216,7 @@ async def stop_measurement(unit_id: str, db: Session = Depends(get_db)):
if not cfg.tcp_enabled:
raise HTTPException(status_code=403, detail="TCP communication is disabled for this device")
client = NL43Client(cfg.host, cfg.tcp_port)
client = NL43Client(cfg.host, cfg.tcp_port, ftp_username=cfg.ftp_username, ftp_password=cfg.ftp_password)
try:
await client.stop()
logger.info(f"Stopped measurement on unit {unit_id}")
@@ -223,6 +232,32 @@ async def stop_measurement(unit_id: str, db: Session = Depends(get_db)):
return {"status": "ok", "message": "Measurement stopped"}
@router.post("/{unit_id}/store")
async def manual_store(unit_id: str, db: Session = Depends(get_db)):
"""Manually store measurement data to SD card."""
cfg = db.query(NL43Config).filter_by(unit_id=unit_id).first()
if not cfg:
raise HTTPException(status_code=404, detail="NL43 config not found")
if not cfg.tcp_enabled:
raise HTTPException(status_code=403, detail="TCP communication is disabled for this device")
client = NL43Client(cfg.host, cfg.tcp_port, ftp_username=cfg.ftp_username, ftp_password=cfg.ftp_password)
try:
await client.manual_store()
logger.info(f"Manual store executed on unit {unit_id}")
return {"status": "ok", "message": "Data stored to SD card"}
except ConnectionError as e:
logger.error(f"Failed to store data on {unit_id}: {e}")
raise HTTPException(status_code=502, detail="Failed to communicate with device")
except TimeoutError:
logger.error(f"Timeout storing data on {unit_id}")
raise HTTPException(status_code=504, detail="Device communication timeout")
except Exception as e:
logger.error(f"Unexpected error storing data on {unit_id}: {e}")
raise HTTPException(status_code=500, detail="Internal server error")
@router.get("/{unit_id}/live")
async def live_status(unit_id: str, db: Session = Depends(get_db)):
cfg = db.query(NL43Config).filter_by(unit_id=unit_id).first()
@@ -232,7 +267,7 @@ async def live_status(unit_id: str, db: Session = Depends(get_db)):
if not cfg.tcp_enabled:
raise HTTPException(status_code=403, detail="TCP communication is disabled for this device")
client = NL43Client(cfg.host, cfg.tcp_port)
client = NL43Client(cfg.host, cfg.tcp_port, ftp_username=cfg.ftp_username, ftp_password=cfg.ftp_password)
try:
snap = await client.request_dod()
snap.unit_id = unit_id
@@ -255,3 +290,215 @@ async def live_status(unit_id: str, db: Session = Depends(get_db)):
except Exception as e:
logger.error(f"Unexpected error getting live status for {unit_id}: {e}")
raise HTTPException(status_code=500, detail="Internal server error")
@router.websocket("/{unit_id}/stream")
async def stream_live(websocket: WebSocket, unit_id: str):
"""WebSocket endpoint for real-time DRD streaming from NL43 device.
Connects to the device, starts DRD streaming, and pushes updates to the WebSocket client.
The stream continues until the client disconnects or an error occurs.
"""
await websocket.accept()
logger.info(f"WebSocket connection accepted for unit {unit_id}")
from app.database import SessionLocal
db: Session = SessionLocal()
try:
# Get device configuration
cfg = db.query(NL43Config).filter_by(unit_id=unit_id).first()
if not cfg:
await websocket.send_json({"error": "NL43 config not found", "unit_id": unit_id})
await websocket.close()
return
if not cfg.tcp_enabled:
await websocket.send_json(
{"error": "TCP communication is disabled for this device", "unit_id": unit_id}
)
await websocket.close()
return
# Create client and define callback
client = NL43Client(cfg.host, cfg.tcp_port, ftp_username=cfg.ftp_username, ftp_password=cfg.ftp_password)
async def send_snapshot(snap):
"""Callback that sends each snapshot to the WebSocket client."""
snap.unit_id = unit_id
# Persist to database
try:
persist_snapshot(snap, db)
except Exception as e:
logger.error(f"Failed to persist snapshot during stream: {e}")
# Send to WebSocket client
try:
await websocket.send_json({
"unit_id": unit_id,
"timestamp": datetime.utcnow().isoformat(),
"measurement_state": snap.measurement_state,
"lp": snap.lp,
"leq": snap.leq,
"lmax": snap.lmax,
"lmin": snap.lmin,
"lpeak": snap.lpeak,
"raw_payload": snap.raw_payload,
})
except Exception as e:
logger.error(f"Failed to send snapshot via WebSocket: {e}")
raise
# Start DRD streaming
logger.info(f"Starting DRD stream for unit {unit_id}")
await client.stream_drd(send_snapshot)
except WebSocketDisconnect:
logger.info(f"WebSocket disconnected for unit {unit_id}")
except ConnectionError as e:
logger.error(f"Failed to connect to device {unit_id}: {e}")
try:
await websocket.send_json({"error": "Failed to communicate with device", "detail": str(e)})
except Exception:
pass
except Exception as e:
logger.error(f"Unexpected error in WebSocket stream for {unit_id}: {e}")
try:
await websocket.send_json({"error": "Internal server error", "detail": str(e)})
except Exception:
pass
finally:
db.close()
try:
await websocket.close()
except Exception:
pass
logger.info(f"WebSocket stream closed for unit {unit_id}")
@router.post("/{unit_id}/ftp/enable")
async def enable_ftp(unit_id: str, db: Session = Depends(get_db)):
"""Enable FTP server on the device."""
cfg = db.query(NL43Config).filter_by(unit_id=unit_id).first()
if not cfg:
raise HTTPException(status_code=404, detail="NL43 config not found")
if not cfg.tcp_enabled:
raise HTTPException(status_code=403, detail="TCP communication is disabled for this device")
client = NL43Client(cfg.host, cfg.tcp_port, ftp_username=cfg.ftp_username, ftp_password=cfg.ftp_password)
try:
await client.enable_ftp()
logger.info(f"Enabled FTP on unit {unit_id}")
return {"status": "ok", "message": "FTP enabled"}
except Exception as e:
logger.error(f"Failed to enable FTP on {unit_id}: {e}")
raise HTTPException(status_code=500, detail=f"Failed to enable FTP: {str(e)}")
@router.post("/{unit_id}/ftp/disable")
async def disable_ftp(unit_id: str, db: Session = Depends(get_db)):
"""Disable FTP server on the device."""
cfg = db.query(NL43Config).filter_by(unit_id=unit_id).first()
if not cfg:
raise HTTPException(status_code=404, detail="NL43 config not found")
if not cfg.tcp_enabled:
raise HTTPException(status_code=403, detail="TCP communication is disabled for this device")
client = NL43Client(cfg.host, cfg.tcp_port, ftp_username=cfg.ftp_username, ftp_password=cfg.ftp_password)
try:
await client.disable_ftp()
logger.info(f"Disabled FTP on unit {unit_id}")
return {"status": "ok", "message": "FTP disabled"}
except Exception as e:
logger.error(f"Failed to disable FTP on {unit_id}: {e}")
raise HTTPException(status_code=500, detail=f"Failed to disable FTP: {str(e)}")
@router.get("/{unit_id}/ftp/status")
async def get_ftp_status(unit_id: str, db: Session = Depends(get_db)):
"""Get FTP server status from the device."""
cfg = db.query(NL43Config).filter_by(unit_id=unit_id).first()
if not cfg:
raise HTTPException(status_code=404, detail="NL43 config not found")
if not cfg.tcp_enabled:
raise HTTPException(status_code=403, detail="TCP communication is disabled for this device")
client = NL43Client(cfg.host, cfg.tcp_port, ftp_username=cfg.ftp_username, ftp_password=cfg.ftp_password)
try:
status = await client.get_ftp_status()
return {"status": "ok", "ftp_enabled": status.lower() == "on", "ftp_status": status}
except Exception as e:
logger.error(f"Failed to get FTP status from {unit_id}: {e}")
raise HTTPException(status_code=500, detail=f"Failed to get FTP status: {str(e)}")
@router.get("/{unit_id}/ftp/files")
async def list_ftp_files(unit_id: str, path: str = "/", db: Session = Depends(get_db)):
"""List files on the device via FTP.
Query params:
path: Directory path on the device (default: root)
"""
cfg = db.query(NL43Config).filter_by(unit_id=unit_id).first()
if not cfg:
raise HTTPException(status_code=404, detail="NL43 config not found")
client = NL43Client(cfg.host, cfg.tcp_port, ftp_username=cfg.ftp_username, ftp_password=cfg.ftp_password)
try:
files = await client.list_ftp_files(path)
return {"status": "ok", "path": path, "files": files, "count": len(files)}
except ConnectionError as e:
logger.error(f"Failed to list FTP files on {unit_id}: {e}")
raise HTTPException(status_code=502, detail="Failed to communicate with device")
except Exception as e:
logger.error(f"Unexpected error listing FTP files on {unit_id}: {e}")
raise HTTPException(status_code=500, detail="Internal server error")
class DownloadRequest(BaseModel):
remote_path: str
@router.post("/{unit_id}/ftp/download")
async def download_ftp_file(unit_id: str, payload: DownloadRequest, db: Session = Depends(get_db)):
"""Download a file from the device via FTP.
The file is saved to data/downloads/{unit_id}/ and can be retrieved via the response.
"""
cfg = db.query(NL43Config).filter_by(unit_id=unit_id).first()
if not cfg:
raise HTTPException(status_code=404, detail="NL43 config not found")
# Create download directory
download_dir = f"data/downloads/{unit_id}"
os.makedirs(download_dir, exist_ok=True)
# Extract filename from remote path
filename = os.path.basename(payload.remote_path)
if not filename:
raise HTTPException(status_code=400, detail="Invalid remote path")
local_path = os.path.join(download_dir, filename)
client = NL43Client(cfg.host, cfg.tcp_port, ftp_username=cfg.ftp_username, ftp_password=cfg.ftp_password)
try:
await client.download_ftp_file(payload.remote_path, local_path)
logger.info(f"Downloaded {payload.remote_path} from {unit_id} to {local_path}")
# Return the file
return FileResponse(
path=local_path,
filename=filename,
media_type="application/octet-stream",
)
except ConnectionError as e:
logger.error(f"Failed to download file from {unit_id}: {e}")
raise HTTPException(status_code=502, detail="Failed to communicate with device")
except Exception as e:
logger.error(f"Unexpected error downloading file from {unit_id}: {e}")
raise HTTPException(status_code=500, detail="Internal server error")

View File

@@ -11,8 +11,9 @@ import logging
import time
from dataclasses import dataclass
from datetime import datetime
from typing import Optional
from typing import Optional, List
from sqlalchemy.orm import Session
import aioftp
from app.models import NL43Status
@@ -69,10 +70,12 @@ _rate_limit_lock = asyncio.Lock()
class NL43Client:
def __init__(self, host: str, port: int, timeout: float = 5.0):
def __init__(self, host: str, port: int, timeout: float = 5.0, ftp_username: str = None, ftp_password: str = None):
self.host = host
self.port = port
self.timeout = timeout
self.ftp_username = ftp_username or "anonymous"
self.ftp_password = ftp_password or ""
self.device_key = f"{host}:{port}"
async def _enforce_rate_limit(self):
@@ -215,3 +218,217 @@ class NL43Client:
According to NL43 protocol: Measure,Stop (no $ prefix, capitalized param)
"""
await self._send_command("Measure,Stop\r\n")
async def set_store_mode_manual(self):
"""Set the device to Manual Store mode.
According to NL43 protocol: Store Mode,Manual sets manual storage mode
"""
await self._send_command("Store Mode,Manual\r\n")
logger.info(f"Store mode set to Manual on {self.device_key}")
async def manual_store(self):
"""Manually store the current measurement data.
According to NL43 protocol: Manual Store,Start executes storing
Parameter p1="Start" executes the storage operation
Device must be in Manual Store mode first
"""
await self._send_command("Manual Store,Start\r\n")
logger.info(f"Manual store executed on {self.device_key}")
async def stream_drd(self, callback):
"""Stream continuous DRD output from the device.
Opens a persistent connection and streams DRD data lines.
Calls the provided callback function with each parsed snapshot.
Args:
callback: Async function that receives NL43Snapshot objects
The stream continues until an exception occurs or the connection is closed.
Send SUB character (0x1A) to stop the stream.
"""
await self._enforce_rate_limit()
logger.info(f"Starting DRD stream for {self.device_key}")
try:
reader, writer = await asyncio.wait_for(
asyncio.open_connection(self.host, self.port), timeout=self.timeout
)
except asyncio.TimeoutError:
logger.error(f"DRD stream connection timeout to {self.device_key}")
raise ConnectionError(f"Failed to connect to device at {self.host}:{self.port}")
except Exception as e:
logger.error(f"DRD stream connection failed to {self.device_key}: {e}")
raise ConnectionError(f"Failed to connect to device: {str(e)}")
try:
# Start DRD streaming
writer.write(b"DRD?\r\n")
await writer.drain()
# Read initial result code
first_line_data = await asyncio.wait_for(reader.readuntil(b"\n"), timeout=self.timeout)
result_code = first_line_data.decode(errors="ignore").strip()
if result_code.startswith("$"):
result_code = result_code[1:].strip()
logger.debug(f"DRD stream result code from {self.device_key}: {result_code}")
if result_code != "R+0000":
raise ValueError(f"DRD stream failed to start: {result_code}")
logger.info(f"DRD stream started successfully for {self.device_key}")
# Continuously read data lines
while True:
try:
line_data = await asyncio.wait_for(reader.readuntil(b"\n"), timeout=30.0)
line = line_data.decode(errors="ignore").strip()
if not line:
continue
# Remove leading $ if present
if line.startswith("$"):
line = line[1:].strip()
# Parse the DRD data (same format as DOD)
parts = [p.strip() for p in line.split(",") if p.strip() != ""]
if len(parts) < 2:
logger.warning(f"Malformed DRD data from {self.device_key}: {line}")
continue
snap = NL43Snapshot(unit_id="", raw_payload=line, measurement_state="Measure")
# Parse known positions
try:
if len(parts) >= 1:
snap.lp = parts[0]
if len(parts) >= 2:
snap.leq = parts[1]
if len(parts) >= 4:
snap.lmax = parts[3]
if len(parts) >= 5:
snap.lmin = parts[4]
if len(parts) >= 11:
snap.lpeak = parts[10]
except (IndexError, ValueError) as e:
logger.warning(f"Error parsing DRD data points: {e}")
# Call the callback with the snapshot
await callback(snap)
except asyncio.TimeoutError:
logger.warning(f"DRD stream timeout (no data for 30s) from {self.device_key}")
break
except asyncio.IncompleteReadError:
logger.info(f"DRD stream closed by device {self.device_key}")
break
finally:
# Send SUB character to stop streaming
try:
writer.write(b"\x1A")
await writer.drain()
except Exception:
pass
writer.close()
with contextlib.suppress(Exception):
await writer.wait_closed()
logger.info(f"DRD stream ended for {self.device_key}")
async def enable_ftp(self):
"""Enable FTP server on the device.
According to NL43 protocol: FTP,On enables the FTP server
"""
await self._send_command("FTP,On\r\n")
logger.info(f"FTP enabled on {self.device_key}")
async def disable_ftp(self):
"""Disable FTP server on the device.
According to NL43 protocol: FTP,Off disables the FTP server
"""
await self._send_command("FTP,Off\r\n")
logger.info(f"FTP disabled on {self.device_key}")
async def get_ftp_status(self) -> str:
"""Query FTP server status on the device.
Returns: "On" or "Off"
"""
resp = await self._send_command("FTP?\r\n")
logger.info(f"FTP status on {self.device_key}: {resp}")
return resp.strip()
async def list_ftp_files(self, remote_path: str = "/") -> List[dict]:
"""List files on the device via FTP.
Args:
remote_path: Directory path on the device (default: root)
Returns:
List of file info dicts with 'name', 'size', 'modified', 'is_dir'
"""
logger.info(f"Listing FTP files on {self.device_key} at {remote_path}")
try:
# FTP uses standard port 21, not the TCP control port
async with aioftp.Client.context(
self.host,
port=21,
user=self.ftp_username,
password=self.ftp_password,
socket_timeout=10
) as client:
files = []
async for path, info in client.list(remote_path):
file_info = {
"name": path.name,
"path": str(path),
"size": info.get("size", 0),
"modified": info.get("modify", ""),
"is_dir": info["type"] == "dir",
}
files.append(file_info)
logger.debug(f"Found file: {file_info}")
logger.info(f"Found {len(files)} files/directories on {self.device_key}")
return files
except Exception as e:
logger.error(f"Failed to list FTP files on {self.device_key}: {e}")
raise ConnectionError(f"FTP connection failed: {str(e)}")
async def download_ftp_file(self, remote_path: str, local_path: str):
"""Download a file from the device via FTP.
Args:
remote_path: Full path to file on the device
local_path: Local path where file will be saved
"""
logger.info(f"Downloading {remote_path} from {self.device_key} to {local_path}")
try:
# FTP uses standard port 21, not the TCP control port
async with aioftp.Client.context(
self.host,
port=21,
user=self.ftp_username,
password=self.ftp_password,
socket_timeout=10
) as client:
await client.download(remote_path, local_path, write_into=True)
logger.info(f"Successfully downloaded {remote_path} to {local_path}")
except Exception as e:
logger.error(f"Failed to download {remote_path} from {self.device_key}: {e}")
raise ConnectionError(f"FTP download failed: {str(e)}")