Compare commits

...

2 Commits

Author SHA1 Message Date
serversdwn
bf5f222511 Add:
- DB cache dump on diagnostics request.
- Individual device logs (DB and files).
- Device logs API endpoints and diagnostics UI.

Fix:
- SLMM standalone now uses local TZ (was UTC-only before).
- Measurement start time logic.
2026-01-29 18:50:47 +00:00
serversdwn
eb39a9d1d0 add: device communication lock. To send a TCP command, SLMM must now acquire a per-device connection lock, preventing the unit from being flooded.
fixed: background poller intervals increased.
2026-01-29 07:54:49 +00:00
8 changed files with 1020 additions and 108 deletions

View File

@@ -15,7 +15,8 @@ from sqlalchemy.orm import Session
from app.database import SessionLocal from app.database import SessionLocal
from app.models import NL43Config, NL43Status from app.models import NL43Config, NL43Status
from app.services import NL43Client, persist_snapshot from app.services import NL43Client, persist_snapshot, sync_measurement_start_time_from_ftp
from app.device_logger import log_device_event, cleanup_old_logs
logger = logging.getLogger(__name__) logger = logging.getLogger(__name__)
@@ -25,7 +26,7 @@ class BackgroundPoller:
Background task that continuously polls NL43 devices and updates status cache. Background task that continuously polls NL43 devices and updates status cache.
Features: Features:
- Per-device configurable poll intervals (10-3600 seconds) - Per-device configurable poll intervals (30 seconds to 6 hours)
- Automatic offline detection (marks unreachable after 3 consecutive failures) - Automatic offline detection (marks unreachable after 3 consecutive failures)
- Dynamic sleep intervals based on device configurations - Dynamic sleep intervals based on device configurations
- Graceful shutdown on application stop - Graceful shutdown on application stop
@@ -36,6 +37,7 @@ class BackgroundPoller:
self._task: Optional[asyncio.Task] = None self._task: Optional[asyncio.Task] = None
self._running = False self._running = False
self._logger = logger self._logger = logger
self._last_cleanup = None # Track last log cleanup time
async def start(self): async def start(self):
"""Start the background polling task.""" """Start the background polling task."""
@@ -78,6 +80,15 @@ class BackgroundPoller:
except Exception as e: except Exception as e:
self._logger.error(f"Error in poll loop: {e}", exc_info=True) self._logger.error(f"Error in poll loop: {e}", exc_info=True)
# Run log cleanup once per hour
try:
now = datetime.utcnow()
if self._last_cleanup is None or (now - self._last_cleanup).total_seconds() > 3600:
cleanup_old_logs()
self._last_cleanup = now
except Exception as e:
self._logger.warning(f"Log cleanup failed: {e}")
# Calculate dynamic sleep interval # Calculate dynamic sleep interval
sleep_time = self._calculate_sleep_interval() sleep_time = self._calculate_sleep_interval()
self._logger.debug(f"Sleeping for {sleep_time} seconds until next poll cycle") self._logger.debug(f"Sleeping for {sleep_time} seconds until next poll cycle")
@@ -205,6 +216,71 @@ class BackgroundPoller:
db.commit() db.commit()
self._logger.info(f"✓ Successfully polled {unit_id}") self._logger.info(f"✓ Successfully polled {unit_id}")
# Log to device log
log_device_event(
unit_id, "INFO", "POLL",
f"Poll success: state={snap.measurement_state}, Leq={snap.leq}, Lp={snap.lp}",
db
)
# Check if device is measuring but has no start time recorded
# This happens if measurement was started before SLMM began polling
# or after a service restart
status = db.query(NL43Status).filter_by(unit_id=unit_id).first()
# Reset the sync flag when measurement stops (so next measurement can sync)
if status and status.measurement_state != "Start":
if status.start_time_sync_attempted:
status.start_time_sync_attempted = False
db.commit()
self._logger.debug(f"Reset FTP sync flag for {unit_id} (measurement stopped)")
log_device_event(unit_id, "DEBUG", "STATE", "Measurement stopped, reset FTP sync flag", db)
# Attempt FTP sync if:
# - Device is measuring
# - No start time recorded
# - FTP sync not already attempted for this measurement
# - FTP is configured
if (status and
status.measurement_state == "Start" and
status.measurement_start_time is None and
not status.start_time_sync_attempted and
cfg.ftp_enabled and
cfg.ftp_username and
cfg.ftp_password):
self._logger.info(
f"Device {unit_id} is measuring but has no start time - "
f"attempting FTP sync"
)
log_device_event(unit_id, "INFO", "SYNC", "Attempting FTP sync for measurement start time", db)
# Mark that we attempted sync (prevents repeated attempts on failure)
status.start_time_sync_attempted = True
db.commit()
try:
synced = await sync_measurement_start_time_from_ftp(
unit_id=unit_id,
host=cfg.host,
tcp_port=cfg.tcp_port,
ftp_port=cfg.ftp_port or 21,
ftp_username=cfg.ftp_username,
ftp_password=cfg.ftp_password,
db=db
)
if synced:
self._logger.info(f"✓ FTP sync succeeded for {unit_id}")
log_device_event(unit_id, "INFO", "SYNC", "FTP sync succeeded - measurement start time updated", db)
else:
self._logger.warning(f"FTP sync returned False for {unit_id}")
log_device_event(unit_id, "WARNING", "SYNC", "FTP sync returned False", db)
except Exception as sync_err:
self._logger.warning(
f"FTP sync failed for {unit_id}: {sync_err}"
)
log_device_event(unit_id, "ERROR", "SYNC", f"FTP sync failed: {sync_err}", db)
except Exception as e: except Exception as e:
# Failure - increment counter and potentially mark offline # Failure - increment counter and potentially mark offline
status.consecutive_failures += 1 status.consecutive_failures += 1
@@ -217,11 +293,13 @@ class BackgroundPoller:
self._logger.warning( self._logger.warning(
f"Device {unit_id} marked unreachable after {status.consecutive_failures} failures: {error_msg}" f"Device {unit_id} marked unreachable after {status.consecutive_failures} failures: {error_msg}"
) )
log_device_event(unit_id, "ERROR", "POLL", f"Device marked UNREACHABLE after {status.consecutive_failures} failures: {error_msg}", db)
status.is_reachable = False status.is_reachable = False
else: else:
self._logger.warning( self._logger.warning(
f"Poll failed for {unit_id} (attempt {status.consecutive_failures}/3): {error_msg}" f"Poll failed for {unit_id} (attempt {status.consecutive_failures}/3): {error_msg}"
) )
log_device_event(unit_id, "WARNING", "POLL", f"Poll failed (attempt {status.consecutive_failures}/3): {error_msg}", db)
db.commit() db.commit()
@@ -230,8 +308,8 @@ class BackgroundPoller:
Calculate the next sleep interval based on all device poll intervals. Calculate the next sleep interval based on all device poll intervals.
Returns a dynamic sleep time that ensures responsive polling: Returns a dynamic sleep time that ensures responsive polling:
- Minimum 10 seconds (prevents tight loops) - Minimum 30 seconds (prevents tight loops)
- Maximum 30 seconds (ensures responsiveness) - Maximum 300 seconds / 5 minutes (ensures reasonable responsiveness for long intervals)
- Generally half the minimum device interval - Generally half the minimum device interval
Returns: Returns:
@@ -245,14 +323,15 @@ class BackgroundPoller:
).all() ).all()
if not configs: if not configs:
return 30 # Default sleep when no devices configured return 60 # Default sleep when no devices configured
# Get all intervals # Get all intervals
intervals = [cfg.poll_interval_seconds or 60 for cfg in configs] intervals = [cfg.poll_interval_seconds or 60 for cfg in configs]
min_interval = min(intervals) min_interval = min(intervals)
# Use half the minimum interval, but cap between 10-30 seconds # Use half the minimum interval, but cap between 30-300 seconds
# This allows longer sleep times when polling intervals are long (e.g., hourly)
sleep_time = max(10, min(30, min_interval // 2)) sleep_time = max(30, min(300, min_interval // 2))
return sleep_time return sleep_time
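
For reference, a quick sketch of how the revised clamp behaves; the intervals below are illustrative, not taken from any configured device:

for interval in (60, 600, 3600, 21600):
    # max(30, min(300, interval // 2)): half the smallest device interval, clamped to 30-300 s
    sleep_time = max(30, min(300, interval // 2))
    print(f"min device interval {interval:>5}s -> poller sleeps {sleep_time}s")
# Prints 30, 300, 300 and 300 seconds respectively.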

277
app/device_logger.py Normal file
View File

@@ -0,0 +1,277 @@
"""
Per-device logging system.
Provides dual output: database entries for structured queries and file logs for backup.
Each device gets its own log file in data/logs/{unit_id}.log with rotation.
"""
import logging
import os
from datetime import datetime, timedelta
from logging.handlers import RotatingFileHandler
from pathlib import Path
from typing import Optional
from sqlalchemy.orm import Session
from app.database import SessionLocal
from app.models import DeviceLog
# Configure base logger
logger = logging.getLogger(__name__)
# Log directory (persisted in Docker volume)
LOG_DIR = Path(os.path.dirname(os.path.dirname(__file__))) / "data" / "logs"
LOG_DIR.mkdir(parents=True, exist_ok=True)
# Per-device file loggers (cached)
_device_file_loggers: dict = {}
# Log retention (days)
LOG_RETENTION_DAYS = int(os.getenv("LOG_RETENTION_DAYS", "7"))
def _get_file_logger(unit_id: str) -> logging.Logger:
"""Get or create a file logger for a specific device."""
if unit_id in _device_file_loggers:
return _device_file_loggers[unit_id]
# Create device-specific logger
device_logger = logging.getLogger(f"device.{unit_id}")
device_logger.setLevel(logging.DEBUG)
# Avoid duplicate handlers
if not device_logger.handlers:
# Create rotating file handler (5 MB max, keep 3 backups)
log_file = LOG_DIR / f"{unit_id}.log"
handler = RotatingFileHandler(
log_file,
maxBytes=5 * 1024 * 1024, # 5 MB
backupCount=3,
encoding="utf-8"
)
handler.setLevel(logging.DEBUG)
# Format: timestamp [LEVEL] [CATEGORY] message
formatter = logging.Formatter(
"%(asctime)s [%(levelname)s] [%(category)s] %(message)s",
datefmt="%Y-%m-%d %H:%M:%S"
)
handler.setFormatter(formatter)
device_logger.addHandler(handler)
# Don't propagate to root logger
device_logger.propagate = False
_device_file_loggers[unit_id] = device_logger
return device_logger
def log_device_event(
unit_id: str,
level: str,
category: str,
message: str,
db: Optional[Session] = None
):
"""
Log an event for a specific device.
Writes to both:
1. Database (DeviceLog table) for structured queries
2. File (data/logs/{unit_id}.log) for backup/debugging
Args:
unit_id: Device identifier
level: Log level (DEBUG, INFO, WARNING, ERROR)
category: Event category (TCP, FTP, POLL, COMMAND, STATE, SYNC)
message: Log message
db: Optional database session (creates one if not provided)
"""
timestamp = datetime.utcnow()
# Write to file log
try:
file_logger = _get_file_logger(unit_id)
log_func = getattr(file_logger, level.lower(), file_logger.info)
# Pass category as extra for formatter
log_func(message, extra={"category": category})
except Exception as e:
logger.warning(f"Failed to write file log for {unit_id}: {e}")
# Write to database
close_db = False
try:
if db is None:
db = SessionLocal()
close_db = True
log_entry = DeviceLog(
unit_id=unit_id,
timestamp=timestamp,
level=level.upper(),
category=category.upper(),
message=message
)
db.add(log_entry)
db.commit()
except Exception as e:
logger.warning(f"Failed to write DB log for {unit_id}: {e}")
if db:
db.rollback()
finally:
if close_db and db:
db.close()
def cleanup_old_logs(retention_days: Optional[int] = None, db: Optional[Session] = None):
"""
Delete log entries older than retention period.
Args:
retention_days: Days to retain (default: LOG_RETENTION_DAYS env var or 7)
db: Optional database session
"""
if retention_days is None:
retention_days = LOG_RETENTION_DAYS
cutoff = datetime.utcnow() - timedelta(days=retention_days)
close_db = False
try:
if db is None:
db = SessionLocal()
close_db = True
deleted = db.query(DeviceLog).filter(DeviceLog.timestamp < cutoff).delete()
db.commit()
if deleted > 0:
logger.info(f"Cleaned up {deleted} log entries older than {retention_days} days")
except Exception as e:
logger.error(f"Failed to cleanup old logs: {e}")
if db:
db.rollback()
finally:
if close_db and db:
db.close()
def get_device_logs(
unit_id: str,
limit: int = 100,
offset: int = 0,
level: Optional[str] = None,
category: Optional[str] = None,
since: Optional[datetime] = None,
db: Optional[Session] = None
) -> list:
"""
Query log entries for a specific device.
Args:
unit_id: Device identifier
limit: Max entries to return (default: 100)
offset: Number of entries to skip (default: 0)
level: Filter by level (DEBUG, INFO, WARNING, ERROR)
category: Filter by category (TCP, FTP, POLL, COMMAND, STATE, SYNC)
since: Filter entries after this timestamp
db: Optional database session
Returns:
List of log entries as dicts
"""
close_db = False
try:
if db is None:
db = SessionLocal()
close_db = True
query = db.query(DeviceLog).filter(DeviceLog.unit_id == unit_id)
if level:
query = query.filter(DeviceLog.level == level.upper())
if category:
query = query.filter(DeviceLog.category == category.upper())
if since:
query = query.filter(DeviceLog.timestamp >= since)
# Order by newest first
query = query.order_by(DeviceLog.timestamp.desc())
# Apply pagination
entries = query.offset(offset).limit(limit).all()
return [
{
"id": e.id,
"timestamp": e.timestamp.isoformat() + "Z",
"level": e.level,
"category": e.category,
"message": e.message
}
for e in entries
]
finally:
if close_db and db:
db.close()
def get_log_stats(unit_id: str, db: Optional[Session] = None) -> dict:
"""
Get log statistics for a device.
Returns:
Dict with counts by level and category
"""
close_db = False
try:
if db is None:
db = SessionLocal()
close_db = True
total = db.query(DeviceLog).filter(DeviceLog.unit_id == unit_id).count()
# Count by level
level_counts = {}
for level in ["DEBUG", "INFO", "WARNING", "ERROR"]:
count = db.query(DeviceLog).filter(
DeviceLog.unit_id == unit_id,
DeviceLog.level == level
).count()
if count > 0:
level_counts[level] = count
# Count by category
category_counts = {}
for category in ["TCP", "FTP", "POLL", "COMMAND", "STATE", "SYNC", "GENERAL"]:
count = db.query(DeviceLog).filter(
DeviceLog.unit_id == unit_id,
DeviceLog.category == category
).count()
if count > 0:
category_counts[category] = count
# Get oldest and newest
oldest = db.query(DeviceLog).filter(
DeviceLog.unit_id == unit_id
).order_by(DeviceLog.timestamp.asc()).first()
newest = db.query(DeviceLog).filter(
DeviceLog.unit_id == unit_id
).order_by(DeviceLog.timestamp.desc()).first()
return {
"total": total,
"by_level": level_counts,
"by_category": category_counts,
"oldest": oldest.timestamp.isoformat() + "Z" if oldest else None,
"newest": newest.timestamp.isoformat() + "Z" if newest else None
}
finally:
if close_db and db:
db.close()
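
A minimal usage sketch for the module above; the unit id, message and filter values are made up, and the real call sites are in the background poller, services and API routes shown elsewhere in this diff:

from app.device_logger import log_device_event, get_device_logs, cleanup_old_logs

# Writes to data/logs/NL43-001.log and to the device_logs table (opens its own session since db is omitted).
log_device_event("NL43-001", "INFO", "COMMAND", "Start measurement requested")

# Newest 20 ERROR entries for the same device, as dicts with timestamp/level/category/message.
for entry in get_device_logs("NL43-001", limit=20, level="ERROR"):
    print(entry["timestamp"], entry["category"], entry["message"])

# Purge entries older than the retention window (LOG_RETENTION_DAYS, default 7 days).
cleanup_old_logs()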

View File

@@ -53,3 +53,22 @@ class NL43Status(Base):
last_poll_attempt = Column(DateTime, nullable=True) # Last time background poller attempted to poll last_poll_attempt = Column(DateTime, nullable=True) # Last time background poller attempted to poll
last_success = Column(DateTime, nullable=True) # Last successful poll timestamp last_success = Column(DateTime, nullable=True) # Last successful poll timestamp
last_error = Column(Text, nullable=True) # Last error message (truncated to 500 chars) last_error = Column(Text, nullable=True) # Last error message (truncated to 500 chars)
# FTP start time sync tracking
start_time_sync_attempted = Column(Boolean, default=False) # True if FTP sync was attempted for current measurement
class DeviceLog(Base):
"""
Per-device log entries for debugging and audit trail.
Stores events like commands, state changes, errors, and FTP operations.
"""
__tablename__ = "device_logs"
id = Column(Integer, primary_key=True, autoincrement=True)
unit_id = Column(String, index=True, nullable=False)
timestamp = Column(DateTime, default=func.now(), index=True)
level = Column(String, default="INFO") # DEBUG, INFO, WARNING, ERROR
category = Column(String, default="GENERAL") # TCP, FTP, POLL, COMMAND, STATE, SYNC
message = Column(Text, nullable=False)

View File

@@ -3,6 +3,7 @@ from fastapi.responses import FileResponse
from sqlalchemy.orm import Session from sqlalchemy.orm import Session
from datetime import datetime from datetime import datetime
from pydantic import BaseModel, field_validator, Field from pydantic import BaseModel, field_validator, Field
from typing import Optional
import logging import logging
import ipaddress import ipaddress
import json import json
@@ -81,14 +82,14 @@ class ConfigPayload(BaseModel):
@field_validator("poll_interval_seconds") @field_validator("poll_interval_seconds")
@classmethod @classmethod
def validate_poll_interval(cls, v): def validate_poll_interval(cls, v):
if v is not None and not (10 <= v <= 3600): if v is not None and not (30 <= v <= 21600):
raise ValueError("Poll interval must be between 10 and 3600 seconds") raise ValueError("Poll interval must be between 30 and 21600 seconds (30s to 6 hours)")
return v return v
class PollingConfigPayload(BaseModel): class PollingConfigPayload(BaseModel):
"""Payload for updating device polling configuration.""" """Payload for updating device polling configuration."""
poll_interval_seconds: int | None = Field(None, ge=10, le=3600, description="Polling interval in seconds (10-3600)") poll_interval_seconds: int | None = Field(None, ge=30, le=21600, description="Polling interval in seconds (30s to 6 hours)")
poll_enabled: bool | None = Field(None, description="Enable or disable background polling for this device") poll_enabled: bool | None = Field(None, description="Enable or disable background polling for this device")
@@ -233,8 +234,8 @@ class RosterCreatePayload(BaseModel):
@field_validator("poll_interval_seconds") @field_validator("poll_interval_seconds")
@classmethod @classmethod
def validate_poll_interval(cls, v): def validate_poll_interval(cls, v):
if v is not None and not (10 <= v <= 3600): if v is not None and not (30 <= v <= 21600):
raise ValueError("Poll interval must be between 10 and 3600 seconds") raise ValueError("Poll interval must be between 30 and 21600 seconds (30s to 6 hours)")
return v return v
@@ -1842,9 +1843,134 @@ async def run_diagnostics(unit_id: str, db: Session = Depends(get_db)):
# All tests passed # All tests passed
diagnostics["overall_status"] = "pass" diagnostics["overall_status"] = "pass"
# Add database dump: config and status cache
diagnostics["database_dump"] = {
"config": {
"unit_id": cfg.unit_id,
"host": cfg.host,
"tcp_port": cfg.tcp_port,
"tcp_enabled": cfg.tcp_enabled,
"ftp_enabled": cfg.ftp_enabled,
"ftp_port": cfg.ftp_port,
"ftp_username": cfg.ftp_username,
"ftp_password": "***" if cfg.ftp_password else None, # Mask password
"web_enabled": cfg.web_enabled,
"poll_interval_seconds": cfg.poll_interval_seconds,
"poll_enabled": cfg.poll_enabled
},
"status_cache": None
}
# Get cached status if available
status = db.query(NL43Status).filter_by(unit_id=unit_id).first()
if status:
# Helper to format datetime as ISO with Z suffix to indicate UTC
def to_utc_iso(dt):
return dt.isoformat() + 'Z' if dt else None
diagnostics["database_dump"]["status_cache"] = {
"unit_id": status.unit_id,
"last_seen": to_utc_iso(status.last_seen),
"measurement_state": status.measurement_state,
"measurement_start_time": to_utc_iso(status.measurement_start_time),
"counter": status.counter,
"lp": status.lp,
"leq": status.leq,
"lmax": status.lmax,
"lmin": status.lmin,
"lpeak": status.lpeak,
"battery_level": status.battery_level,
"power_source": status.power_source,
"sd_remaining_mb": status.sd_remaining_mb,
"sd_free_ratio": status.sd_free_ratio,
"is_reachable": status.is_reachable,
"consecutive_failures": status.consecutive_failures,
"last_poll_attempt": to_utc_iso(status.last_poll_attempt),
"last_success": to_utc_iso(status.last_success),
"last_error": status.last_error,
"raw_payload": status.raw_payload
}
return diagnostics return diagnostics
# ============================================================================
# DEVICE LOGS ENDPOINTS
# ============================================================================
@router.get("/{unit_id}/logs")
def get_device_logs(
unit_id: str,
limit: int = 100,
offset: int = 0,
level: Optional[str] = None,
category: Optional[str] = None,
db: Session = Depends(get_db)
):
"""
Get log entries for a specific device.
Query parameters:
- limit: Max entries to return (default: 100, max: 1000)
- offset: Number of entries to skip (for pagination)
- level: Filter by level (DEBUG, INFO, WARNING, ERROR)
- category: Filter by category (TCP, FTP, POLL, COMMAND, STATE, SYNC)
Returns newest entries first.
"""
from app.device_logger import get_device_logs as fetch_logs, get_log_stats
# Validate limit
limit = min(limit, 1000)
logs = fetch_logs(
unit_id=unit_id,
limit=limit,
offset=offset,
level=level,
category=category,
db=db
)
stats = get_log_stats(unit_id, db)
return {
"status": "ok",
"unit_id": unit_id,
"logs": logs,
"count": len(logs),
"stats": stats,
"filters": {
"level": level,
"category": category
},
"pagination": {
"limit": limit,
"offset": offset
}
}
@router.delete("/{unit_id}/logs")
def clear_device_logs(unit_id: str, db: Session = Depends(get_db)):
"""
Clear all log entries for a specific device.
"""
from app.models import DeviceLog
deleted = db.query(DeviceLog).filter(DeviceLog.unit_id == unit_id).delete()
db.commit()
logger.info(f"Cleared {deleted} log entries for device {unit_id}")
return {
"status": "ok",
"message": f"Cleared {deleted} log entries for {unit_id}",
"deleted_count": deleted
}
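
A short client-side sketch of the two endpoints above, assuming the router is mounted under /api/nl43 (as the diagnostics UI below uses) and a hypothetical unit id and base URL:

import requests

BASE = "http://localhost:8000/api/nl43"   # assumed host/port and mount path
unit_id = "NL43-001"                      # hypothetical device id

# Newest 50 ERROR-level entries, plus per-level/per-category stats.
resp = requests.get(f"{BASE}/{unit_id}/logs", params={"limit": 50, "level": "ERROR"})
data = resp.json()
print(data["count"], data["stats"])
for entry in data["logs"]:
    print(entry["timestamp"], f"[{entry['level']}]", f"[{entry['category']}]", entry["message"])

# Clear the device's log history.
requests.delete(f"{BASE}/{unit_id}/logs")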
# ============================================================================ # ============================================================================
# BACKGROUND POLLING CONFIGURATION ENDPOINTS # BACKGROUND POLLING CONFIGURATION ENDPOINTS
# ============================================================================ # ============================================================================
@@ -1880,7 +2006,7 @@ def update_polling_config(
""" """
Update background polling configuration for a device. Update background polling configuration for a device.
Allows configuring the polling interval (10-3600 seconds) and Allows configuring the polling interval (30-21600 seconds, i.e. 30s to 6 hours) and
enabling/disabling automatic background polling per device. enabling/disabling automatic background polling per device.
Changes take effect on the next polling cycle. Changes take effect on the next polling cycle.
@@ -1891,10 +2017,15 @@ def update_polling_config(
# Update interval if provided # Update interval if provided
if payload.poll_interval_seconds is not None: if payload.poll_interval_seconds is not None:
if payload.poll_interval_seconds < 10: if payload.poll_interval_seconds < 30:
raise HTTPException( raise HTTPException(
status_code=400, status_code=400,
detail="Polling interval must be at least 10 seconds" detail="Polling interval must be at least 30 seconds"
)
if payload.poll_interval_seconds > 21600:
raise HTTPException(
status_code=400,
detail="Polling interval must be at most 21600 seconds (6 hours)"
) )
cfg.poll_interval_seconds = payload.poll_interval_seconds cfg.poll_interval_seconds = payload.poll_interval_seconds

View File

@@ -14,7 +14,7 @@ import zipfile
import tempfile import tempfile
from dataclasses import dataclass from dataclasses import dataclass
from datetime import datetime, timezone, timedelta from datetime import datetime, timezone, timedelta
from typing import Optional, List from typing import Optional, List, Dict
from sqlalchemy.orm import Session from sqlalchemy.orm import Session
from ftplib import FTP from ftplib import FTP
from pathlib import Path from pathlib import Path
@@ -76,10 +76,22 @@ def persist_snapshot(s: NL43Snapshot, db: Session):
# Measurement just started - record the start time # Measurement just started - record the start time
row.measurement_start_time = datetime.utcnow() row.measurement_start_time = datetime.utcnow()
logger.info(f"✓ Measurement started on {s.unit_id} at {row.measurement_start_time}") logger.info(f"✓ Measurement started on {s.unit_id} at {row.measurement_start_time}")
# Log state change (lazy import to avoid circular dependency)
try:
from app.device_logger import log_device_event
log_device_event(s.unit_id, "INFO", "STATE", f"Measurement STARTED at {row.measurement_start_time}", db)
except Exception:
pass
elif was_measuring and not is_measuring: elif was_measuring and not is_measuring:
# Measurement stopped - clear the start time # Measurement stopped - clear the start time
row.measurement_start_time = None row.measurement_start_time = None
logger.info(f"✓ Measurement stopped on {s.unit_id}") logger.info(f"✓ Measurement stopped on {s.unit_id}")
# Log state change
try:
from app.device_logger import log_device_event
log_device_event(s.unit_id, "INFO", "STATE", "Measurement STOPPED", db)
except Exception:
pass
row.measurement_state = new_state row.measurement_state = new_state
row.counter = s.counter row.counter = s.counter
@@ -101,10 +113,126 @@ def persist_snapshot(s: NL43Snapshot, db: Session):
raise raise
async def sync_measurement_start_time_from_ftp(
unit_id: str,
host: str,
tcp_port: int,
ftp_port: int,
ftp_username: str,
ftp_password: str,
db: Session
) -> bool:
"""
Sync measurement start time from the FTP folder timestamp.
This is called when SLMM detects a device is already measuring but doesn't
have a recorded start time (e.g., after service restart or if measurement
was started before SLMM began polling).
The workflow:
1. Disable FTP (reset)
2. Enable FTP
3. List NL-43 folder to get measurement folder timestamps
4. Use the most recent folder's timestamp as the start time
5. Update the database
Args:
unit_id: Device identifier
host: Device IP/hostname
tcp_port: TCP control port
ftp_port: FTP port (usually 21)
ftp_username: FTP username (usually "USER")
ftp_password: FTP password (usually "0000")
db: Database session
Returns:
True if sync succeeded, False otherwise
"""
logger.info(f"[FTP-SYNC] Attempting to sync measurement start time for {unit_id} via FTP")
client = NL43Client(
host, tcp_port,
ftp_username=ftp_username,
ftp_password=ftp_password,
ftp_port=ftp_port
)
try:
# Step 1: Disable FTP to reset it
logger.info(f"[FTP-SYNC] Step 1: Disabling FTP on {unit_id}")
await client.disable_ftp()
await asyncio.sleep(1.5) # Wait for device to process
# Step 2: Enable FTP
logger.info(f"[FTP-SYNC] Step 2: Enabling FTP on {unit_id}")
await client.enable_ftp()
await asyncio.sleep(2.0) # Wait for FTP server to start
# Step 3: List NL-43 folder
logger.info(f"[FTP-SYNC] Step 3: Listing /NL-43 folder on {unit_id}")
files = await client.list_ftp_files("/NL-43")
# Filter for directories only (measurement folders)
folders = [f for f in files if f.get('is_dir', False)]
if not folders:
logger.warning(f"[FTP-SYNC] No measurement folders found on {unit_id}")
return False
# Sort by modified timestamp (newest first)
folders.sort(key=lambda f: f.get('modified_timestamp', ''), reverse=True)
latest_folder = folders[0]
folder_name = latest_folder['name']
logger.info(f"[FTP-SYNC] Found latest measurement folder: {folder_name}")
# Step 4: Parse timestamp
if 'modified_timestamp' in latest_folder and latest_folder['modified_timestamp']:
timestamp_str = latest_folder['modified_timestamp']
# Parse ISO format timestamp (already in UTC from SLMM FTP listing)
start_time = datetime.fromisoformat(timestamp_str.replace('Z', ''))
# Step 5: Update database
status = db.query(NL43Status).filter_by(unit_id=unit_id).first()
if status:
old_time = status.measurement_start_time
status.measurement_start_time = start_time
db.commit()
logger.info(f"[FTP-SYNC] ✓ Successfully synced start time for {unit_id}")
logger.info(f"[FTP-SYNC] Folder: {folder_name}")
logger.info(f"[FTP-SYNC] Old start time: {old_time}")
logger.info(f"[FTP-SYNC] New start time: {start_time}")
return True
else:
logger.warning(f"[FTP-SYNC] Status record not found for {unit_id}")
return False
else:
logger.warning(f"[FTP-SYNC] Could not parse timestamp from folder {folder_name}")
return False
except Exception as e:
logger.error(f"[FTP-SYNC] Failed to sync start time for {unit_id}: {e}")
return False
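
For clarity, a tiny sketch of the timestamp handling in step 4: the folder's modified_timestamp (assumed to be an ISO-8601 UTC string from the FTP listing) is parsed into a naive datetime that is stored as the start time; the value below is made up:

from datetime import datetime

timestamp_str = "2026-01-29T07:15:00Z"                        # hypothetical folder timestamp
start_time = datetime.fromisoformat(timestamp_str.replace('Z', ''))
print(start_time)  # 2026-01-29 07:15:00, stored as measurement_start_time (UTC)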
# Rate limiting: NL43 requires ≥1 second between commands # Rate limiting: NL43 requires ≥1 second between commands
_last_command_time = {} _last_command_time = {}
_rate_limit_lock = asyncio.Lock() _rate_limit_lock = asyncio.Lock()
# Per-device connection locks: NL43 devices only support one TCP connection at a time
# This prevents concurrent connections from fighting for the device
_device_locks: Dict[str, asyncio.Lock] = {}
_device_locks_lock = asyncio.Lock()
async def _get_device_lock(device_key: str) -> asyncio.Lock:
"""Get or create a lock for a specific device."""
async with _device_locks_lock:
if device_key not in _device_locks:
_device_locks[device_key] = asyncio.Lock()
return _device_locks[device_key]
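
A self-contained sketch of the per-device lock pattern above (names, ports and device keys are illustrative): commands to the same device serialize, while different devices still proceed in parallel:

import asyncio

_locks: dict[str, asyncio.Lock] = {}
_locks_guard = asyncio.Lock()

async def get_lock(device_key: str) -> asyncio.Lock:
    # Mirrors _get_device_lock: one asyncio.Lock per device key, created on demand.
    async with _locks_guard:
        return _locks.setdefault(device_key, asyncio.Lock())

async def send(device_key: str, cmd: str) -> None:
    async with await get_lock(device_key):
        print(f"{device_key}: {cmd}")     # stands in for the real TCP round-trip
        await asyncio.sleep(0.1)

async def main() -> None:
    await asyncio.gather(
        send("10.0.0.5:2300", "DOD?"),    # these two serialize on the same lock
        send("10.0.0.5:2300", "DRD?"),
        send("10.0.0.6:2300", "DOD?"),    # different device, runs concurrently
    )

asyncio.run(main())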
class NL43Client: class NL43Client:
def __init__(self, host: str, port: int, timeout: float = 5.0, ftp_username: str = None, ftp_password: str = None, ftp_port: int = 21): def __init__(self, host: str, port: int, timeout: float = 5.0, ftp_username: str = None, ftp_password: str = None, ftp_port: int = 21):
@@ -133,7 +261,17 @@ class NL43Client:
NL43 protocol returns two lines for query commands: NL43 protocol returns two lines for query commands:
Line 1: Result code (R+0000 for success, error codes otherwise) Line 1: Result code (R+0000 for success, error codes otherwise)
Line 2: Actual data (for query commands ending with '?') Line 2: Actual data (for query commands ending with '?')
This method acquires a per-device lock to ensure only one TCP connection
is active at a time (NL43 devices only support single connections).
""" """
# Acquire per-device lock to prevent concurrent connections
device_lock = await _get_device_lock(self.device_key)
async with device_lock:
return await self._send_command_unlocked(cmd)
async def _send_command_unlocked(self, cmd: str) -> str:
"""Internal: send command without acquiring device lock (lock must be held by caller)."""
await self._enforce_rate_limit() await self._enforce_rate_limit()
logger.info(f"Sending command to {self.device_key}: {cmd.strip()}") logger.info(f"Sending command to {self.device_key}: {cmd.strip()}")
@@ -429,105 +567,112 @@ class NL43Client:
The stream continues until an exception occurs or the connection is closed. The stream continues until an exception occurs or the connection is closed.
Send SUB character (0x1A) to stop the stream. Send SUB character (0x1A) to stop the stream.
NOTE: This method holds the device lock for the entire duration of streaming,
blocking other commands to this device. This is intentional since NL43 devices
only support one TCP connection at a time.
""" """
# Acquire per-device lock - held for entire streaming session
device_lock = await _get_device_lock(self.device_key)
async with device_lock:
    await self._enforce_rate_limit()

    logger.info(f"Starting DRD stream for {self.device_key}")

    try:
        reader, writer = await asyncio.wait_for(
            asyncio.open_connection(self.host, self.port), timeout=self.timeout
        )
    except asyncio.TimeoutError:
        logger.error(f"DRD stream connection timeout to {self.device_key}")
        raise ConnectionError(f"Failed to connect to device at {self.host}:{self.port}")
    except Exception as e:
        logger.error(f"DRD stream connection failed to {self.device_key}: {e}")
        raise ConnectionError(f"Failed to connect to device: {str(e)}")

    try:
        # Start DRD streaming
        writer.write(b"DRD?\r\n")
        await writer.drain()

        # Read initial result code
        first_line_data = await asyncio.wait_for(reader.readuntil(b"\n"), timeout=self.timeout)
        result_code = first_line_data.decode(errors="ignore").strip()
        if result_code.startswith("$"):
            result_code = result_code[1:].strip()
        logger.debug(f"DRD stream result code from {self.device_key}: {result_code}")

        if result_code != "R+0000":
            raise ValueError(f"DRD stream failed to start: {result_code}")

        logger.info(f"DRD stream started successfully for {self.device_key}")

        # Continuously read data lines
        while True:
            try:
                line_data = await asyncio.wait_for(reader.readuntil(b"\n"), timeout=30.0)
                line = line_data.decode(errors="ignore").strip()
                if not line:
                    continue
                # Remove leading $ if present
                if line.startswith("$"):
                    line = line[1:].strip()
                # Parse the DRD data (same format as DOD)
                parts = [p.strip() for p in line.split(",") if p.strip() != ""]
                if len(parts) < 2:
                    logger.warning(f"Malformed DRD data from {self.device_key}: {line}")
                    continue
                snap = NL43Snapshot(unit_id="", raw_payload=line, measurement_state="Measure")
                # Parse known positions (DRD format - same as DOD)
                # DRD format: d0=counter, d1=Lp, d2=Leq, d3=Lmax, d4=Lmin, d5=Lpeak, d6=LIeq, ...
                try:
                    # Capture d0 (counter) for timer synchronization
                    if len(parts) >= 1:
                        snap.counter = parts[0]  # d0: Measurement interval counter (1-600)
                    if len(parts) >= 2:
                        snap.lp = parts[1]  # d1: Instantaneous sound pressure level
                    if len(parts) >= 3:
                        snap.leq = parts[2]  # d2: Equivalent continuous sound level
                    if len(parts) >= 4:
                        snap.lmax = parts[3]  # d3: Maximum level
                    if len(parts) >= 5:
                        snap.lmin = parts[4]  # d4: Minimum level
                    if len(parts) >= 6:
                        snap.lpeak = parts[5]  # d5: Peak level
                except (IndexError, ValueError) as e:
                    logger.warning(f"Error parsing DRD data points: {e}")
                # Call the callback with the snapshot
                await callback(snap)
            except asyncio.TimeoutError:
                logger.warning(f"DRD stream timeout (no data for 30s) from {self.device_key}")
                break
            except asyncio.IncompleteReadError:
                logger.info(f"DRD stream closed by device {self.device_key}")
                break
    finally:
        # Send SUB character to stop streaming
        try:
            writer.write(b"\x1A")
            await writer.drain()
        except Exception:
            pass
        writer.close()
        with contextlib.suppress(Exception):
            await writer.wait_closed()
        logger.info(f"DRD stream ended for {self.device_key}")
async def set_measurement_time(self, preset: str): async def set_measurement_time(self, preset: str):
"""Set measurement time preset. """Set measurement time preset.

View File

@@ -0,0 +1,73 @@
#!/usr/bin/env python3
"""
Database migration: Add device_logs table.
This table stores per-device log entries for debugging and audit trail.
Run this once to add the new table.
"""
import sqlite3
import os
# Path to the SLMM database
DB_PATH = os.path.join(os.path.dirname(__file__), "data", "slmm.db")
def migrate():
print(f"Adding device_logs table to: {DB_PATH}")
if not os.path.exists(DB_PATH):
print("Database does not exist yet. Table will be created automatically on first run.")
return
conn = sqlite3.connect(DB_PATH)
cursor = conn.cursor()
try:
# Check if table already exists
cursor.execute("""
SELECT name FROM sqlite_master
WHERE type='table' AND name='device_logs'
""")
if cursor.fetchone():
print("✓ device_logs table already exists, no migration needed")
return
# Create the table
print("Creating device_logs table...")
cursor.execute("""
CREATE TABLE device_logs (
id INTEGER PRIMARY KEY AUTOINCREMENT,
unit_id VARCHAR NOT NULL,
timestamp DATETIME DEFAULT CURRENT_TIMESTAMP,
level VARCHAR DEFAULT 'INFO',
category VARCHAR DEFAULT 'GENERAL',
message TEXT NOT NULL
)
""")
# Create indexes for efficient querying
print("Creating indexes...")
cursor.execute("CREATE INDEX ix_device_logs_unit_id ON device_logs (unit_id)")
cursor.execute("CREATE INDEX ix_device_logs_timestamp ON device_logs (timestamp)")
conn.commit()
print("✓ Created device_logs table with indexes")
# Verify
cursor.execute("""
SELECT name FROM sqlite_master
WHERE type='table' AND name='device_logs'
""")
if not cursor.fetchone():
raise Exception("device_logs table was not created successfully")
print("✓ Migration completed successfully")
finally:
conn.close()
if __name__ == "__main__":
migrate()

View File

@@ -0,0 +1,60 @@
#!/usr/bin/env python3
"""
Database migration: Add start_time_sync_attempted field to nl43_status table.
This field tracks whether FTP sync has been attempted for the current measurement,
preventing repeated sync attempts when FTP fails.
Run this once to add the new column.
"""
import sqlite3
import os
# Path to the SLMM database
DB_PATH = os.path.join(os.path.dirname(__file__), "data", "slmm.db")
def migrate():
print(f"Adding start_time_sync_attempted field to: {DB_PATH}")
if not os.path.exists(DB_PATH):
print("Database does not exist yet. Column will be created automatically.")
return
conn = sqlite3.connect(DB_PATH)
cursor = conn.cursor()
try:
# Check if column already exists
cursor.execute("PRAGMA table_info(nl43_status)")
columns = [col[1] for col in cursor.fetchall()]
if 'start_time_sync_attempted' in columns:
print("✓ start_time_sync_attempted column already exists, no migration needed")
return
# Add the column
print("Adding start_time_sync_attempted column...")
cursor.execute("""
ALTER TABLE nl43_status
ADD COLUMN start_time_sync_attempted BOOLEAN DEFAULT 0
""")
conn.commit()
print("✓ Added start_time_sync_attempted column")
# Verify
cursor.execute("PRAGMA table_info(nl43_status)")
columns = [col[1] for col in cursor.fetchall()]
if 'start_time_sync_attempted' not in columns:
raise Exception("start_time_sync_attempted column was not added successfully")
print("✓ Migration completed successfully")
finally:
conn.close()
if __name__ == "__main__":
migrate()

View File

@@ -333,6 +333,134 @@
html += `<p style="margin-top: 12px; font-size: 0.9em; color: #666;">Last run: ${new Date(data.timestamp).toLocaleString()}</p>`; html += `<p style="margin-top: 12px; font-size: 0.9em; color: #666;">Last run: ${new Date(data.timestamp).toLocaleString()}</p>`;
// Add database dump section if available
if (data.database_dump) {
html += `<div style="margin-top: 16px; border-top: 1px solid #d0d7de; padding-top: 12px;">`;
html += `<h4 style="margin: 0 0 12px 0;">📦 Database Dump</h4>`;
// Config section
if (data.database_dump.config) {
const cfg = data.database_dump.config;
html += `<div style="background: #f0f4f8; padding: 12px; border-radius: 4px; margin-bottom: 12px;">`;
html += `<strong>Configuration (nl43_config)</strong>`;
html += `<table style="width: 100%; margin-top: 8px; font-size: 0.9em;">`;
html += `<tr><td style="padding: 2px 8px; color: #666;">Host</td><td>${cfg.host}:${cfg.tcp_port}</td></tr>`;
html += `<tr><td style="padding: 2px 8px; color: #666;">TCP Enabled</td><td>${cfg.tcp_enabled ? '✓' : '✗'}</td></tr>`;
html += `<tr><td style="padding: 2px 8px; color: #666;">FTP Enabled</td><td>${cfg.ftp_enabled ? '✓' : '✗'}${cfg.ftp_enabled ? ` (port ${cfg.ftp_port}, user: ${cfg.ftp_username || 'none'})` : ''}</td></tr>`;
html += `<tr><td style="padding: 2px 8px; color: #666;">Background Polling</td><td>${cfg.poll_enabled ? `✓ every ${cfg.poll_interval_seconds}s` : '✗ disabled'}</td></tr>`;
html += `</table></div>`;
}
// Status cache section
if (data.database_dump.status_cache) {
const cache = data.database_dump.status_cache;
html += `<div style="background: #f0f8f4; padding: 12px; border-radius: 4px; margin-bottom: 12px;">`;
html += `<strong>Status Cache (nl43_status)</strong>`;
html += `<table style="width: 100%; margin-top: 8px; font-size: 0.9em;">`;
// Measurement state and timing
html += `<tr><td style="padding: 2px 8px; color: #666;">Measurement State</td><td><strong>${cache.measurement_state || 'unknown'}</strong></td></tr>`;
if (cache.measurement_start_time) {
const startTime = new Date(cache.measurement_start_time);
const elapsed = Math.floor((Date.now() - startTime) / 1000);
const elapsedStr = elapsed > 3600 ? `${Math.floor(elapsed/3600)}h ${Math.floor((elapsed%3600)/60)}m` : elapsed > 60 ? `${Math.floor(elapsed/60)}m ${elapsed%60}s` : `${elapsed}s`;
html += `<tr><td style="padding: 2px 8px; color: #666;">Measurement Started</td><td>${startTime.toLocaleString()} (${elapsedStr} ago)</td></tr>`;
}
html += `<tr><td style="padding: 2px 8px; color: #666;">Counter (d0)</td><td>${cache.counter || 'N/A'}</td></tr>`;
// Sound levels
html += `<tr><td colspan="2" style="padding: 8px 8px 2px 8px; font-weight: 600; border-top: 1px solid #d0d7de;">Sound Levels (dB)</td></tr>`;
html += `<tr><td style="padding: 2px 8px; color: #666;">Lp (Instantaneous)</td><td>${cache.lp || 'N/A'}</td></tr>`;
html += `<tr><td style="padding: 2px 8px; color: #666;">Leq (Equivalent)</td><td>${cache.leq || 'N/A'}</td></tr>`;
html += `<tr><td style="padding: 2px 8px; color: #666;">Lmax / Lmin</td><td>${cache.lmax || 'N/A'} / ${cache.lmin || 'N/A'}</td></tr>`;
html += `<tr><td style="padding: 2px 8px; color: #666;">Lpeak</td><td>${cache.lpeak || 'N/A'}</td></tr>`;
// Device status
html += `<tr><td colspan="2" style="padding: 8px 8px 2px 8px; font-weight: 600; border-top: 1px solid #d0d7de;">Device Status</td></tr>`;
html += `<tr><td style="padding: 2px 8px; color: #666;">Battery</td><td>${cache.battery_level || 'N/A'}${cache.power_source ? ` (${cache.power_source})` : ''}</td></tr>`;
html += `<tr><td style="padding: 2px 8px; color: #666;">SD Card</td><td>${cache.sd_remaining_mb ? `${cache.sd_remaining_mb} MB` : 'N/A'}${cache.sd_free_ratio ? ` (${cache.sd_free_ratio} free)` : ''}</td></tr>`;
// Polling status
html += `<tr><td colspan="2" style="padding: 8px 8px 2px 8px; font-weight: 600; border-top: 1px solid #d0d7de;">Polling Status</td></tr>`;
html += `<tr><td style="padding: 2px 8px; color: #666;">Reachable</td><td>${cache.is_reachable ? '🟢 Yes' : '🔴 No'}</td></tr>`;
if (cache.last_seen) {
html += `<tr><td style="padding: 2px 8px; color: #666;">Last Seen</td><td>${new Date(cache.last_seen).toLocaleString()}</td></tr>`;
}
if (cache.last_success) {
html += `<tr><td style="padding: 2px 8px; color: #666;">Last Success</td><td>${new Date(cache.last_success).toLocaleString()}</td></tr>`;
}
if (cache.last_poll_attempt) {
html += `<tr><td style="padding: 2px 8px; color: #666;">Last Poll Attempt</td><td>${new Date(cache.last_poll_attempt).toLocaleString()}</td></tr>`;
}
html += `<tr><td style="padding: 2px 8px; color: #666;">Consecutive Failures</td><td>${cache.consecutive_failures || 0}</td></tr>`;
if (cache.last_error) {
html += `<tr><td style="padding: 2px 8px; color: #666;">Last Error</td><td style="color: #d00; font-size: 0.85em;">${cache.last_error}</td></tr>`;
}
html += `</table></div>`;
// Raw payload (collapsible)
if (cache.raw_payload) {
html += `<details style="margin-top: 8px;"><summary style="cursor: pointer; color: #666; font-size: 0.9em;">📄 Raw Payload</summary>`;
html += `<pre style="background: #f6f8fa; padding: 8px; border-radius: 4px; font-size: 0.8em; overflow-x: auto; margin-top: 8px;">${cache.raw_payload}</pre></details>`;
}
} else {
html += `<p style="color: #888; font-style: italic;">No cached status available for this unit.</p>`;
}
html += `</div>`;
}
// Fetch and display device logs
try {
const logsRes = await fetch(`/api/nl43/${unitId}/logs?limit=50`);
if (logsRes.ok) {
const logsData = await logsRes.json();
if (logsData.logs && logsData.logs.length > 0) {
html += `<div style="margin-top: 16px; border-top: 1px solid #d0d7de; padding-top: 12px;">`;
html += `<h4 style="margin: 0 0 12px 0;">📋 Device Logs (${logsData.stats.total} total)</h4>`;
// Stats summary
if (logsData.stats.by_level) {
html += `<div style="margin-bottom: 8px; font-size: 0.85em; color: #666;">`;
const levels = logsData.stats.by_level;
const parts = [];
if (levels.ERROR) parts.push(`<span style="color: #d00;">${levels.ERROR} errors</span>`);
if (levels.WARNING) parts.push(`<span style="color: #fa0;">${levels.WARNING} warnings</span>`);
if (levels.INFO) parts.push(`${levels.INFO} info`);
html += parts.join(' · ');
html += `</div>`;
}
// Log entries (collapsible)
html += `<details open><summary style="cursor: pointer; font-size: 0.9em; margin-bottom: 8px;">Recent entries (${logsData.logs.length})</summary>`;
html += `<div style="max-height: 300px; overflow-y: auto; background: #f6f8fa; border: 1px solid #d0d7de; border-radius: 4px; padding: 8px; font-size: 0.8em; font-family: monospace;">`;
logsData.logs.forEach(entry => {
const levelColor = {
'ERROR': '#d00',
'WARNING': '#b86e00',
'INFO': '#0969da',
'DEBUG': '#888'
}[entry.level] || '#666';
const time = new Date(entry.timestamp).toLocaleString();
html += `<div style="margin-bottom: 4px; border-bottom: 1px solid #eee; padding-bottom: 4px;">`;
html += `<span style="color: #888;">${time}</span> `;
html += `<span style="color: ${levelColor}; font-weight: 600;">[${entry.level}]</span> `;
html += `<span style="color: #666;">[${entry.category}]</span> `;
html += `${entry.message}`;
html += `</div>`;
});
html += `</div></details>`;
html += `</div>`;
}
}
} catch (logErr) {
console.log('Could not fetch device logs:', logErr);
}
resultsEl.innerHTML = html; resultsEl.innerHTML = html;
log(`Diagnostics complete: ${data.overall_status}`); log(`Diagnostics complete: ${data.overall_status}`);