Compare commits

14 Commits

Author SHA1 Message Date
ad1a40e0aa Merge pull request 'v0.3.0, persistent polling update. Persistent TCP connection pool with all features; connection pool diagnostics (API + UI); all 6 new environment variables; changes to health check, diagnostics, and DRD streaming; technical architecture details and cellular' (#2) from dev-persistent into main
Reviewed-on: #2
2026-02-16 21:57:37 -05:00
serversdwn
b62e84f8b3 v0.3.0, persistent polling update. 2026-02-17 02:56:11 +00:00
serversdwn
a5f8d1b2c7 Persistent polling interval increased. Healthcheck now uses poll instead of separate handshakes. 2026-02-17 02:41:09 +00:00
serversdwn
a1a80bbb4d add: new persistent connection approach, env variables for TCP keepalive and persist, added connection pool class. 2026-02-16 04:25:51 +00:00
serversdwn
005e0091fe fix: delay added to ensure TCP commands don't talk over each other 2026-02-16 02:42:41 +00:00
serversdwn
e6ac80df6c chore: add pcap files to gitignore 2026-02-10 21:12:19 +00:00
serversdwn
7070b948a8 add: stress test script for diagnosing TCP connection issues.
chore: clean up .gitignore
2026-02-10 07:07:34 +00:00
serversdwn
3b6e9ad3f0 fix: delay added to FTP enable step to prevent commands from interleaving 2026-02-06 17:37:10 +00:00
serversdwn
eb0cbcc077 fix: 24hr restart schedule enhanced.
Step 0: Pause polling
Step 1: Stop measurement → wait 10s
Step 2: Disable FTP → wait 10s
Step 3: Enable FTP → wait 10s
Step 4: Download data
Step 5: Wait 30s for device to settle
Step 6: Start new measurement
Step 7: Re-enable polling
2026-01-31 05:15:00 +00:00
serversdwn
cc0a5bdf84 chore cleanup 2026-01-29 22:44:20 +00:00
serversdwn
bf5f222511 Add:
- db cache dump on diagnostics request.
- individual device logs, db and files.
- Device logs API endpoints and diagnostics UI.

Fix:
- slmm standalone now uses local TZ (was UTC only before)
- fixed measurement start time logic.
2026-01-29 18:50:47 +00:00
serversdwn
eb39a9d1d0 add: device communication lock. To send a TCP command, SLMM must now acquire a connection lock, preventing the unit from being flooded.
fix: background poller intervals increased.
2026-01-29 07:54:49 +00:00
serversdwn
67d63b4173 Merge branch 'main' of ssh://10.0.0.2:2222/serversdown/slmm 2026-01-23 08:29:27 +00:00
serversdwn
25cf9528d0 docs: update to 0.2.1 2026-01-23 08:26:23 +00:00
15 changed files with 3447 additions and 418 deletions

4
.gitignore vendored
View File

@@ -1,5 +1,7 @@
/manuals/
/data/
/SLM-stress-test/stress_test_logs/
/SLM-stress-test/tcpdump-runs/
# Python cache
__pycache__/
@@ -12,3 +14,5 @@ __pycache__/
*.egg-info/
dist/
build/
*.pcap

View File

@@ -5,6 +5,70 @@ All notable changes to SLMM (Sound Level Meter Manager) will be documented in th
The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.html).
## [0.3.0] - 2026-02-17
### Added
#### Persistent TCP Connection Pool
- **Connection reuse** - TCP connections are cached per device and reused across commands, eliminating repeated TCP handshakes over cellular modems
- **OS-level TCP keepalive** - Configurable keepalive probes keep cellular NAT tables alive and detect dead connections early (default: probe after 15s idle, every 10s, 3 failures = dead)
- **Transparent retry** - If a cached connection goes stale, the system automatically retries with a fresh connection so failures are never visible to the caller
- **Stale connection detection** - Multi-layer detection via idle TTL, max age, transport state, and reader EOF checks
- **Background cleanup** - Periodic task (every 30s) evicts expired connections from the pool
- **Master switch** - Set `TCP_PERSISTENT_ENABLED=false` to revert to per-request connection behavior
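The keepalive behavior described above maps to standard OS socket options. A minimal sketch (illustrative only — the actual SLMM implementation lives in `services.py`; the `TCP_KEEP*` constants assume Linux and are guarded for portability):

```python
import socket

def apply_keepalive(sock: socket.socket,
                    idle: int = 15, interval: int = 10, count: int = 3) -> None:
    """Enable OS-level TCP keepalive on an existing socket.

    With the defaults above, a dead peer is declared after roughly
    idle + interval * count = 15 + 10 * 3 = 45 seconds.
    """
    sock.setsockopt(socket.SOL_SOCKET, socket.SO_KEEPALIVE, 1)
    # The tuning constants are platform-specific (present on Linux).
    if hasattr(socket, "TCP_KEEPIDLE"):
        sock.setsockopt(socket.IPPROTO_TCP, socket.TCP_KEEPIDLE, idle)
    if hasattr(socket, "TCP_KEEPINTVL"):
        sock.setsockopt(socket.IPPROTO_TCP, socket.TCP_KEEPINTVL, interval)
    if hasattr(socket, "TCP_KEEPCNT"):
        sock.setsockopt(socket.IPPROTO_TCP, socket.TCP_KEEPCNT, count)
```

The 15s idle default deliberately sits well below typical cellular NAT timeouts (30-60s), so the carrier sees traffic before the mapping expires.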
#### Connection Pool Diagnostics
- `GET /api/nl43/_connections/status` - View pool configuration, active connections, age/idle times, and keepalive settings
- `POST /api/nl43/_connections/flush` - Force-close all cached connections (useful for debugging)
- **Connections tab on roster page** - Live UI showing pool config, active connections with age/idle/alive status, auto-refreshes every 5s, and flush button
#### Environment Variables
- `TCP_PERSISTENT_ENABLED` (default: `true`) - Master switch for persistent connections
- `TCP_IDLE_TTL` (default: `300`) - Close idle connections after N seconds
- `TCP_MAX_AGE` (default: `1800`) - Force reconnect after N seconds
- `TCP_KEEPALIVE_IDLE` (default: `15`) - Seconds idle before keepalive probes start
- `TCP_KEEPALIVE_INTERVAL` (default: `10`) - Seconds between keepalive probes
- `TCP_KEEPALIVE_COUNT` (default: `3`) - Failed probes before declaring connection dead
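Reading these variables with their documented defaults might look like the following sketch (the `PoolConfig` dataclass is hypothetical; variable names and defaults match the list above):

```python
import os
from dataclasses import dataclass

@dataclass(frozen=True)
class PoolConfig:
    """Connection pool settings read from the environment."""
    enabled: bool
    idle_ttl: int
    max_age: int
    keepalive_idle: int
    keepalive_interval: int
    keepalive_count: int

    @classmethod
    def from_env(cls) -> "PoolConfig":
        # Anything other than an explicit "false" keeps the pool enabled.
        return cls(
            enabled=os.getenv("TCP_PERSISTENT_ENABLED", "true").lower() != "false",
            idle_ttl=int(os.getenv("TCP_IDLE_TTL", "300")),
            max_age=int(os.getenv("TCP_MAX_AGE", "1800")),
            keepalive_idle=int(os.getenv("TCP_KEEPALIVE_IDLE", "15")),
            keepalive_interval=int(os.getenv("TCP_KEEPALIVE_INTERVAL", "10")),
            keepalive_count=int(os.getenv("TCP_KEEPALIVE_COUNT", "3")),
        )
```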
### Changed
- **Health check endpoint** (`/health/devices`) - Now uses connection pool instead of opening throwaway TCP connections; checks for existing live connections first (zero-cost), only opens new connection through pool if needed
- **Diagnostics endpoint** - Removed separate port 443 modem check (extra handshake waste); TCP reachability test now uses connection pool
- **DRD streaming** - Streaming connections now get TCP keepalive options set; cached connections are evicted before opening dedicated streaming socket
- **Default timeouts tuned for cellular** - Idle TTL raised to 300s (5 min), max age raised to 1800s (30 min) to survive typical polling intervals over cellular links
### Technical Details
#### Architecture
- `ConnectionPool` class in `services.py` manages a single cached connection per device key (NL-43 only supports one TCP connection at a time)
- Uses existing per-device asyncio locks and rate limiting — no changes to concurrency model
- Pool is a module-level singleton initialized from environment variables at import time
- Lifecycle managed via FastAPI lifespan: cleanup task starts on startup, all connections closed on shutdown
- `_send_command_unlocked()` refactored to use acquire/release/discard pattern with single-retry fallback
- Command parsing extracted to `_execute_command()` method for reuse between primary and retry paths
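The acquire/release/discard pattern with single-retry fallback can be sketched as follows (a simplified synchronous model with duck-typed pool and connection objects; the real `ConnectionPool` manages asyncio streams and locks):

```python
class StaleConnectionError(Exception):
    """Raised when a cached connection turns out to be dead."""

def send_with_retry(pool, key, command):
    """Acquire a connection, run the command, and retry once on a stale socket.

    On success the connection is released back to the pool; on failure it is
    discarded so the retry attempt gets a fresh one. A second failure is
    surfaced to the caller.
    """
    for attempt in (1, 2):
        conn = pool.acquire(key)
        try:
            result = conn.execute(command)
        except StaleConnectionError:
            pool.discard(key, conn)  # drop the dead socket
            if attempt == 2:
                raise
            continue                 # retry with a fresh connection
        pool.release(key, conn)
        return result
```

This is why the changelog can claim stale connections are "never visible to the caller": the first failure is absorbed by the discard-and-retry path.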
#### Cellular Modem Optimizations
- Keepalive probes at 15s prevent cellular NAT tables from expiring (typically 30-60s timeout)
- 300s idle TTL ensures connections survive between polling cycles (default 60s interval)
- 1800s max age allows a single socket to serve ~30 minutes of polling before forced reconnect
- Health checks and diagnostics produce zero additional TCP handshakes when a pooled connection exists
- Stale `$` prompt bytes drained from idle connections before command reuse
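Draining leftover prompt bytes before reuse might be sketched like this (illustrative asyncio code; the function name is hypothetical and the real implementation may differ):

```python
import asyncio

async def drain_stale_bytes(reader: asyncio.StreamReader,
                            timeout: float = 0.05) -> bytes:
    """Read and discard any bytes already buffered on an idle connection.

    Returns the drained bytes so callers can log them. Stops as soon as the
    reader has nothing more to offer within `timeout` seconds.
    """
    drained = b""
    while True:
        try:
            chunk = await asyncio.wait_for(reader.read(1024), timeout=timeout)
        except asyncio.TimeoutError:
            break          # buffer is empty — connection is clean
        if not chunk:
            break          # EOF — connection is dead, caller should discard it
        drained += chunk
    return drained
```

Without this step, a stale `$` prompt left over from the previous command would be misread as the start of the next response.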
### Breaking Changes
None. This release is fully backward-compatible with v0.2.x. Set `TCP_PERSISTENT_ENABLED=false` for identical behavior to previous versions.
---
## [0.2.1] - 2026-01-23
### Added
- **Roster management**: UI and API endpoints for managing device rosters.
- **Delete config endpoint**: Remove device configuration alongside cached status data.
- **Scheduler hooks**: `start_cycle` and `stop_cycle` helpers for Terra-View scheduling integration.
### Changed
- **FTP logging**: Connection, authentication, and transfer phases now log explicitly.
- **Documentation**: Reorganized docs/scripts and updated API notes for FTP/TCP verification.
## [0.2.0] - 2026-01-15
### Added
@@ -135,5 +199,7 @@ None. This release is fully backward-compatible with v0.1.x. All existing endpoi
## Version History Summary
- **v0.3.0** (2026-02-17) - Persistent TCP connections with keepalive for cellular modem reliability
- **v0.2.1** (2026-01-23) - Roster management, scheduler hooks, FTP logging, doc cleanup
- **v0.2.0** (2026-01-15) - Background Polling System
- **v0.1.0** (2025-12-XX) - Initial Release

View File

@@ -1,6 +1,6 @@
# SLMM - Sound Level Meter Manager
**Version 0.2.0**
**Version 0.3.0**
Backend API service for controlling and monitoring Rion NL-43/NL-53 Sound Level Meters via TCP and FTP protocols.
@@ -12,8 +12,9 @@ SLMM is a standalone backend module that provides REST API routing and command t
## Features
- **Background Polling** ⭐ NEW: Continuous automatic polling of devices with configurable intervals
- **Offline Detection** ⭐ NEW: Automatic device reachability tracking with failure counters
- **Persistent TCP Connections**: Cached per-device connections with OS-level keepalive, tuned for cellular modem reliability
- **Background Polling**: Continuous automatic polling of devices with configurable intervals
- **Offline Detection**: Automatic device reachability tracking with failure counters
- **Device Management**: Configure and manage multiple NL43/NL53 devices
- **Real-time Monitoring**: Stream live measurement data via WebSocket
- **Measurement Control**: Start, stop, pause, resume, and reset measurements
@@ -22,6 +23,7 @@ SLMM is a standalone backend module that provides REST API routing and command t
- **Device Configuration**: Manage frequency/time weighting, clock sync, and more
- **Rate Limiting**: Automatic 1-second delay enforcement between device commands
- **Persistent Storage**: SQLite database for device configs and measurement cache
- **Connection Diagnostics**: Live UI and API endpoints for monitoring TCP connection pool status
## Architecture
@@ -29,29 +31,39 @@ SLMM is a standalone backend module that provides REST API routing and command t
┌─────────────────┐          ┌──────────────────────────────┐          ┌─────────────────┐
│   Terra-View    │◄───────►│  SLMM API                    │◄───────►│   NL43/NL53     │
│   (Frontend)    │   HTTP   │  • REST Endpoints            │   TCP    │  Sound Meters   │
└─────────────────┘          │  • WebSocket Streaming       │  (kept   │  (via cellular  │
                             │  • Background Poller         │  alive)  │   modem)        │
                             │  • Connection Pool (v0.3)    │          └─────────────────┘
                             └──────────────┬───────────────┘
                                            │
                                            ▼
                                     ┌──────────────┐
                                     │  SQLite DB   │
                                     │  • Config    │
                                     │  • Status    │
                                     └──────────────┘
```
### Persistent TCP Connection Pool (v0.3.0)
SLMM maintains persistent TCP connections to devices with OS-level keepalive, designed for reliable operation over cellular modems:
- **Connection Reuse**: One cached TCP socket per device, reused across all commands (no repeated handshakes)
- **TCP Keepalive**: Probes keep cellular NAT tables alive and detect dead connections early
- **Transparent Retry**: Stale cached connections automatically retry with a fresh socket
- **Configurable**: Idle TTL (300s), max age (1800s), and keepalive timing via environment variables
- **Diagnostics**: Live UI on the roster page and API endpoints for monitoring pool status
### Background Polling (v0.2.0)
SLMM now includes a background polling service that continuously queries devices and updates the status cache:
Background polling service continuously queries devices and updates the status cache:
- **Automatic Updates**: Devices are polled at configurable intervals (10-3600 seconds)
- **Offline Detection**: Devices marked unreachable after 3 consecutive failures
- **Per-Device Configuration**: Each device can have a custom polling interval
- **Resource Efficient**: Dynamic sleep intervals and smart scheduling
- **Graceful Shutdown**: Background task stops cleanly on service shutdown
This makes Terra-View significantly more responsive - status requests return cached data instantly (<100ms) instead of waiting for device queries (1-2 seconds).
Status requests return cached data instantly (<100ms) instead of waiting for device queries (1-2 seconds).
## Quick Start
@@ -96,9 +108,18 @@ Once running, visit:
### Environment Variables
**Server:**
- `PORT`: Server port (default: 8100)
- `CORS_ORIGINS`: Comma-separated list of allowed origins (default: "*")
**TCP Connection Pool:**
- `TCP_PERSISTENT_ENABLED`: Enable persistent connections (default: "true")
- `TCP_IDLE_TTL`: Close idle connections after N seconds (default: 300)
- `TCP_MAX_AGE`: Force reconnect after N seconds (default: 1800)
- `TCP_KEEPALIVE_IDLE`: Seconds idle before keepalive probes (default: 15)
- `TCP_KEEPALIVE_INTERVAL`: Seconds between keepalive probes (default: 10)
- `TCP_KEEPALIVE_COUNT`: Failed probes before declaring dead (default: 3)
### Database
The SQLite database is automatically created at [data/slmm.db](data/slmm.db) on first run.
@@ -126,7 +147,7 @@ Logs are written to:
| GET | `/api/nl43/{unit_id}/live` | Request fresh DOD data from device (bypasses cache) |
| WS | `/api/nl43/{unit_id}/stream` | WebSocket stream for real-time DRD data |
### Background Polling Configuration ⭐ NEW
### Background Polling
| Method | Endpoint | Description |
|--------|----------|-------------|
@@ -134,6 +155,13 @@ Logs are written to:
| PUT | `/api/nl43/{unit_id}/polling/config` | Update polling interval and enable/disable polling |
| GET | `/api/nl43/_polling/status` | Get global polling status for all devices |
### Connection Pool
| Method | Endpoint | Description |
|--------|----------|-------------|
| GET | `/api/nl43/_connections/status` | Get pool config, active connections, age/idle times |
| POST | `/api/nl43/_connections/flush` | Force-close all cached TCP connections |
### Measurement Control
| Method | Endpoint | Description |
@@ -255,6 +283,9 @@ Caches latest measurement snapshot:
### TCP Communication
- Uses ASCII command protocol over TCP
- Persistent connections with OS-level keepalive (tuned for cellular modems)
- Connections cached per device and reused across commands
- Transparent retry on stale connections
- Enforces ≥1 second delay between commands to same device
- Two-line response format:
- Line 1: Result code (R+0000 for success)
@@ -320,6 +351,16 @@ curl http://localhost:8100/api/nl43/meter-001/polling/config
curl http://localhost:8100/api/nl43/_polling/status
```
### Check Connection Pool Status
```bash
curl http://localhost:8100/api/nl43/_connections/status | jq '.'
```
### Flush All Cached Connections
```bash
curl -X POST http://localhost:8100/api/nl43/_connections/flush
```
### Verify Device Settings
```bash
curl http://localhost:8100/api/nl43/meter-001/settings
@@ -388,11 +429,19 @@ See [API.md](API.md) for detailed integration examples.
## Troubleshooting
### Connection Issues
- Check connection pool status: `curl http://localhost:8100/api/nl43/_connections/status`
- Flush stale connections: `curl -X POST http://localhost:8100/api/nl43/_connections/flush`
- Verify device IP address and port in configuration
- Ensure device is on the same network
- Check firewall rules allow TCP/FTP connections
- Verify RX55 network adapter is properly configured on device
### Cellular Modem Issues
- If modem wedges from too many handshakes, ensure `TCP_PERSISTENT_ENABLED=true` (default)
- Increase `TCP_IDLE_TTL` if connections expire between poll cycles
- Keepalive probes (default: every 15s) keep NAT tables alive — adjust `TCP_KEEPALIVE_IDLE` if needed
- Set `TCP_PERSISTENT_ENABLED=false` to disable pooling for debugging
### Rate Limiting
- API automatically enforces 1-second delay between commands
- If experiencing delays, this is normal device behavior

File diff suppressed because it is too large

View File

@@ -15,7 +15,8 @@ from sqlalchemy.orm import Session
from app.database import SessionLocal
from app.models import NL43Config, NL43Status
from app.services import NL43Client, persist_snapshot
from app.services import NL43Client, persist_snapshot, sync_measurement_start_time_from_ftp
from app.device_logger import log_device_event, cleanup_old_logs
logger = logging.getLogger(__name__)
@@ -25,7 +26,7 @@ class BackgroundPoller:
Background task that continuously polls NL43 devices and updates status cache.
Features:
- Per-device configurable poll intervals (10-3600 seconds)
- Per-device configurable poll intervals (30 seconds to 6 hours)
- Automatic offline detection (marks unreachable after 3 consecutive failures)
- Dynamic sleep intervals based on device configurations
- Graceful shutdown on application stop
@@ -36,6 +37,7 @@ class BackgroundPoller:
self._task: Optional[asyncio.Task] = None
self._running = False
self._logger = logger
self._last_cleanup = None # Track last log cleanup time
async def start(self):
"""Start the background polling task."""
@@ -78,6 +80,15 @@ class BackgroundPoller:
except Exception as e:
self._logger.error(f"Error in poll loop: {e}", exc_info=True)
# Run log cleanup once per hour
try:
now = datetime.utcnow()
if self._last_cleanup is None or (now - self._last_cleanup).total_seconds() > 3600:
cleanup_old_logs()
self._last_cleanup = now
except Exception as e:
self._logger.warning(f"Log cleanup failed: {e}")
# Calculate dynamic sleep interval
sleep_time = self._calculate_sleep_interval()
self._logger.debug(f"Sleeping for {sleep_time} seconds until next poll cycle")
@@ -205,6 +216,71 @@ class BackgroundPoller:
db.commit()
self._logger.info(f"✓ Successfully polled {unit_id}")
# Log to device log
log_device_event(
unit_id, "INFO", "POLL",
f"Poll success: state={snap.measurement_state}, Leq={snap.leq}, Lp={snap.lp}",
db
)
# Check if device is measuring but has no start time recorded
# This happens if measurement was started before SLMM began polling
# or after a service restart
status = db.query(NL43Status).filter_by(unit_id=unit_id).first()
# Reset the sync flag when measurement stops (so next measurement can sync)
if status and status.measurement_state != "Start":
if status.start_time_sync_attempted:
status.start_time_sync_attempted = False
db.commit()
self._logger.debug(f"Reset FTP sync flag for {unit_id} (measurement stopped)")
log_device_event(unit_id, "DEBUG", "STATE", "Measurement stopped, reset FTP sync flag", db)
# Attempt FTP sync if:
# - Device is measuring
# - No start time recorded
# - FTP sync not already attempted for this measurement
# - FTP is configured
if (status and
status.measurement_state == "Start" and
status.measurement_start_time is None and
not status.start_time_sync_attempted and
cfg.ftp_enabled and
cfg.ftp_username and
cfg.ftp_password):
self._logger.info(
f"Device {unit_id} is measuring but has no start time - "
f"attempting FTP sync"
)
log_device_event(unit_id, "INFO", "SYNC", "Attempting FTP sync for measurement start time", db)
# Mark that we attempted sync (prevents repeated attempts on failure)
status.start_time_sync_attempted = True
db.commit()
try:
synced = await sync_measurement_start_time_from_ftp(
unit_id=unit_id,
host=cfg.host,
tcp_port=cfg.tcp_port,
ftp_port=cfg.ftp_port or 21,
ftp_username=cfg.ftp_username,
ftp_password=cfg.ftp_password,
db=db
)
if synced:
self._logger.info(f"✓ FTP sync succeeded for {unit_id}")
log_device_event(unit_id, "INFO", "SYNC", "FTP sync succeeded - measurement start time updated", db)
else:
self._logger.warning(f"FTP sync returned False for {unit_id}")
log_device_event(unit_id, "WARNING", "SYNC", "FTP sync returned False", db)
except Exception as sync_err:
self._logger.warning(
f"FTP sync failed for {unit_id}: {sync_err}"
)
log_device_event(unit_id, "ERROR", "SYNC", f"FTP sync failed: {sync_err}", db)
except Exception as e:
# Failure - increment counter and potentially mark offline
status.consecutive_failures += 1
@@ -217,11 +293,13 @@ class BackgroundPoller:
self._logger.warning(
f"Device {unit_id} marked unreachable after {status.consecutive_failures} failures: {error_msg}"
)
log_device_event(unit_id, "ERROR", "POLL", f"Device marked UNREACHABLE after {status.consecutive_failures} failures: {error_msg}", db)
status.is_reachable = False
else:
self._logger.warning(
f"Poll failed for {unit_id} (attempt {status.consecutive_failures}/3): {error_msg}"
)
log_device_event(unit_id, "WARNING", "POLL", f"Poll failed (attempt {status.consecutive_failures}/3): {error_msg}", db)
db.commit()
@@ -230,8 +308,8 @@ class BackgroundPoller:
Calculate the next sleep interval based on all device poll intervals.
Returns a dynamic sleep time that ensures responsive polling:
- Minimum 10 seconds (prevents tight loops)
- Maximum 30 seconds (ensures responsiveness)
- Minimum 30 seconds (prevents tight loops)
- Maximum 300 seconds / 5 minutes (ensures reasonable responsiveness for long intervals)
- Generally half the minimum device interval
Returns:
@@ -245,14 +323,15 @@ class BackgroundPoller:
).all()
if not configs:
return 30 # Default sleep when no devices configured
return 60 # Default sleep when no devices configured
# Get all intervals
intervals = [cfg.poll_interval_seconds or 60 for cfg in configs]
min_interval = min(intervals)
# Use half the minimum interval, but cap between 10-30 seconds
sleep_time = max(10, min(30, min_interval // 2))
# Use half the minimum interval, but cap between 30-300 seconds
# This allows longer sleep times when polling intervals are long (e.g., hourly)
sleep_time = max(30, min(300, min_interval // 2))
return sleep_time
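Restating the clamp above as a standalone worked example (same formula as the code):

```python
def sleep_interval(min_device_interval: int) -> int:
    """Half the shortest device poll interval, clamped to the 30-300s window."""
    return max(30, min(300, min_device_interval // 2))

# 60s polling   -> sleeps 30s  (hits the 30s floor)
# 240s polling  -> sleeps 120s (half the interval)
# 3600s polling -> sleeps 300s (hits the 5-minute ceiling)
```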

277
app/device_logger.py Normal file
View File

@@ -0,0 +1,277 @@
"""
Per-device logging system.
Provides dual output: database entries for structured queries and file logs for backup.
Each device gets its own log file in data/logs/{unit_id}.log with rotation.
"""
import logging
import os
from datetime import datetime, timedelta
from logging.handlers import RotatingFileHandler
from pathlib import Path
from typing import Optional
from sqlalchemy.orm import Session
from app.database import SessionLocal
from app.models import DeviceLog
# Configure base logger
logger = logging.getLogger(__name__)
# Log directory (persisted in Docker volume)
LOG_DIR = Path(os.path.dirname(os.path.dirname(__file__))) / "data" / "logs"
LOG_DIR.mkdir(parents=True, exist_ok=True)
# Per-device file loggers (cached)
_device_file_loggers: dict = {}
# Log retention (days)
LOG_RETENTION_DAYS = int(os.getenv("LOG_RETENTION_DAYS", "7"))
def _get_file_logger(unit_id: str) -> logging.Logger:
"""Get or create a file logger for a specific device."""
if unit_id in _device_file_loggers:
return _device_file_loggers[unit_id]
# Create device-specific logger
device_logger = logging.getLogger(f"device.{unit_id}")
device_logger.setLevel(logging.DEBUG)
# Avoid duplicate handlers
if not device_logger.handlers:
# Create rotating file handler (5 MB max, keep 3 backups)
log_file = LOG_DIR / f"{unit_id}.log"
handler = RotatingFileHandler(
log_file,
maxBytes=5 * 1024 * 1024, # 5 MB
backupCount=3,
encoding="utf-8"
)
handler.setLevel(logging.DEBUG)
# Format: timestamp [LEVEL] [CATEGORY] message
formatter = logging.Formatter(
"%(asctime)s [%(levelname)s] [%(category)s] %(message)s",
datefmt="%Y-%m-%d %H:%M:%S"
)
handler.setFormatter(formatter)
device_logger.addHandler(handler)
# Don't propagate to root logger
device_logger.propagate = False
_device_file_loggers[unit_id] = device_logger
return device_logger
def log_device_event(
unit_id: str,
level: str,
category: str,
message: str,
db: Optional[Session] = None
):
"""
Log an event for a specific device.
Writes to both:
1. Database (DeviceLog table) for structured queries
2. File (data/logs/{unit_id}.log) for backup/debugging
Args:
unit_id: Device identifier
level: Log level (DEBUG, INFO, WARNING, ERROR)
category: Event category (TCP, FTP, POLL, COMMAND, STATE, SYNC)
message: Log message
db: Optional database session (creates one if not provided)
"""
timestamp = datetime.utcnow()
# Write to file log
try:
file_logger = _get_file_logger(unit_id)
log_func = getattr(file_logger, level.lower(), file_logger.info)
# Pass category as extra for formatter
log_func(message, extra={"category": category})
except Exception as e:
logger.warning(f"Failed to write file log for {unit_id}: {e}")
# Write to database
close_db = False
try:
if db is None:
db = SessionLocal()
close_db = True
log_entry = DeviceLog(
unit_id=unit_id,
timestamp=timestamp,
level=level.upper(),
category=category.upper(),
message=message
)
db.add(log_entry)
db.commit()
except Exception as e:
logger.warning(f"Failed to write DB log for {unit_id}: {e}")
if db:
db.rollback()
finally:
if close_db and db:
db.close()
def cleanup_old_logs(retention_days: Optional[int] = None, db: Optional[Session] = None):
"""
Delete log entries older than retention period.
Args:
retention_days: Days to retain (default: LOG_RETENTION_DAYS env var or 7)
db: Optional database session
"""
if retention_days is None:
retention_days = LOG_RETENTION_DAYS
cutoff = datetime.utcnow() - timedelta(days=retention_days)
close_db = False
try:
if db is None:
db = SessionLocal()
close_db = True
deleted = db.query(DeviceLog).filter(DeviceLog.timestamp < cutoff).delete()
db.commit()
if deleted > 0:
logger.info(f"Cleaned up {deleted} log entries older than {retention_days} days")
except Exception as e:
logger.error(f"Failed to cleanup old logs: {e}")
if db:
db.rollback()
finally:
if close_db and db:
db.close()
def get_device_logs(
unit_id: str,
limit: int = 100,
offset: int = 0,
level: Optional[str] = None,
category: Optional[str] = None,
since: Optional[datetime] = None,
db: Optional[Session] = None
) -> list:
"""
Query log entries for a specific device.
Args:
unit_id: Device identifier
limit: Max entries to return (default: 100)
offset: Number of entries to skip (default: 0)
level: Filter by level (DEBUG, INFO, WARNING, ERROR)
category: Filter by category (TCP, FTP, POLL, COMMAND, STATE, SYNC)
since: Filter entries after this timestamp
db: Optional database session
Returns:
List of log entries as dicts
"""
close_db = False
try:
if db is None:
db = SessionLocal()
close_db = True
query = db.query(DeviceLog).filter(DeviceLog.unit_id == unit_id)
if level:
query = query.filter(DeviceLog.level == level.upper())
if category:
query = query.filter(DeviceLog.category == category.upper())
if since:
query = query.filter(DeviceLog.timestamp >= since)
# Order by newest first
query = query.order_by(DeviceLog.timestamp.desc())
# Apply pagination
entries = query.offset(offset).limit(limit).all()
return [
{
"id": e.id,
"timestamp": e.timestamp.isoformat() + "Z",
"level": e.level,
"category": e.category,
"message": e.message
}
for e in entries
]
finally:
if close_db and db:
db.close()
def get_log_stats(unit_id: str, db: Optional[Session] = None) -> dict:
"""
Get log statistics for a device.
Returns:
Dict with counts by level and category
"""
close_db = False
try:
if db is None:
db = SessionLocal()
close_db = True
total = db.query(DeviceLog).filter(DeviceLog.unit_id == unit_id).count()
# Count by level
level_counts = {}
for level in ["DEBUG", "INFO", "WARNING", "ERROR"]:
count = db.query(DeviceLog).filter(
DeviceLog.unit_id == unit_id,
DeviceLog.level == level
).count()
if count > 0:
level_counts[level] = count
# Count by category
category_counts = {}
for category in ["TCP", "FTP", "POLL", "COMMAND", "STATE", "SYNC", "GENERAL"]:
count = db.query(DeviceLog).filter(
DeviceLog.unit_id == unit_id,
DeviceLog.category == category
).count()
if count > 0:
category_counts[category] = count
# Get oldest and newest
oldest = db.query(DeviceLog).filter(
DeviceLog.unit_id == unit_id
).order_by(DeviceLog.timestamp.asc()).first()
newest = db.query(DeviceLog).filter(
DeviceLog.unit_id == unit_id
).order_by(DeviceLog.timestamp.desc()).first()
return {
"total": total,
"by_level": level_counts,
"by_category": category_counts,
"oldest": oldest.timestamp.isoformat() + "Z" if oldest else None,
"newest": newest.timestamp.isoformat() + "Z" if newest else None
}
finally:
if close_db and db:
db.close()

View File

@@ -29,7 +29,11 @@ logger.info("Database tables initialized")
@asynccontextmanager
async def lifespan(app: FastAPI):
"""Manage application lifecycle - startup and shutdown events."""
from app.services import _connection_pool
# Startup
logger.info("Starting TCP connection pool cleanup task...")
_connection_pool.start_cleanup()
logger.info("Starting background poller...")
await poller.start()
logger.info("Background poller started")
@@ -40,12 +44,15 @@ async def lifespan(app: FastAPI):
logger.info("Stopping background poller...")
await poller.stop()
logger.info("Background poller stopped")
logger.info("Closing TCP connection pool...")
await _connection_pool.close_all()
logger.info("TCP connection pool closed")
app = FastAPI(
title="SLMM NL43 Addon",
description="Standalone module for NL43 configuration and status APIs with background polling",
version="0.2.0",
version="0.3.0",
lifespan=lifespan,
)
@@ -85,10 +92,14 @@ async def health():
@app.get("/health/devices")
async def health_devices():
"""Enhanced health check that tests device connectivity."""
"""Enhanced health check that tests device connectivity.
Uses the connection pool to avoid unnecessary TCP handshakes — if a
cached connection exists and is alive, the device is reachable.
"""
from sqlalchemy.orm import Session
from app.database import SessionLocal
from app.services import NL43Client
from app.services import _connection_pool
from app.models import NL43Config
db: Session = SessionLocal()
@@ -98,7 +109,7 @@ async def health_devices():
configs = db.query(NL43Config).filter_by(tcp_enabled=True).all()
for cfg in configs:
client = NL43Client(cfg.host, cfg.tcp_port, timeout=2.0, ftp_username=cfg.ftp_username, ftp_password=cfg.ftp_password)
device_key = f"{cfg.host}:{cfg.tcp_port}"
status = {
"unit_id": cfg.unit_id,
"host": cfg.host,
@@ -108,14 +119,22 @@ async def health_devices():
}
try:
# Try to connect (don't send command to avoid rate limiting issues)
import asyncio
reader, writer = await asyncio.wait_for(
asyncio.open_connection(cfg.host, cfg.tcp_port), timeout=2.0
)
writer.close()
await writer.wait_closed()
# Check if pool already has a live connection (zero-cost check)
pool_stats = _connection_pool.get_stats()
conn_info = pool_stats["connections"].get(device_key)
if conn_info and conn_info["alive"]:
status["reachable"] = True
status["source"] = "pool"
else:
# No cached connection — do a lightweight acquire/release
# This opens a connection if needed but keeps it in the pool
import asyncio
reader, writer, from_cache = await _connection_pool.acquire(
device_key, cfg.host, cfg.tcp_port, timeout=2.0
)
await _connection_pool.release(device_key, reader, writer, cfg.host, cfg.tcp_port)
status["reachable"] = True
status["source"] = "cached" if from_cache else "new"
except Exception as e:
status["error"] = str(type(e).__name__)
logger.warning(f"Device {cfg.unit_id} health check failed: {e}")

View File

@@ -53,3 +53,22 @@ class NL43Status(Base):
last_poll_attempt = Column(DateTime, nullable=True) # Last time background poller attempted to poll
last_success = Column(DateTime, nullable=True) # Last successful poll timestamp
last_error = Column(Text, nullable=True) # Last error message (truncated to 500 chars)
# FTP start time sync tracking
start_time_sync_attempted = Column(Boolean, default=False) # True if FTP sync was attempted for current measurement
class DeviceLog(Base):
"""
Per-device log entries for debugging and audit trail.
Stores events like commands, state changes, errors, and FTP operations.
"""
__tablename__ = "device_logs"
id = Column(Integer, primary_key=True, autoincrement=True)
unit_id = Column(String, index=True, nullable=False)
timestamp = Column(DateTime, default=func.now(), index=True)
level = Column(String, default="INFO") # DEBUG, INFO, WARNING, ERROR
category = Column(String, default="GENERAL") # TCP, FTP, POLL, COMMAND, STATE, SYNC
message = Column(Text, nullable=False)

View File

@@ -3,6 +3,7 @@ from fastapi.responses import FileResponse
from sqlalchemy.orm import Session
from datetime import datetime
from pydantic import BaseModel, field_validator, Field
from typing import Optional
import logging
import ipaddress
import json
@@ -81,17 +82,45 @@ class ConfigPayload(BaseModel):
@field_validator("poll_interval_seconds")
@classmethod
def validate_poll_interval(cls, v):
if v is not None and not (10 <= v <= 3600):
raise ValueError("Poll interval must be between 10 and 3600 seconds")
if v is not None and not (30 <= v <= 21600):
raise ValueError("Poll interval must be between 30 and 21600 seconds (30s to 6 hours)")
return v
class PollingConfigPayload(BaseModel):
"""Payload for updating device polling configuration."""
poll_interval_seconds: int | None = Field(None, ge=10, le=3600, description="Polling interval in seconds (10-3600)")
poll_interval_seconds: int | None = Field(None, ge=30, le=21600, description="Polling interval in seconds (30s to 6 hours)")
poll_enabled: bool | None = Field(None, description="Enable or disable background polling for this device")
# ============================================================================
# TCP CONNECTION POOL ENDPOINTS (must be before /{unit_id} routes)
# ============================================================================
@router.get("/_connections/status")
async def get_connection_pool_status():
"""Get status of the persistent TCP connection pool.
Returns information about cached connections, keepalive settings,
and per-device connection age/idle times.
"""
from app.services import _connection_pool
return {"status": "ok", "pool": _connection_pool.get_stats()}
@router.post("/_connections/flush")
async def flush_connection_pool():
"""Close all cached TCP connections.
Useful for debugging or forcing fresh connections to all devices.
"""
from app.services import _connection_pool
await _connection_pool.close_all()
# Restart cleanup task since close_all cancels it
_connection_pool.start_cleanup()
return {"status": "ok", "message": "All cached connections closed"}
# ============================================================================
# GLOBAL POLLING STATUS ENDPOINT (must be before /{unit_id} routes)
# ============================================================================
@@ -233,8 +262,8 @@ class RosterCreatePayload(BaseModel):
@field_validator("poll_interval_seconds")
@classmethod
def validate_poll_interval(cls, v):
if v is not None and not (10 <= v <= 3600):
raise ValueError("Poll interval must be between 10 and 3600 seconds")
if v is not None and not (30 <= v <= 21600):
raise ValueError("Poll interval must be between 30 and 21600 seconds (30s to 6 hours)")
return v
@@ -544,12 +573,6 @@ async def stop_measurement(unit_id: str, db: Session = Depends(get_db)):
try:
await client.stop()
logger.info(f"Stopped measurement on unit {unit_id}")
# Query device status to update database with "Stop" state
snap = await client.request_dod()
snap.unit_id = unit_id
persist_snapshot(snap, db)
except ConnectionError as e:
logger.error(f"Failed to stop measurement on {unit_id}: {e}")
raise HTTPException(status_code=502, detail="Failed to communicate with device")
@@ -559,6 +582,15 @@ async def stop_measurement(unit_id: str, db: Session = Depends(get_db)):
except Exception as e:
logger.error(f"Unexpected error stopping measurement on {unit_id}: {e}")
raise HTTPException(status_code=500, detail="Internal server error")
# Query device status to update database — non-fatal if this fails
try:
snap = await client.request_dod()
snap.unit_id = unit_id
persist_snapshot(snap, db)
except Exception as e:
logger.warning(f"Stop succeeded but failed to update status for {unit_id}: {e}")
return {"status": "ok", "message": "Measurement stopped"}
@@ -656,8 +688,9 @@ async def stop_cycle(unit_id: str, payload: StopCyclePayload = None, db: Session
return {"status": "ok", "unit_id": unit_id, **result}
except Exception as e:
logger.error(f"Stop cycle failed for {unit_id}: {e}")
raise HTTPException(status_code=502, detail=str(e))
error_msg = str(e) if str(e) else f"{type(e).__name__}: No details available"
logger.error(f"Stop cycle failed for {unit_id}: {error_msg}")
raise HTTPException(status_code=502, detail=error_msg)
@router.post("/{unit_id}/store")
@@ -1722,74 +1755,38 @@ async def run_diagnostics(unit_id: str, db: Session = Depends(get_db)):
"message": "TCP communication enabled"
}
# Test 3: Modem/Router reachable (check port 443 HTTPS)
# Test 3: TCP connection reachable (device port) — uses connection pool
# This avoids extra TCP handshakes over cellular. If a cached connection
# exists and is alive, we skip the handshake entirely.
from app.services import _connection_pool
device_key = f"{cfg.host}:{cfg.tcp_port}"
try:
reader, writer = await asyncio.wait_for(
asyncio.open_connection(cfg.host, 443), timeout=3.0
)
writer.close()
await writer.wait_closed()
diagnostics["tests"]["modem_reachable"] = {
pool_stats = _connection_pool.get_stats()
conn_info = pool_stats["connections"].get(device_key)
if conn_info and conn_info["alive"]:
# Pool already has a live connection — device is reachable
diagnostics["tests"]["tcp_connection"] = {
"status": "pass",
"message": f"Modem/router reachable at {cfg.host}"
"message": f"TCP connection alive in pool for {cfg.host}:{cfg.tcp_port}"
}
except asyncio.TimeoutError:
diagnostics["tests"]["modem_reachable"] = {
"status": "fail",
"message": f"Modem/router timeout at {cfg.host} (network issue)"
}
diagnostics["overall_status"] = "fail"
return diagnostics
except ConnectionRefusedError:
# Connection refused means host is up but port 443 closed - that's ok
diagnostics["tests"]["modem_reachable"] = {
"status": "pass",
"message": f"Modem/router reachable at {cfg.host} (HTTPS closed)"
}
except Exception as e:
diagnostics["tests"]["modem_reachable"] = {
"status": "fail",
"message": f"Cannot reach modem/router at {cfg.host}: {str(e)}"
}
diagnostics["overall_status"] = "fail"
return diagnostics
# Test 4: TCP connection reachable (device port)
try:
reader, writer = await asyncio.wait_for(
asyncio.open_connection(cfg.host, cfg.tcp_port), timeout=3.0
else:
# Acquire through the pool (opens new if needed, keeps it cached)
reader, writer, from_cache = await _connection_pool.acquire(
device_key, cfg.host, cfg.tcp_port, timeout=3.0
)
writer.close()
await writer.wait_closed()
await _connection_pool.release(device_key, reader, writer, cfg.host, cfg.tcp_port)
diagnostics["tests"]["tcp_connection"] = {
"status": "pass",
"message": f"TCP connection successful to {cfg.host}:{cfg.tcp_port}"
}
except asyncio.TimeoutError:
diagnostics["tests"]["tcp_connection"] = {
"status": "fail",
"message": f"Connection timeout to {cfg.host}:{cfg.tcp_port}"
}
diagnostics["overall_status"] = "fail"
return diagnostics
except ConnectionRefusedError:
diagnostics["tests"]["tcp_connection"] = {
"status": "fail",
"message": f"Connection refused by {cfg.host}:{cfg.tcp_port}"
}
diagnostics["overall_status"] = "fail"
return diagnostics
except Exception as e:
diagnostics["tests"]["tcp_connection"] = {
"status": "fail",
"message": f"Connection error: {str(e)}"
"message": f"Connection error to {cfg.host}:{cfg.tcp_port}: {str(e)}"
}
diagnostics["overall_status"] = "fail"
return diagnostics
# Wait a bit after connection test to let device settle
await asyncio.sleep(1.5)
# Test 4: Device responds to commands
# Use longer timeout to account for rate limiting (device requires ≥1s between commands)
client = NL43Client(cfg.host, cfg.tcp_port, timeout=10.0, ftp_username=cfg.ftp_username, ftp_password=cfg.ftp_password)
@@ -1842,9 +1839,134 @@ async def run_diagnostics(unit_id: str, db: Session = Depends(get_db)):
# All tests passed
diagnostics["overall_status"] = "pass"
# Add database dump: config and status cache
diagnostics["database_dump"] = {
"config": {
"unit_id": cfg.unit_id,
"host": cfg.host,
"tcp_port": cfg.tcp_port,
"tcp_enabled": cfg.tcp_enabled,
"ftp_enabled": cfg.ftp_enabled,
"ftp_port": cfg.ftp_port,
"ftp_username": cfg.ftp_username,
"ftp_password": "***" if cfg.ftp_password else None, # Mask password
"web_enabled": cfg.web_enabled,
"poll_interval_seconds": cfg.poll_interval_seconds,
"poll_enabled": cfg.poll_enabled
},
"status_cache": None
}
# Get cached status if available
status = db.query(NL43Status).filter_by(unit_id=unit_id).first()
if status:
# Helper to format datetime as ISO with Z suffix to indicate UTC
def to_utc_iso(dt):
return dt.isoformat() + 'Z' if dt else None
diagnostics["database_dump"]["status_cache"] = {
"unit_id": status.unit_id,
"last_seen": to_utc_iso(status.last_seen),
"measurement_state": status.measurement_state,
"measurement_start_time": to_utc_iso(status.measurement_start_time),
"counter": status.counter,
"lp": status.lp,
"leq": status.leq,
"lmax": status.lmax,
"lmin": status.lmin,
"lpeak": status.lpeak,
"battery_level": status.battery_level,
"power_source": status.power_source,
"sd_remaining_mb": status.sd_remaining_mb,
"sd_free_ratio": status.sd_free_ratio,
"is_reachable": status.is_reachable,
"consecutive_failures": status.consecutive_failures,
"last_poll_attempt": to_utc_iso(status.last_poll_attempt),
"last_success": to_utc_iso(status.last_success),
"last_error": status.last_error,
"raw_payload": status.raw_payload
}
return diagnostics
# ============================================================================
# DEVICE LOGS ENDPOINTS
# ============================================================================
@router.get("/{unit_id}/logs")
def get_device_logs(
unit_id: str,
limit: int = 100,
offset: int = 0,
level: Optional[str] = None,
category: Optional[str] = None,
db: Session = Depends(get_db)
):
"""
Get log entries for a specific device.
Query parameters:
- limit: Max entries to return (default: 100, max: 1000)
- offset: Number of entries to skip (for pagination)
- level: Filter by level (DEBUG, INFO, WARNING, ERROR)
- category: Filter by category (TCP, FTP, POLL, COMMAND, STATE, SYNC)
Returns newest entries first.
"""
from app.device_logger import get_device_logs as fetch_logs, get_log_stats
# Validate limit
limit = min(limit, 1000)
logs = fetch_logs(
unit_id=unit_id,
limit=limit,
offset=offset,
level=level,
category=category,
db=db
)
stats = get_log_stats(unit_id, db)
return {
"status": "ok",
"unit_id": unit_id,
"logs": logs,
"count": len(logs),
"stats": stats,
"filters": {
"level": level,
"category": category
},
"pagination": {
"limit": limit,
"offset": offset
}
}
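The filter-then-page semantics of this endpoint can be sketched in isolation. This is a pure-Python stand-in for the `fetch_logs` query, illustrating the newest-first ordering and the 1000-row clamp; the dict shape of each entry is an assumption:

```python
def paginate_logs(entries, limit=100, offset=0, level=None, category=None):
    """Filter then page a list of log dicts, newest first (mirrors the endpoint)."""
    limit = min(limit, 1000)  # clamp, as the endpoint does
    rows = [e for e in entries
            if (level is None or e["level"] == level)
            and (category is None or e["category"] == category)]
    rows.sort(key=lambda e: e["timestamp"], reverse=True)  # newest entries first
    return rows[offset:offset + limit]

logs = [
    {"timestamp": 1, "level": "INFO", "category": "TCP", "message": "connect"},
    {"timestamp": 2, "level": "ERROR", "category": "FTP", "message": "login failed"},
    {"timestamp": 3, "level": "INFO", "category": "FTP", "message": "listing ok"},
]
page = paginate_logs(logs, limit=2, category="FTP")
```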
@router.delete("/{unit_id}/logs")
def clear_device_logs(unit_id: str, db: Session = Depends(get_db)):
"""
Clear all log entries for a specific device.
"""
from app.models import DeviceLog
deleted = db.query(DeviceLog).filter(DeviceLog.unit_id == unit_id).delete()
db.commit()
logger.info(f"Cleared {deleted} log entries for device {unit_id}")
return {
"status": "ok",
"message": f"Cleared {deleted} log entries for {unit_id}",
"deleted_count": deleted
}
# ============================================================================
# BACKGROUND POLLING CONFIGURATION ENDPOINTS
# ============================================================================
@@ -1880,7 +2002,7 @@ def update_polling_config(
"""
Update background polling configuration for a device.
Allows configuring the polling interval (10-3600 seconds) and
Allows configuring the polling interval (30-21600 seconds, i.e. 30s to 6 hours) and
enabling/disabling automatic background polling per device.
Changes take effect on the next polling cycle.
@@ -1891,10 +2013,15 @@ def update_polling_config(
# Update interval if provided
if payload.poll_interval_seconds is not None:
if payload.poll_interval_seconds < 10:
if payload.poll_interval_seconds < 30:
raise HTTPException(
status_code=400,
detail="Polling interval must be at least 10 seconds"
detail="Polling interval must be at least 30 seconds"
)
if payload.poll_interval_seconds > 21600:
raise HTTPException(
status_code=400,
detail="Polling interval must be at most 21600 seconds (6 hours)"
)
cfg.poll_interval_seconds = payload.poll_interval_seconds

View File

@@ -1,20 +1,22 @@
"""
NL43 TCP connector and snapshot persistence.
Implements simple per-request TCP calls to avoid long-lived socket complexity.
Extend to pooled connections/DRD streaming later.
Implements persistent per-device TCP connections with OS-level keepalive
to reduce handshake overhead and survive cellular modem NAT timeouts.
Falls back to per-request connections on error with transparent retry.
"""
import asyncio
import contextlib
import logging
import socket
import time
import os
import zipfile
import tempfile
from dataclasses import dataclass
from dataclasses import dataclass, field
from datetime import datetime, timezone, timedelta
from typing import Optional, List
from typing import Optional, List, Dict, Tuple
from sqlalchemy.orm import Session
from ftplib import FTP
from pathlib import Path
@@ -76,10 +78,22 @@ def persist_snapshot(s: NL43Snapshot, db: Session):
# Measurement just started - record the start time
row.measurement_start_time = datetime.utcnow()
logger.info(f"✓ Measurement started on {s.unit_id} at {row.measurement_start_time}")
# Log state change (lazy import to avoid circular dependency)
try:
from app.device_logger import log_device_event
log_device_event(s.unit_id, "INFO", "STATE", f"Measurement STARTED at {row.measurement_start_time}", db)
except Exception:
pass
elif was_measuring and not is_measuring:
# Measurement stopped - clear the start time
row.measurement_start_time = None
logger.info(f"✓ Measurement stopped on {s.unit_id}")
# Log state change
try:
from app.device_logger import log_device_event
log_device_event(s.unit_id, "INFO", "STATE", "Measurement STOPPED", db)
except Exception:
pass
row.measurement_state = new_state
row.counter = s.counter
@@ -101,10 +115,413 @@ def persist_snapshot(s: NL43Snapshot, db: Session):
raise
async def sync_measurement_start_time_from_ftp(
unit_id: str,
host: str,
tcp_port: int,
ftp_port: int,
ftp_username: str,
ftp_password: str,
db: Session
) -> bool:
"""
Sync measurement start time from the FTP folder timestamp.
This is called when SLMM detects a device is already measuring but doesn't
have a recorded start time (e.g., after service restart or if measurement
was started before SLMM began polling).
The workflow:
1. Disable FTP (reset)
2. Enable FTP
3. List NL-43 folder to get measurement folder timestamps
4. Use the most recent folder's timestamp as the start time
5. Update the database
Args:
unit_id: Device identifier
host: Device IP/hostname
tcp_port: TCP control port
ftp_port: FTP port (usually 21)
ftp_username: FTP username (usually "USER")
ftp_password: FTP password (usually "0000")
db: Database session
Returns:
True if sync succeeded, False otherwise
"""
logger.info(f"[FTP-SYNC] Attempting to sync measurement start time for {unit_id} via FTP")
client = NL43Client(
host, tcp_port,
ftp_username=ftp_username,
ftp_password=ftp_password,
ftp_port=ftp_port
)
try:
# Step 1: Disable FTP to reset it
logger.info(f"[FTP-SYNC] Step 1: Disabling FTP on {unit_id}")
await client.disable_ftp()
await asyncio.sleep(1.5) # Wait for device to process
# Step 2: Enable FTP
logger.info(f"[FTP-SYNC] Step 2: Enabling FTP on {unit_id}")
await client.enable_ftp()
await asyncio.sleep(2.0) # Wait for FTP server to start
# Step 3: List NL-43 folder
logger.info(f"[FTP-SYNC] Step 3: Listing /NL-43 folder on {unit_id}")
files = await client.list_ftp_files("/NL-43")
# Filter for directories only (measurement folders)
folders = [f for f in files if f.get('is_dir', False)]
if not folders:
logger.warning(f"[FTP-SYNC] No measurement folders found on {unit_id}")
return False
# Sort by modified timestamp (newest first)
folders.sort(key=lambda f: f.get('modified_timestamp', ''), reverse=True)
latest_folder = folders[0]
folder_name = latest_folder['name']
logger.info(f"[FTP-SYNC] Found latest measurement folder: {folder_name}")
# Step 4: Parse timestamp
if 'modified_timestamp' in latest_folder and latest_folder['modified_timestamp']:
timestamp_str = latest_folder['modified_timestamp']
# Parse ISO format timestamp (already in UTC from SLMM FTP listing)
start_time = datetime.fromisoformat(timestamp_str.replace('Z', ''))
# Step 5: Update database
status = db.query(NL43Status).filter_by(unit_id=unit_id).first()
if status:
old_time = status.measurement_start_time
status.measurement_start_time = start_time
db.commit()
logger.info(f"[FTP-SYNC] ✓ Successfully synced start time for {unit_id}")
logger.info(f"[FTP-SYNC] Folder: {folder_name}")
logger.info(f"[FTP-SYNC] Old start time: {old_time}")
logger.info(f"[FTP-SYNC] New start time: {start_time}")
return True
else:
logger.warning(f"[FTP-SYNC] Status record not found for {unit_id}")
return False
else:
logger.warning(f"[FTP-SYNC] Could not parse timestamp from folder {folder_name}")
return False
except Exception as e:
logger.error(f"[FTP-SYNC] Failed to sync start time for {unit_id}: {e}")
return False
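Steps 3 and 4 of the sync can be shown on their own: the FTP listing yields ISO timestamp strings (assumed UTC), the folders are sorted newest-first, and the latest folder's timestamp becomes the start time. Folder names and timestamps here are made up:

```python
from datetime import datetime

folders = [
    {"name": "Auto_0001", "is_dir": True, "modified_timestamp": "2026-02-15T08:00:00Z"},
    {"name": "Auto_0002", "is_dir": True, "modified_timestamp": "2026-02-16T01:30:00Z"},
]

# Sort newest first, as sync_measurement_start_time_from_ftp does
folders.sort(key=lambda f: f.get("modified_timestamp", ""), reverse=True)
latest = folders[0]

# Strip the 'Z' suffix before fromisoformat (naive UTC, matching the DB columns)
start_time = datetime.fromisoformat(latest["modified_timestamp"].replace("Z", ""))
```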
# Rate limiting: NL43 requires ≥1 second between commands
_last_command_time: Dict[str, float] = {}
_rate_limit_lock = asyncio.Lock()
# Per-device connection locks: NL43 devices only support one TCP connection at a time
# This prevents concurrent connections from fighting for the device
_device_locks: Dict[str, asyncio.Lock] = {}
_device_locks_lock = asyncio.Lock()
async def _get_device_lock(device_key: str) -> asyncio.Lock:
"""Get or create a lock for a specific device."""
async with _device_locks_lock:
if device_key not in _device_locks:
_device_locks[device_key] = asyncio.Lock()
return _device_locks[device_key]
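A runnable sketch of the per-device lock pattern: two tasks targeting the same device serialize, so their critical sections never interleave. Names mirror the module, but the demo is self-contained and the simulated round-trip is a stand-in for a real TCP exchange:

```python
import asyncio
from typing import Dict

_device_locks: Dict[str, asyncio.Lock] = {}
_device_locks_lock = asyncio.Lock()

async def _get_device_lock(device_key: str) -> asyncio.Lock:
    # Creation is guarded so two tasks never race to create two locks
    async with _device_locks_lock:
        if device_key not in _device_locks:
            _device_locks[device_key] = asyncio.Lock()
        return _device_locks[device_key]

events = []

async def send(device_key: str, cmd: str):
    lock = await _get_device_lock(device_key)
    async with lock:
        events.append(f"start {cmd}")
        await asyncio.sleep(0.01)  # simulated device round-trip
        events.append(f"end {cmd}")

async def main():
    # Both commands target the same device key, so they must not overlap
    await asyncio.gather(send("10.0.0.5:80", "DOD?"), send("10.0.0.5:80", "STRT"))

asyncio.run(main())
```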
# ---------------------------------------------------------------------------
# Persistent TCP connection pool with OS-level keepalive
# ---------------------------------------------------------------------------
# Configuration via environment variables
TCP_PERSISTENT_ENABLED = os.getenv("TCP_PERSISTENT_ENABLED", "true").lower() == "true"
TCP_IDLE_TTL = float(os.getenv("TCP_IDLE_TTL", "300")) # Close idle connections after N seconds
TCP_MAX_AGE = float(os.getenv("TCP_MAX_AGE", "1800")) # Force reconnect after N seconds
TCP_KEEPALIVE_IDLE = int(os.getenv("TCP_KEEPALIVE_IDLE", "15")) # Seconds idle before probes
TCP_KEEPALIVE_INTERVAL = int(os.getenv("TCP_KEEPALIVE_INTERVAL", "10")) # Seconds between probes
TCP_KEEPALIVE_COUNT = int(os.getenv("TCP_KEEPALIVE_COUNT", "3")) # Failed probes before dead
logger.info(
f"TCP connection pool: persistent={TCP_PERSISTENT_ENABLED}, "
f"idle_ttl={TCP_IDLE_TTL}s, max_age={TCP_MAX_AGE}s, "
f"keepalive_idle={TCP_KEEPALIVE_IDLE}s, keepalive_interval={TCP_KEEPALIVE_INTERVAL}s, "
f"keepalive_count={TCP_KEEPALIVE_COUNT}"
)
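The six knobs above map to environment variables; a deployment might override the defaults like this (values are examples, not recommendations):

```shell
# Persistent-pool tuning (defaults shown in parentheses)
export TCP_PERSISTENT_ENABLED=true   # (true)  set false to fall back to per-request sockets
export TCP_IDLE_TTL=300              # (300)   close connections idle longer than this
export TCP_MAX_AGE=1800              # (1800)  force a reconnect after this many seconds
export TCP_KEEPALIVE_IDLE=15         # (15)    idle seconds before the first keepalive probe
export TCP_KEEPALIVE_INTERVAL=10     # (10)    seconds between probes
export TCP_KEEPALIVE_COUNT=3         # (3)     failed probes before the link is declared dead
```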
@dataclass
class DeviceConnection:
"""Tracks a cached TCP connection and its metadata."""
reader: asyncio.StreamReader
writer: asyncio.StreamWriter
device_key: str
host: str
port: int
created_at: float = field(default_factory=time.time)
last_used_at: float = field(default_factory=time.time)
class ConnectionPool:
"""Per-device persistent TCP connection cache with OS-level keepalive.
Each NL-43 device supports only one TCP connection at a time. This pool
caches that single connection per device key and reuses it across commands,
avoiding repeated TCP handshakes over high-latency cellular links.
Keepalive probes keep cellular NAT tables alive and detect dead connections
before the next command attempt.
"""
def __init__(
self,
enable_persistent: bool = True,
idle_ttl: float = 120.0,
max_age: float = 300.0,
keepalive_idle: int = 15,
keepalive_interval: int = 10,
keepalive_count: int = 3,
):
self._connections: Dict[str, DeviceConnection] = {}
self._lock = asyncio.Lock()
self._enable_persistent = enable_persistent
self._idle_ttl = idle_ttl
self._max_age = max_age
self._keepalive_idle = keepalive_idle
self._keepalive_interval = keepalive_interval
self._keepalive_count = keepalive_count
self._cleanup_task: Optional[asyncio.Task] = None
# -- lifecycle ----------------------------------------------------------
def start_cleanup(self):
"""Start background task that evicts stale connections."""
if self._enable_persistent and self._cleanup_task is None:
self._cleanup_task = asyncio.create_task(self._cleanup_loop())
logger.info("Connection pool cleanup task started")
async def close_all(self):
"""Close all cached connections (called at shutdown)."""
if self._cleanup_task is not None:
self._cleanup_task.cancel()
with contextlib.suppress(asyncio.CancelledError):
await self._cleanup_task
self._cleanup_task = None
async with self._lock:
for key, conn in list(self._connections.items()):
await self._close_connection(conn, reason="shutdown")
self._connections.clear()
logger.info("Connection pool: all connections closed")
# -- public API ---------------------------------------------------------
async def acquire(
self, device_key: str, host: str, port: int, timeout: float
) -> Tuple[asyncio.StreamReader, asyncio.StreamWriter, bool]:
"""Get a connection for a device (cached or fresh).
Returns:
(reader, writer, from_cache) — from_cache is True if reused.
"""
if self._enable_persistent:
async with self._lock:
conn = self._connections.pop(device_key, None)
if conn is not None:
if self._is_alive(conn):
self._drain_buffer(conn.reader)
conn.last_used_at = time.time()
logger.debug(f"Pool hit for {device_key} (age={time.time() - conn.created_at:.0f}s)")
return conn.reader, conn.writer, True
else:
await self._close_connection(conn, reason="stale")
# Open fresh connection
reader, writer = await self._open_connection(host, port, timeout)
logger.debug(f"New connection opened for {device_key}")
return reader, writer, False
async def release(self, device_key: str, reader: asyncio.StreamReader, writer: asyncio.StreamWriter, host: str, port: int):
"""Return a connection to the pool for reuse."""
if not self._enable_persistent:
self._close_writer(writer)
return
# Check transport is still healthy before caching
if writer.transport.is_closing() or reader.at_eof():
self._close_writer(writer)
return
conn = DeviceConnection(
reader=reader,
writer=writer,
device_key=device_key,
host=host,
port=port,
)
async with self._lock:
# Evict any existing connection for this device (shouldn't happen
# under normal locking, but be safe)
old = self._connections.pop(device_key, None)
if old is not None:
await self._close_connection(old, reason="replaced")
self._connections[device_key] = conn
async def discard(self, device_key: str):
"""Close and remove a connection from the pool (called on errors)."""
async with self._lock:
conn = self._connections.pop(device_key, None)
if conn is not None:
await self._close_connection(conn, reason="discarded")
logger.debug(f"Pool discard for {device_key}")
def get_stats(self) -> dict:
"""Return pool status for diagnostics."""
now = time.time()
connections = {}
for key, conn in self._connections.items():
connections[key] = {
"host": conn.host,
"port": conn.port,
"age_seconds": round(now - conn.created_at, 1),
"idle_seconds": round(now - conn.last_used_at, 1),
"alive": self._is_alive(conn),
}
return {
"enabled": self._enable_persistent,
"active_connections": len(self._connections),
"idle_ttl": self._idle_ttl,
"max_age": self._max_age,
"keepalive_idle": self._keepalive_idle,
"keepalive_interval": self._keepalive_interval,
"keepalive_count": self._keepalive_count,
"connections": connections,
}
# -- internals ----------------------------------------------------------
async def _open_connection(
self, host: str, port: int, timeout: float
) -> Tuple[asyncio.StreamReader, asyncio.StreamWriter]:
"""Open a new TCP connection with keepalive options set."""
try:
reader, writer = await asyncio.wait_for(
asyncio.open_connection(host, port), timeout=timeout
)
except asyncio.TimeoutError:
raise ConnectionError(f"Failed to connect to device at {host}:{port}")
except Exception as e:
raise ConnectionError(f"Failed to connect to device: {e}")
# Set TCP keepalive on the underlying socket
self._set_keepalive(writer)
return reader, writer
def _set_keepalive(self, writer: asyncio.StreamWriter):
"""Configure OS-level TCP keepalive on the connection socket."""
try:
sock = writer.transport.get_extra_info("socket")
if sock is None:
logger.warning("Could not access underlying socket for keepalive")
return
sock.setsockopt(socket.SOL_SOCKET, socket.SO_KEEPALIVE, 1)
# Linux-specific keepalive tuning
if hasattr(socket, "TCP_KEEPIDLE"):
sock.setsockopt(socket.IPPROTO_TCP, socket.TCP_KEEPIDLE, self._keepalive_idle)
if hasattr(socket, "TCP_KEEPINTVL"):
sock.setsockopt(socket.IPPROTO_TCP, socket.TCP_KEEPINTVL, self._keepalive_interval)
if hasattr(socket, "TCP_KEEPCNT"):
sock.setsockopt(socket.IPPROTO_TCP, socket.TCP_KEEPCNT, self._keepalive_count)
logger.debug(
f"TCP keepalive set: idle={self._keepalive_idle}s, "
f"interval={self._keepalive_interval}s, count={self._keepalive_count}"
)
except OSError as e:
logger.warning(f"Failed to set TCP keepalive: {e}")
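The keepalive setup can be exercised on a bare socket without connecting. `SO_KEEPALIVE` is portable; the three `TCP_KEEP*` options are guarded with `hasattr` because they are platform-specific (Linux has all three, macOS lacks `TCP_KEEPIDLE`):

```python
import socket

sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
sock.setsockopt(socket.SOL_SOCKET, socket.SO_KEEPALIVE, 1)

# Platform-specific tuning, mirroring _set_keepalive above
if hasattr(socket, "TCP_KEEPIDLE"):
    sock.setsockopt(socket.IPPROTO_TCP, socket.TCP_KEEPIDLE, 15)
if hasattr(socket, "TCP_KEEPINTVL"):
    sock.setsockopt(socket.IPPROTO_TCP, socket.TCP_KEEPINTVL, 10)
if hasattr(socket, "TCP_KEEPCNT"):
    sock.setsockopt(socket.IPPROTO_TCP, socket.TCP_KEEPCNT, 3)

# Read the option back to confirm the kernel accepted it
keepalive_on = sock.getsockopt(socket.SOL_SOCKET, socket.SO_KEEPALIVE)
sock.close()
```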
def _is_alive(self, conn: DeviceConnection) -> bool:
"""Check whether a cached connection is still usable."""
now = time.time()
# Age / idle checks
if now - conn.last_used_at > self._idle_ttl:
logger.debug(f"Connection {conn.device_key} idle too long ({now - conn.last_used_at:.0f}s > {self._idle_ttl}s)")
return False
if now - conn.created_at > self._max_age:
logger.debug(f"Connection {conn.device_key} too old ({now - conn.created_at:.0f}s > {self._max_age}s)")
return False
# Transport-level checks
transport = conn.writer.transport
if transport.is_closing():
logger.debug(f"Connection {conn.device_key} transport is closing")
return False
if conn.reader.at_eof():
logger.debug(f"Connection {conn.device_key} reader at EOF")
return False
return True
@staticmethod
def _drain_buffer(reader: asyncio.StreamReader):
"""Drain any pending bytes (e.g. '$' prompt) from an idle connection."""
buf = reader._buffer # noqa: SLF001 — internal but stable across CPython
if buf:
pending = bytes(buf)
buf.clear()
logger.debug(f"Drained {len(pending)} bytes from cached connection: {pending!r}")
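The drain can be demonstrated directly on an `asyncio.StreamReader`: feed bytes as an idle device might (e.g. a stray `$` prompt), then clear the private buffer the same way `_drain_buffer` does. Accessing `_buffer` is an internal detail, as the `noqa` above acknowledges:

```python
import asyncio

def drain_buffer(r: asyncio.StreamReader) -> bytes:
    """Remove and return any pending bytes without awaiting a read."""
    buf = r._buffer  # noqa: SLF001 — same internal access as the pool
    pending = bytes(buf)
    buf.clear()
    return pending

async def main():
    reader = asyncio.StreamReader()
    reader.feed_data(b"$\r\n")  # stray prompt left over on an idle connection
    return drain_buffer(reader), bytes(reader._buffer)

drained, remaining = asyncio.run(main())
```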
@staticmethod
def _close_writer(writer: asyncio.StreamWriter):
"""Close a writer, suppressing errors."""
try:
writer.close()
except Exception:
pass
async def _close_connection(self, conn: DeviceConnection, reason: str = ""):
"""Fully close a cached connection."""
logger.debug(f"Closing connection {conn.device_key} ({reason})")
conn.writer.close()
with contextlib.suppress(Exception):
await conn.writer.wait_closed()
async def _cleanup_loop(self):
"""Periodically evict idle/expired connections."""
try:
while True:
await asyncio.sleep(30)
async with self._lock:
for key in list(self._connections):
conn = self._connections[key]
if not self._is_alive(conn):
del self._connections[key]
await self._close_connection(conn, reason="cleanup")
except asyncio.CancelledError:
pass
# Module-level pool singleton
_connection_pool = ConnectionPool(
enable_persistent=TCP_PERSISTENT_ENABLED,
idle_ttl=TCP_IDLE_TTL,
max_age=TCP_MAX_AGE,
keepalive_idle=TCP_KEEPALIVE_IDLE,
keepalive_interval=TCP_KEEPALIVE_INTERVAL,
keepalive_count=TCP_KEEPALIVE_COUNT,
)
class NL43Client:
def __init__(self, host: str, port: int, timeout: float = 5.0, ftp_username: str = None, ftp_password: str = None, ftp_port: int = 21):
@@ -117,7 +534,12 @@ class NL43Client:
self.device_key = f"{host}:{port}"
async def _enforce_rate_limit(self):
"""Ensure ≥1 second between commands to the same device."""
"""Ensure ≥1 second between commands to the same device.
NL43 protocol requires ≥1s after the device responds before sending
the next command. The timestamp is recorded after each command completes
(response fully read and the connection released back to the pool), so we
measure from completion, not from send time.
"""
async with _rate_limit_lock:
last_time = _last_command_time.get(self.device_key, 0)
elapsed = time.time() - last_time
@@ -125,7 +547,6 @@ class NL43Client:
wait_time = 1.0 - elapsed
logger.debug(f"Rate limiting: waiting {wait_time:.2f}s for {self.device_key}")
await asyncio.sleep(wait_time)
_last_command_time[self.device_key] = time.time()
async def _send_command(self, cmd: str) -> str:
"""Send ASCII command to NL43 device via TCP.
@@ -133,23 +554,72 @@ class NL43Client:
NL43 protocol returns two lines for query commands:
Line 1: Result code (R+0000 for success, error codes otherwise)
Line 2: Actual data (for query commands ending with '?')
This method acquires a per-device lock to ensure only one TCP connection
is active at a time (NL43 devices only support single connections).
"""
# Acquire per-device lock to prevent concurrent connections
device_lock = await _get_device_lock(self.device_key)
async with device_lock:
return await self._send_command_unlocked(cmd)
async def _send_command_unlocked(self, cmd: str) -> str:
"""Internal: send command without acquiring device lock (lock must be held by caller).
Uses the connection pool to reuse cached TCP connections when possible.
If a cached connection fails, retries once with a fresh connection.
"""
await self._enforce_rate_limit()
logger.info(f"Sending command to {self.device_key}: {cmd.strip()}")
try:
reader, writer = await asyncio.wait_for(
asyncio.open_connection(self.host, self.port), timeout=self.timeout
reader, writer, from_cache = await _connection_pool.acquire(
self.device_key, self.host, self.port, self.timeout
)
except asyncio.TimeoutError:
logger.error(f"Connection timeout to {self.device_key}")
raise ConnectionError(f"Failed to connect to device at {self.host}:{self.port}")
except Exception as e:
logger.error(f"Connection failed to {self.device_key}: {e}")
raise ConnectionError(f"Failed to connect to device: {str(e)}")
except ConnectionError:
logger.error(f"Connection failed to {self.device_key}")
raise
try:
response = await self._execute_command(reader, writer, cmd)
# Success — return connection to pool for reuse
await _connection_pool.release(self.device_key, reader, writer, self.host, self.port)
_last_command_time[self.device_key] = time.time()
return response
except Exception as e:
# Discard the bad connection
await _connection_pool.discard(self.device_key)
ConnectionPool._close_writer(writer)
if from_cache:
# Retry once with a fresh connection — the cached one may have gone stale
logger.warning(f"Cached connection failed for {self.device_key}, retrying fresh: {e}")
await self._enforce_rate_limit()
try:
reader, writer, _ = await _connection_pool.acquire(
self.device_key, self.host, self.port, self.timeout
)
except ConnectionError:
logger.error(f"Retry connection also failed to {self.device_key}")
raise
try:
response = await self._execute_command(reader, writer, cmd)
await _connection_pool.release(self.device_key, reader, writer, self.host, self.port)
_last_command_time[self.device_key] = time.time()
return response
except Exception:
await _connection_pool.discard(self.device_key)
ConnectionPool._close_writer(writer)
raise
else:
raise
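The retry shape above — reuse a cached connection, and on failure fall back to exactly one fresh attempt — is a generic pattern. A network-free sketch with a fake pool whose first `acquire` hands back a stale cached connection (all names here are hypothetical):

```python
import asyncio

class FakePool:
    """Stands in for the connection pool: first acquire returns a stale cached conn."""
    def __init__(self):
        self.acquires = 0

    async def acquire(self):
        self.acquires += 1
        from_cache = self.acquires == 1   # only the first call is a cache hit
        return {"stale": from_cache}, from_cache

async def send_with_retry(pool, cmd: str) -> str:
    conn, from_cache = await pool.acquire()
    try:
        if conn["stale"]:
            raise ConnectionResetError("cached connection went away")
        return f"R+0000 ({cmd})"
    except Exception:
        if not from_cache:
            raise  # a fresh connection failing is a real error
        # Retry exactly once with a fresh connection
        conn, _ = await pool.acquire()
        if conn["stale"]:
            raise
        return f"R+0000 ({cmd})"

pool = FakePool()
result = asyncio.run(send_with_retry(pool, "DOD?"))
```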
async def _execute_command(self, reader: asyncio.StreamReader, writer: asyncio.StreamWriter, cmd: str) -> str:
"""Send a command over an existing connection and parse the NL43 response."""
writer.write(cmd.encode("ascii"))
await writer.drain()
@@ -165,7 +635,7 @@ class NL43Client:
# Check result code
if result_code == "R+0000":
# Success - for query commands, read the second line with actual data
# Success: for query commands, read the second line with actual data
is_query = cmd.strip().endswith("?")
if is_query:
data_line = await asyncio.wait_for(reader.readuntil(b"\n"), timeout=self.timeout)
@@ -173,7 +643,7 @@ class NL43Client:
logger.debug(f"Data line from {self.device_key}: {response}")
return response
else:
# Setting command - return success code
# Setting command: return success code
return result_code
elif result_code == "R+0001":
raise ValueError("Command error - device did not recognize command")
@@ -186,17 +656,6 @@ class NL43Client:
else:
raise ValueError(f"Unknown result code: {result_code}")
except asyncio.TimeoutError:
logger.error(f"Response timeout from {self.device_key}")
raise TimeoutError(f"Device did not respond within {self.timeout}s")
except Exception as e:
logger.error(f"Communication error with {self.device_key}: {e}")
raise
finally:
writer.close()
with contextlib.suppress(Exception):
await writer.wait_closed()
async def request_dod(self) -> NL43Snapshot:
"""Request DOD (Data Output Display) snapshot from device.
@@ -429,21 +888,27 @@ class NL43Client:
The stream continues until an exception occurs or the connection is closed.
Send SUB character (0x1A) to stop the stream.
NOTE: This method holds the device lock for the entire duration of streaming,
blocking other commands to this device. This is intentional since NL43 devices
only support one TCP connection at a time.
"""
# Acquire per-device lock - held for entire streaming session
device_lock = await _get_device_lock(self.device_key)
async with device_lock:
# Evict any cached connection — streaming needs its own dedicated socket
await _connection_pool.discard(self.device_key)
await self._enforce_rate_limit()
logger.info(f"Starting DRD stream for {self.device_key}")
try:
- reader, writer = await asyncio.wait_for(
- asyncio.open_connection(self.host, self.port), timeout=self.timeout
+ reader, writer = await _connection_pool._open_connection(
+ self.host, self.port, self.timeout
)
- except asyncio.TimeoutError:
- logger.error(f"DRD stream connection timeout to {self.device_key}")
- raise ConnectionError(f"Failed to connect to device at {self.host}:{self.port}")
- except Exception as e:
- logger.error(f"DRD stream connection failed to {self.device_key}: {e}")
- raise ConnectionError(f"Failed to connect to device: {str(e)}")
+ except ConnectionError:
+ logger.error(f"DRD stream connection failed to {self.device_key}")
+ raise
try:
# Start DRD streaming
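The per-device lock described in the NOTE above (NL43 devices accept only one TCP connection at a time, so the lock is held for the whole streaming session) can be sketched as a small registry. The real `_get_device_lock` in this codebase may differ in detail; treat this as illustrative:

```python
import asyncio

# One asyncio.Lock per device key; all commands to a given device are serialized.
_device_locks: dict[str, asyncio.Lock] = {}

async def get_device_lock(device_key: str) -> asyncio.Lock:
    # setdefault is safe here: asyncio runs callbacks on a single thread per loop
    return _device_locks.setdefault(device_key, asyncio.Lock())

async def demo_command(device_key: str, name: str, order: list) -> None:
    """Simulate a command that must not interleave with others to the same device."""
    lock = await get_device_lock(device_key)
    async with lock:
        order.append(f"{name}-in")
        await asyncio.sleep(0.01)  # stand-in for TCP I/O
        order.append(f"{name}-out")
```

Because both workers below target the same key, the second enters only after the first has fully exited the critical section.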
@@ -1236,11 +1701,42 @@ class NL43Client:
result["stopped"] = True
logger.info(f"[STOP-CYCLE] Measurement stopped")
- # Step 2: Enable FTP
- logger.info(f"[STOP-CYCLE] Step 2: Enabling FTP")
+ # Step 2: Reset FTP (disable then enable) to clear any stale state
+ logger.info(f"[STOP-CYCLE] Step 2: Resetting FTP (disable then enable)")
try:
await self.disable_ftp()
logger.info(f"[STOP-CYCLE] FTP disabled")
except Exception as e:
logger.warning(f"[STOP-CYCLE] FTP disable failed (may already be off): {e}")
await self.enable_ftp()
logger.info(f"[STOP-CYCLE] FTP enable command sent")
# Step 2b: Wait and verify FTP is ready (NL-43 needs time to start FTP server)
ftp_ready_timeout = 30 # seconds
ftp_check_interval = 2 # seconds
ftp_ready = False
elapsed = 0
logger.info(f"[STOP-CYCLE] Step 2b: Waiting up to {ftp_ready_timeout}s for FTP server to be ready")
while elapsed < ftp_ready_timeout:
await asyncio.sleep(ftp_check_interval)
elapsed += ftp_check_interval
try:
ftp_status = await self.get_ftp_status()
logger.info(f"[STOP-CYCLE] FTP status check at {elapsed}s: {ftp_status}")
if ftp_status.lower() == "on":
ftp_ready = True
logger.info(f"[STOP-CYCLE] FTP server confirmed ready after {elapsed}s")
break
except Exception as e:
logger.warning(f"[STOP-CYCLE] FTP status check failed at {elapsed}s: {e}")
if ftp_ready:
result["ftp_enabled"] = True
- logger.info(f"[STOP-CYCLE] FTP enabled")
+ logger.info(f"[STOP-CYCLE] FTP enabled and verified")
else:
logger.warning(f"[STOP-CYCLE] FTP not confirmed ready after {ftp_ready_timeout}s, proceeding anyway")
result["ftp_enabled"] = True # Command was sent, just not verified
if not download:
logger.info(f"[STOP-CYCLE] === Cycle complete (download=False) ===")
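Step 2b's readiness loop above can be factored into a standalone coroutine. This is a sketch with the same 30 s / 2 s defaults; `client` is anything exposing the `get_ftp_status` coroutine shown in this file:

```python
import asyncio
import logging

logger = logging.getLogger("slmm.stop_cycle")

async def wait_for_ftp_ready(client, timeout_s: float = 30.0, interval_s: float = 2.0) -> bool:
    """Poll get_ftp_status() until it reports 'on'; return False if timeout_s elapses first."""
    elapsed = 0.0
    while elapsed < timeout_s:
        await asyncio.sleep(interval_s)
        elapsed += interval_s
        try:
            if (await client.get_ftp_status()).lower() == "on":
                return True
        except Exception as exc:
            # Transient failures are expected while the NL-43's FTP server restarts
            logger.warning("FTP status check failed at %.0fs: %s", elapsed, exc)
    return False
```

Sleeping before the first check (as the original loop does) gives the device a head start; a caller that needs to proceed regardless can log the False result and continue, matching the "proceeding anyway" branch above.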


@@ -0,0 +1,73 @@
#!/usr/bin/env python3
"""
Database migration: Add device_logs table.
This table stores per-device log entries for debugging and audit trail.
Run this once to add the new table.
"""
import sqlite3
import os
# Path to the SLMM database
DB_PATH = os.path.join(os.path.dirname(__file__), "data", "slmm.db")
def migrate():
    print(f"Adding device_logs table to: {DB_PATH}")
    if not os.path.exists(DB_PATH):
        print("Database does not exist yet. Table will be created automatically on first run.")
        return

    conn = sqlite3.connect(DB_PATH)
    cursor = conn.cursor()
    try:
        # Check if table already exists
        cursor.execute("""
            SELECT name FROM sqlite_master
            WHERE type='table' AND name='device_logs'
        """)
        if cursor.fetchone():
            print("✓ device_logs table already exists, no migration needed")
            return

        # Create the table
        print("Creating device_logs table...")
        cursor.execute("""
            CREATE TABLE device_logs (
                id INTEGER PRIMARY KEY AUTOINCREMENT,
                unit_id VARCHAR NOT NULL,
                timestamp DATETIME DEFAULT CURRENT_TIMESTAMP,
                level VARCHAR DEFAULT 'INFO',
                category VARCHAR DEFAULT 'GENERAL',
                message TEXT NOT NULL
            )
        """)

        # Create indexes for efficient querying
        print("Creating indexes...")
        cursor.execute("CREATE INDEX ix_device_logs_unit_id ON device_logs (unit_id)")
        cursor.execute("CREATE INDEX ix_device_logs_timestamp ON device_logs (timestamp)")
        conn.commit()
        print("✓ Created device_logs table with indexes")

        # Verify
        cursor.execute("""
            SELECT name FROM sqlite_master
            WHERE type='table' AND name='device_logs'
        """)
        if not cursor.fetchone():
            raise Exception("device_logs table was not created successfully")
        print("✓ Migration completed successfully")
    finally:
        conn.close()


if __name__ == "__main__":
    migrate()
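For context, code writing to this table would use a parameterized insert along these lines. The helper name `log_device_event` is illustrative, not taken from the codebase; the schema matches the migration above:

```python
import sqlite3

def log_device_event(conn, unit_id, message, level="INFO", category="GENERAL"):
    """Append one device_logs row; timestamp falls back to CURRENT_TIMESTAMP."""
    conn.execute(
        "INSERT INTO device_logs (unit_id, level, category, message) VALUES (?, ?, ?, ?)",
        (unit_id, level, category, message),
    )
    conn.commit()

# In-memory demo using the same schema the migration creates
conn = sqlite3.connect(":memory:")
conn.execute("""
    CREATE TABLE device_logs (
        id INTEGER PRIMARY KEY AUTOINCREMENT,
        unit_id VARCHAR NOT NULL,
        timestamp DATETIME DEFAULT CURRENT_TIMESTAMP,
        level VARCHAR DEFAULT 'INFO',
        category VARCHAR DEFAULT 'GENERAL',
        message TEXT NOT NULL
    )
""")
log_device_event(conn, "nl43-001", "Measurement started", category="MEASUREMENT")
rows = conn.execute("SELECT unit_id, level, category, message FROM device_logs").fetchall()
```

The `?` placeholders keep arbitrary log messages from breaking the SQL, which matters since device error strings end up in this table.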


@@ -0,0 +1,60 @@
#!/usr/bin/env python3
"""
Database migration: Add start_time_sync_attempted field to nl43_status table.
This field tracks whether FTP sync has been attempted for the current measurement,
preventing repeated sync attempts when FTP fails.
Run this once to add the new column.
"""
import sqlite3
import os
# Path to the SLMM database
DB_PATH = os.path.join(os.path.dirname(__file__), "data", "slmm.db")
def migrate():
    print(f"Adding start_time_sync_attempted field to: {DB_PATH}")
    if not os.path.exists(DB_PATH):
        print("Database does not exist yet. Column will be created automatically.")
        return

    conn = sqlite3.connect(DB_PATH)
    cursor = conn.cursor()
    try:
        # Check if column already exists
        cursor.execute("PRAGMA table_info(nl43_status)")
        columns = [col[1] for col in cursor.fetchall()]
        if 'start_time_sync_attempted' in columns:
            print("✓ start_time_sync_attempted column already exists, no migration needed")
            return

        # Add the column
        print("Adding start_time_sync_attempted column...")
        cursor.execute("""
            ALTER TABLE nl43_status
            ADD COLUMN start_time_sync_attempted BOOLEAN DEFAULT 0
        """)
        conn.commit()
        print("✓ Added start_time_sync_attempted column")

        # Verify
        cursor.execute("PRAGMA table_info(nl43_status)")
        columns = [col[1] for col in cursor.fetchall()]
        if 'start_time_sync_attempted' not in columns:
            raise Exception("start_time_sync_attempted column was not added successfully")
        print("✓ Migration completed successfully")
    finally:
        conn.close()


if __name__ == "__main__":
    migrate()
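The existence check used by this migration generalizes into a small helper, shown here as a sketch (not a helper from the codebase). `PRAGMA table_info` returns one row per column as `(cid, name, type, notnull, dflt_value, pk)`, so the name is index 1:

```python
import sqlite3

def column_exists(conn, table: str, column: str) -> bool:
    """True if `column` is already present on `table`, per PRAGMA table_info."""
    return column in [row[1] for row in conn.execute(f"PRAGMA table_info({table})")]

# In-memory demo of the add-column-if-missing pattern
conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE nl43_status (unit_id VARCHAR PRIMARY KEY)")
before = column_exists(conn, "nl43_status", "start_time_sync_attempted")
conn.execute("ALTER TABLE nl43_status ADD COLUMN start_time_sync_attempted BOOLEAN DEFAULT 0")
after = column_exists(conn, "nl43_status", "start_time_sync_attempted")
```

Checking first matters because SQLite's `ALTER TABLE ... ADD COLUMN` raises an error if the column already exists, which would make re-running the migration fail.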


@@ -333,6 +333,134 @@
html += `<p style="margin-top: 12px; font-size: 0.9em; color: #666;">Last run: ${new Date(data.timestamp).toLocaleString()}</p>`;
// Add database dump section if available
if (data.database_dump) {
html += `<div style="margin-top: 16px; border-top: 1px solid #d0d7de; padding-top: 12px;">`;
html += `<h4 style="margin: 0 0 12px 0;">📦 Database Dump</h4>`;
// Config section
if (data.database_dump.config) {
const cfg = data.database_dump.config;
html += `<div style="background: #f0f4f8; padding: 12px; border-radius: 4px; margin-bottom: 12px;">`;
html += `<strong>Configuration (nl43_config)</strong>`;
html += `<table style="width: 100%; margin-top: 8px; font-size: 0.9em;">`;
html += `<tr><td style="padding: 2px 8px; color: #666;">Host</td><td>${cfg.host}:${cfg.tcp_port}</td></tr>`;
html += `<tr><td style="padding: 2px 8px; color: #666;">TCP Enabled</td><td>${cfg.tcp_enabled ? '✓' : '✗'}</td></tr>`;
html += `<tr><td style="padding: 2px 8px; color: #666;">FTP Enabled</td><td>${cfg.ftp_enabled ? '✓' : '✗'}${cfg.ftp_enabled ? ` (port ${cfg.ftp_port}, user: ${cfg.ftp_username || 'none'})` : ''}</td></tr>`;
html += `<tr><td style="padding: 2px 8px; color: #666;">Background Polling</td><td>${cfg.poll_enabled ? `✓ every ${cfg.poll_interval_seconds}s` : '✗ disabled'}</td></tr>`;
html += `</table></div>`;
}
// Status cache section
if (data.database_dump.status_cache) {
const cache = data.database_dump.status_cache;
html += `<div style="background: #f0f8f4; padding: 12px; border-radius: 4px; margin-bottom: 12px;">`;
html += `<strong>Status Cache (nl43_status)</strong>`;
html += `<table style="width: 100%; margin-top: 8px; font-size: 0.9em;">`;
// Measurement state and timing
html += `<tr><td style="padding: 2px 8px; color: #666;">Measurement State</td><td><strong>${cache.measurement_state || 'unknown'}</strong></td></tr>`;
if (cache.measurement_start_time) {
const startTime = new Date(cache.measurement_start_time);
const elapsed = Math.floor((Date.now() - startTime) / 1000);
const elapsedStr = elapsed > 3600 ? `${Math.floor(elapsed/3600)}h ${Math.floor((elapsed%3600)/60)}m` : elapsed > 60 ? `${Math.floor(elapsed/60)}m ${elapsed%60}s` : `${elapsed}s`;
html += `<tr><td style="padding: 2px 8px; color: #666;">Measurement Started</td><td>${startTime.toLocaleString()} (${elapsedStr} ago)</td></tr>`;
}
html += `<tr><td style="padding: 2px 8px; color: #666;">Counter (d0)</td><td>${cache.counter || 'N/A'}</td></tr>`;
// Sound levels
html += `<tr><td colspan="2" style="padding: 8px 8px 2px 8px; font-weight: 600; border-top: 1px solid #d0d7de;">Sound Levels (dB)</td></tr>`;
html += `<tr><td style="padding: 2px 8px; color: #666;">Lp (Instantaneous)</td><td>${cache.lp || 'N/A'}</td></tr>`;
html += `<tr><td style="padding: 2px 8px; color: #666;">Leq (Equivalent)</td><td>${cache.leq || 'N/A'}</td></tr>`;
html += `<tr><td style="padding: 2px 8px; color: #666;">Lmax / Lmin</td><td>${cache.lmax || 'N/A'} / ${cache.lmin || 'N/A'}</td></tr>`;
html += `<tr><td style="padding: 2px 8px; color: #666;">Lpeak</td><td>${cache.lpeak || 'N/A'}</td></tr>`;
// Device status
html += `<tr><td colspan="2" style="padding: 8px 8px 2px 8px; font-weight: 600; border-top: 1px solid #d0d7de;">Device Status</td></tr>`;
html += `<tr><td style="padding: 2px 8px; color: #666;">Battery</td><td>${cache.battery_level || 'N/A'}${cache.power_source ? ` (${cache.power_source})` : ''}</td></tr>`;
html += `<tr><td style="padding: 2px 8px; color: #666;">SD Card</td><td>${cache.sd_remaining_mb ? `${cache.sd_remaining_mb} MB` : 'N/A'}${cache.sd_free_ratio ? ` (${cache.sd_free_ratio} free)` : ''}</td></tr>`;
// Polling status
html += `<tr><td colspan="2" style="padding: 8px 8px 2px 8px; font-weight: 600; border-top: 1px solid #d0d7de;">Polling Status</td></tr>`;
html += `<tr><td style="padding: 2px 8px; color: #666;">Reachable</td><td>${cache.is_reachable ? '🟢 Yes' : '🔴 No'}</td></tr>`;
if (cache.last_seen) {
html += `<tr><td style="padding: 2px 8px; color: #666;">Last Seen</td><td>${new Date(cache.last_seen).toLocaleString()}</td></tr>`;
}
if (cache.last_success) {
html += `<tr><td style="padding: 2px 8px; color: #666;">Last Success</td><td>${new Date(cache.last_success).toLocaleString()}</td></tr>`;
}
if (cache.last_poll_attempt) {
html += `<tr><td style="padding: 2px 8px; color: #666;">Last Poll Attempt</td><td>${new Date(cache.last_poll_attempt).toLocaleString()}</td></tr>`;
}
html += `<tr><td style="padding: 2px 8px; color: #666;">Consecutive Failures</td><td>${cache.consecutive_failures || 0}</td></tr>`;
if (cache.last_error) {
html += `<tr><td style="padding: 2px 8px; color: #666;">Last Error</td><td style="color: #d00; font-size: 0.85em;">${cache.last_error}</td></tr>`;
}
html += `</table></div>`;
// Raw payload (collapsible)
if (cache.raw_payload) {
html += `<details style="margin-top: 8px;"><summary style="cursor: pointer; color: #666; font-size: 0.9em;">📄 Raw Payload</summary>`;
html += `<pre style="background: #f6f8fa; padding: 8px; border-radius: 4px; font-size: 0.8em; overflow-x: auto; margin-top: 8px;">${cache.raw_payload}</pre></details>`;
}
} else {
html += `<p style="color: #888; font-style: italic;">No cached status available for this unit.</p>`;
}
html += `</div>`;
}
// Fetch and display device logs
try {
const logsRes = await fetch(`/api/nl43/${unitId}/logs?limit=50`);
if (logsRes.ok) {
const logsData = await logsRes.json();
if (logsData.logs && logsData.logs.length > 0) {
html += `<div style="margin-top: 16px; border-top: 1px solid #d0d7de; padding-top: 12px;">`;
html += `<h4 style="margin: 0 0 12px 0;">📋 Device Logs (${logsData.stats.total} total)</h4>`;
// Stats summary
if (logsData.stats.by_level) {
html += `<div style="margin-bottom: 8px; font-size: 0.85em; color: #666;">`;
const levels = logsData.stats.by_level;
const parts = [];
if (levels.ERROR) parts.push(`<span style="color: #d00;">${levels.ERROR} errors</span>`);
if (levels.WARNING) parts.push(`<span style="color: #fa0;">${levels.WARNING} warnings</span>`);
if (levels.INFO) parts.push(`${levels.INFO} info`);
html += parts.join(' · ');
html += `</div>`;
}
// Log entries (collapsible)
html += `<details open><summary style="cursor: pointer; font-size: 0.9em; margin-bottom: 8px;">Recent entries (${logsData.logs.length})</summary>`;
html += `<div style="max-height: 300px; overflow-y: auto; background: #f6f8fa; border: 1px solid #d0d7de; border-radius: 4px; padding: 8px; font-size: 0.8em; font-family: monospace;">`;
logsData.logs.forEach(entry => {
const levelColor = {
'ERROR': '#d00',
'WARNING': '#b86e00',
'INFO': '#0969da',
'DEBUG': '#888'
}[entry.level] || '#666';
const time = new Date(entry.timestamp).toLocaleString();
html += `<div style="margin-bottom: 4px; border-bottom: 1px solid #eee; padding-bottom: 4px;">`;
html += `<span style="color: #888;">${time}</span> `;
html += `<span style="color: ${levelColor}; font-weight: 600;">[${entry.level}]</span> `;
html += `<span style="color: #666;">[${entry.category}]</span> `;
html += `${entry.message}`;
html += `</div>`;
});
html += `</div></details>`;
html += `</div>`;
}
}
} catch (logErr) {
console.log('Could not fetch device logs:', logErr);
}
resultsEl.innerHTML = html;
log(`Diagnostics complete: ${data.overall_status}`);
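The per-level summary the panel renders ("2 errors · 1 warnings · 5 info") reduces to a counting pass over the log entries. A Python equivalent of that JS, for reference (function name is illustrative):

```python
from collections import Counter

def summarize_levels(logs: list) -> str:
    """Mirror the diagnostics panel: level counts joined with ' · ', skipping zero counts."""
    by_level = Counter(entry["level"] for entry in logs)
    parts = []
    if by_level["ERROR"]:
        parts.append(f"{by_level['ERROR']} errors")
    if by_level["WARNING"]:
        parts.append(f"{by_level['WARNING']} warnings")
    if by_level["INFO"]:
        parts.append(f"{by_level['INFO']} info")
    return " · ".join(parts)
```

`Counter` returns 0 for missing keys, so the zero-count branches simply drop out, just as the JS version only pushes a part when the level is present.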


@@ -3,7 +3,7 @@
<head>
<meta charset="UTF-8" />
<meta name="viewport" content="width=device-width, initial-scale=1.0" />
- <title>SLMM Roster - Sound Level Meter Configuration</title>
+ <title>SLMM - Device Roster &amp; Connections</title>
<style>
* { box-sizing: border-box; }
body {
@@ -227,19 +227,119 @@
}
.toast-success { background: #2da44e; }
.toast-error { background: #cf222e; }
/* Tabs */
.tabs {
display: flex;
gap: 0;
margin-bottom: 0;
border-bottom: 2px solid #d0d7de;
}
.tab-btn {
padding: 10px 20px;
border: none;
background: none;
cursor: pointer;
font-size: 14px;
font-weight: 600;
color: #57606a;
border-bottom: 2px solid transparent;
margin-bottom: -2px;
transition: color 0.2s, border-color 0.2s;
}
.tab-btn:hover { color: #24292f; }
.tab-btn.active {
color: #24292f;
border-bottom-color: #fd8c73;
}
.tab-panel { display: none; }
.tab-panel.active { display: block; }
/* Connection pool panel */
.pool-config {
display: grid;
grid-template-columns: repeat(auto-fill, minmax(180px, 1fr));
gap: 12px;
margin-bottom: 20px;
}
.pool-config-card {
background: #f6f8fa;
border: 1px solid #d0d7de;
border-radius: 6px;
padding: 12px;
}
.pool-config-card .label {
font-size: 11px;
color: #57606a;
text-transform: uppercase;
font-weight: 600;
margin-bottom: 4px;
}
.pool-config-card .value {
font-size: 18px;
font-weight: 600;
color: #24292f;
}
.conn-card {
background: white;
border: 1px solid #d0d7de;
border-radius: 6px;
padding: 16px;
margin-bottom: 12px;
}
.conn-card-header {
display: flex;
justify-content: space-between;
align-items: center;
margin-bottom: 12px;
}
.conn-card-header strong { font-size: 15px; }
.conn-card-grid {
display: grid;
grid-template-columns: repeat(auto-fill, minmax(140px, 1fr));
gap: 8px;
}
.conn-stat .label {
font-size: 11px;
color: #57606a;
text-transform: uppercase;
font-weight: 600;
}
.conn-stat .value {
font-size: 14px;
font-weight: 600;
color: #24292f;
}
.conn-empty {
text-align: center;
padding: 32px;
color: #57606a;
}
.pool-actions {
display: flex;
gap: 8px;
margin-bottom: 16px;
}
</style>
</head>
<body>
<div class="container">
<div class="header">
- <h1>📊 Sound Level Meter Roster</h1>
+ <h1>SLMM - Roster &amp; Connections</h1>
<div class="nav">
- <a href="/" class="btn"> Back to Control Panel</a>
+ <a href="/" class="btn">&larr; Back to Control Panel</a>
<button class="btn btn-primary" onclick="openAddModal()">+ Add Device</button>
</div>
</div>
<div class="table-container">
<div class="tabs">
<button class="tab-btn active" onclick="switchTab('roster')">Device Roster</button>
<button class="tab-btn" onclick="switchTab('connections')">Connections</button>
</div>
<!-- Roster Tab -->
<div id="tab-roster" class="tab-panel active">
<div class="table-container" style="border-top-left-radius: 0; border-top-right-radius: 0;">
<table id="rosterTable">
<thead>
<tr>
@@ -265,6 +365,30 @@
</div>
</div>
<!-- Connections Tab -->
<div id="tab-connections" class="tab-panel">
<div class="table-container" style="padding: 20px; border-top-left-radius: 0; border-top-right-radius: 0;">
<div class="pool-actions">
<button class="btn" onclick="loadConnections()">Refresh</button>
<button class="btn btn-danger" onclick="flushConnections()">Flush All Connections</button>
</div>
<h3 style="margin: 0 0 12px 0; font-size: 16px;">Pool Configuration</h3>
<div id="poolConfig" class="pool-config">
<div class="pool-config-card">
<div class="label">Status</div>
<div class="value" id="poolEnabled">--</div>
</div>
</div>
<h3 style="margin: 20px 0 12px 0; font-size: 16px;">Active Connections</h3>
<div id="connectionsList">
<div class="conn-empty">Loading...</div>
</div>
</div>
</div>
</div>
<!-- Add/Edit Modal -->
<div id="deviceModal" class="modal">
<div class="modal-content">
@@ -619,6 +743,159 @@
closeModal();
}
});
// ========== Tab Switching ==========
function switchTab(tabName) {
document.querySelectorAll('.tab-btn').forEach(btn => btn.classList.remove('active'));
document.querySelectorAll('.tab-panel').forEach(panel => panel.classList.remove('active'));
document.querySelector(`.tab-btn[onclick="switchTab('${tabName}')"]`).classList.add('active');
document.getElementById(`tab-${tabName}`).classList.add('active');
if (tabName === 'connections') {
loadConnections();
}
}
// ========== Connection Pool ==========
let connectionsRefreshTimer = null;
async function loadConnections() {
try {
const res = await fetch('/api/nl43/_connections/status');
const data = await res.json();
if (!res.ok) {
showToast('Failed to load connection pool status', 'error');
return;
}
const pool = data.pool;
renderPoolConfig(pool);
renderConnections(pool.connections);
// Auto-refresh while tab is active
clearTimeout(connectionsRefreshTimer);
if (document.getElementById('tab-connections').classList.contains('active')) {
connectionsRefreshTimer = setTimeout(loadConnections, 5000);
}
} catch (err) {
showToast('Error loading connections: ' + err.message, 'error');
console.error('Load connections error:', err);
}
}
function renderPoolConfig(pool) {
document.getElementById('poolConfig').innerHTML = `
<div class="pool-config-card">
<div class="label">Persistent</div>
<div class="value" style="color: ${pool.enabled ? '#1a7f37' : '#cf222e'}">${pool.enabled ? 'Enabled' : 'Disabled'}</div>
</div>
<div class="pool-config-card">
<div class="label">Active</div>
<div class="value">${pool.active_connections}</div>
</div>
<div class="pool-config-card">
<div class="label">Idle TTL</div>
<div class="value">${pool.idle_ttl}s</div>
</div>
<div class="pool-config-card">
<div class="label">Max Age</div>
<div class="value">${pool.max_age}s</div>
</div>
<div class="pool-config-card">
<div class="label">KA Idle</div>
<div class="value">${pool.keepalive_idle}s</div>
</div>
<div class="pool-config-card">
<div class="label">KA Interval</div>
<div class="value">${pool.keepalive_interval}s</div>
</div>
<div class="pool-config-card">
<div class="label">KA Probes</div>
<div class="value">${pool.keepalive_count}</div>
</div>
`;
}
function renderConnections(connections) {
const container = document.getElementById('connectionsList');
const keys = Object.keys(connections);
if (keys.length === 0) {
container.innerHTML = `
<div class="conn-empty">
<div style="font-size: 32px; margin-bottom: 8px;">~</div>
<div><strong>No active connections</strong></div>
<div style="margin-top: 4px; font-size: 13px;">
Connections appear here when devices are actively being polled and the connection is cached between commands.
</div>
</div>
`;
return;
}
container.innerHTML = keys.map(key => {
const conn = connections[key];
const aliveColor = conn.alive ? '#1a7f37' : '#cf222e';
const aliveText = conn.alive ? 'Alive' : 'Stale';
return `
<div class="conn-card">
<div class="conn-card-header">
<strong>${escapeHtml(key)}</strong>
<span class="status-badge ${conn.alive ? 'status-ok' : 'status-error'}">${aliveText}</span>
</div>
<div class="conn-card-grid">
<div class="conn-stat">
<div class="label">Host</div>
<div class="value">${escapeHtml(conn.host)}</div>
</div>
<div class="conn-stat">
<div class="label">Port</div>
<div class="value">${conn.port}</div>
</div>
<div class="conn-stat">
<div class="label">Age</div>
<div class="value">${formatSeconds(conn.age_seconds)}</div>
</div>
<div class="conn-stat">
<div class="label">Idle</div>
<div class="value">${formatSeconds(conn.idle_seconds)}</div>
</div>
</div>
</div>
`;
}).join('');
}
function formatSeconds(s) {
if (s < 60) return Math.round(s) + 's';
if (s < 3600) return Math.floor(s / 60) + 'm ' + Math.round(s % 60) + 's';
return Math.floor(s / 3600) + 'h ' + Math.floor((s % 3600) / 60) + 'm';
}
async function flushConnections() {
if (!confirm('Close all cached TCP connections?\n\nDevices will reconnect on the next poll cycle.')) {
return;
}
try {
const res = await fetch('/api/nl43/_connections/flush', { method: 'POST' });
const data = await res.json();
if (!res.ok) {
showToast(data.detail || 'Failed to flush connections', 'error');
return;
}
showToast('All connections flushed', 'success');
await loadConnections();
} catch (err) {
showToast('Error flushing connections: ' + err.message, 'error');
}
}
</script>
</body>
</html>
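The `formatSeconds` helper above buckets connection ages into "45s", "2m 5s", or "2h 2m". The same logic in Python, matching the JS rounding (`round` for the seconds remainder, floor division elsewhere):

```python
def format_seconds(s: float) -> str:
    """Bucket a duration the way the Connections tab does: seconds, then m+s, then h+m."""
    if s < 60:
        return f"{round(s)}s"
    if s < 3600:
        return f"{int(s // 60)}m {round(s % 60)}s"
    return f"{int(s // 3600)}h {int((s % 3600) // 60)}m"
```

Dropping the seconds remainder above an hour keeps the pool cards compact; at that scale the Age and Idle columns only need minute resolution.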


@@ -1,128 +0,0 @@
#!/usr/bin/env python3
"""
Test script to verify that sleep mode is automatically disabled when:
1. Device configuration is created/updated with TCP enabled
2. Measurements are started
This script tests the API endpoints, not the actual device communication.
"""
import requests
import json
BASE_URL = "http://localhost:8100/api/nl43"
UNIT_ID = "test-nl43-001"
def test_config_update():
    """Test that config update works (actual sleep mode disable requires real device)"""
    print("\n=== Testing Config Update ===")
    # Create/update a device config
    config_data = {
        "host": "192.168.1.100",
        "tcp_port": 2255,
        "tcp_enabled": True,
        "ftp_enabled": False,
        "ftp_username": "admin",
        "ftp_password": "password"
    }
    print(f"Updating config for {UNIT_ID}...")
    response = requests.put(f"{BASE_URL}/{UNIT_ID}/config", json=config_data)
    if response.status_code == 200:
        print("✓ Config updated successfully")
        print(f"Response: {json.dumps(response.json(), indent=2)}")
        print("\nNote: Sleep mode disable was attempted (will succeed if device is reachable)")
        return True
    else:
        print(f"✗ Config update failed: {response.status_code}")
        print(f"Error: {response.text}")
        return False


def test_get_config():
    """Test retrieving the config"""
    print("\n=== Testing Get Config ===")
    response = requests.get(f"{BASE_URL}/{UNIT_ID}/config")
    if response.status_code == 200:
        print("✓ Config retrieved successfully")
        print(f"Response: {json.dumps(response.json(), indent=2)}")
        return True
    elif response.status_code == 404:
        print("✗ Config not found (create one first)")
        return False
    else:
        print(f"✗ Request failed: {response.status_code}")
        print(f"Error: {response.text}")
        return False


def test_start_measurement():
    """Test that start measurement attempts to disable sleep mode"""
    print("\n=== Testing Start Measurement ===")
    print(f"Attempting to start measurement on {UNIT_ID}...")
    response = requests.post(f"{BASE_URL}/{UNIT_ID}/start")
    if response.status_code == 200:
        print("✓ Start command accepted")
        print(f"Response: {json.dumps(response.json(), indent=2)}")
        print("\nNote: Sleep mode was disabled before starting measurement")
        return True
    elif response.status_code == 404:
        print("✗ Device config not found (create config first)")
        return False
    elif response.status_code == 502:
        print("✗ Device not reachable (expected if no physical device)")
        print(f"Response: {response.text}")
        print("\nNote: This is expected behavior when testing without a physical device")
        return True  # This is actually success - the endpoint tried to communicate
    else:
        print(f"✗ Request failed: {response.status_code}")
        print(f"Error: {response.text}")
        return False


def main():
    print("=" * 60)
    print("Sleep Mode Auto-Disable Test")
    print("=" * 60)
    print("\nThis test verifies that sleep mode is automatically disabled")
    print("when device configs are updated or measurements are started.")
    print("\nNote: Without a physical device, some operations will fail at")
    print("the device communication level, but the API logic will execute.")

    # Run tests
    results = []
    # Test 1: Update config (should attempt to disable sleep mode)
    results.append(("Config Update", test_config_update()))
    # Test 2: Get config
    results.append(("Get Config", test_get_config()))
    # Test 3: Start measurement (should attempt to disable sleep mode)
    results.append(("Start Measurement", test_start_measurement()))

    # Summary
    print("\n" + "=" * 60)
    print("Test Summary")
    print("=" * 60)
    for test_name, result in results:
        status = "✓ PASS" if result else "✗ FAIL"
        print(f"{status}: {test_name}")

    print("\n" + "=" * 60)
    print("Implementation Details:")
    print("=" * 60)
    print("1. Config endpoint is now async and calls ensure_sleep_mode_disabled()")
    print("   when TCP is enabled")
    print("2. Start measurement endpoint calls ensure_sleep_mode_disabled()")
    print("   before starting the measurement")
    print("3. Sleep mode check is non-blocking - config/start will succeed")
    print("   even if the device is unreachable")
    print("=" * 60)


if __name__ == "__main__":
    main()
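The "non-blocking" behavior described in point 3 amounts to a best-effort await whose failures are logged rather than propagated. A sketch of that pattern — `ensure_sleep_mode_disabled` is named in the script above, but this wrapper and the `client` argument are stand-ins, not the codebase's actual implementation:

```python
import asyncio
import logging

logger = logging.getLogger("slmm")

async def try_disable_sleep_mode(client) -> bool:
    """Best effort: config updates and measurement starts succeed even if the device is down."""
    try:
        await client.ensure_sleep_mode_disabled()
        return True
    except Exception as exc:
        logger.warning("Sleep mode disable failed, continuing anyway: %s", exc)
        return False
```

Returning a bool instead of raising lets the calling endpoint record whether the disable actually happened without turning an unreachable device into a 500.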