BIG update: Update to 0.5.1. Added:

-Project management
-Modem Managerment
-Modem/unit pairing

and more
This commit is contained in:
serversdwn
2026-01-28 03:27:50 +00:00
parent 44d7841852
commit 6492fdff82
24 changed files with 2459 additions and 90 deletions

View File

@@ -0,0 +1,286 @@
"""
Modem Dashboard Router
Provides API endpoints for the Field Modems management page.
"""
from fastapi import APIRouter, Request, Depends, Query
from fastapi.responses import HTMLResponse
from sqlalchemy.orm import Session
from datetime import datetime
import subprocess
import time
import logging
from backend.database import get_db
from backend.models import RosterUnit
from backend.templates_config import templates
logger = logging.getLogger(__name__)
router = APIRouter(prefix="/api/modem-dashboard", tags=["modem-dashboard"])
@router.get("/stats", response_class=HTMLResponse)
async def get_modem_stats(request: Request, db: Session = Depends(get_db)):
"""
Get summary statistics for modem dashboard.
Returns HTML partial with stat cards.
"""
# Query all modems
all_modems = db.query(RosterUnit).filter_by(device_type="modem").all()
# Get IDs of modems that have devices paired to them
paired_modem_ids = set()
devices_with_modems = db.query(RosterUnit).filter(
RosterUnit.deployed_with_modem_id.isnot(None),
RosterUnit.retired == False
).all()
for device in devices_with_modems:
if device.deployed_with_modem_id:
paired_modem_ids.add(device.deployed_with_modem_id)
# Count categories
total_count = len(all_modems)
retired_count = sum(1 for m in all_modems if m.retired)
# In use = deployed AND paired with a device
in_use_count = sum(1 for m in all_modems
if m.deployed and not m.retired and m.id in paired_modem_ids)
# Spare = deployed but NOT paired (available for assignment)
spare_count = sum(1 for m in all_modems
if m.deployed and not m.retired and m.id not in paired_modem_ids)
# Benched = not deployed and not retired
benched_count = sum(1 for m in all_modems if not m.deployed and not m.retired)
return templates.TemplateResponse("partials/modem_stats.html", {
"request": request,
"total_count": total_count,
"in_use_count": in_use_count,
"spare_count": spare_count,
"benched_count": benched_count,
"retired_count": retired_count
})
@router.get("/units", response_class=HTMLResponse)
async def get_modem_units(
request: Request,
db: Session = Depends(get_db),
search: str = Query(None),
filter_status: str = Query(None), # "in_use", "spare", "benched", "retired"
):
"""
Get list of modem units for the dashboard.
Returns HTML partial with modem cards.
"""
query = db.query(RosterUnit).filter_by(device_type="modem")
# Filter by search term if provided
if search:
search_term = f"%{search}%"
query = query.filter(
(RosterUnit.id.ilike(search_term)) |
(RosterUnit.ip_address.ilike(search_term)) |
(RosterUnit.hardware_model.ilike(search_term)) |
(RosterUnit.phone_number.ilike(search_term)) |
(RosterUnit.location.ilike(search_term))
)
modems = query.order_by(
RosterUnit.retired.asc(),
RosterUnit.deployed.desc(),
RosterUnit.id.asc()
).all()
# Get paired device info for each modem
paired_devices = {}
devices_with_modems = db.query(RosterUnit).filter(
RosterUnit.deployed_with_modem_id.isnot(None),
RosterUnit.retired == False
).all()
for device in devices_with_modems:
if device.deployed_with_modem_id:
paired_devices[device.deployed_with_modem_id] = {
"id": device.id,
"device_type": device.device_type,
"deployed": device.deployed
}
# Annotate modems with paired device info
modem_list = []
for modem in modems:
paired = paired_devices.get(modem.id)
# Determine status category
if modem.retired:
status = "retired"
elif not modem.deployed:
status = "benched"
elif paired:
status = "in_use"
else:
status = "spare"
# Apply filter if specified
if filter_status and status != filter_status:
continue
modem_list.append({
"id": modem.id,
"ip_address": modem.ip_address,
"phone_number": modem.phone_number,
"hardware_model": modem.hardware_model,
"deployed": modem.deployed,
"retired": modem.retired,
"location": modem.location,
"project_id": modem.project_id,
"paired_device": paired,
"status": status
})
return templates.TemplateResponse("partials/modem_list.html", {
"request": request,
"modems": modem_list
})
@router.get("/{modem_id}/paired-device")
async def get_paired_device(modem_id: str, db: Session = Depends(get_db)):
"""
Get the device (SLM/seismograph) that is paired with this modem.
Returns JSON with device info or null if not paired.
"""
# Check modem exists
modem = db.query(RosterUnit).filter_by(id=modem_id, device_type="modem").first()
if not modem:
return {"status": "error", "detail": f"Modem {modem_id} not found"}
# Find device paired with this modem
device = db.query(RosterUnit).filter(
RosterUnit.deployed_with_modem_id == modem_id,
RosterUnit.retired == False
).first()
if device:
return {
"paired": True,
"device": {
"id": device.id,
"device_type": device.device_type,
"deployed": device.deployed,
"project_id": device.project_id,
"location": device.location or device.address
}
}
return {"paired": False, "device": None}
@router.get("/{modem_id}/paired-device-html", response_class=HTMLResponse)
async def get_paired_device_html(modem_id: str, request: Request, db: Session = Depends(get_db)):
"""
Get HTML partial showing the device paired with this modem.
Used by unit_detail.html for modems.
"""
# Check modem exists
modem = db.query(RosterUnit).filter_by(id=modem_id, device_type="modem").first()
if not modem:
return HTMLResponse('<p class="text-red-500">Modem not found</p>')
# Find device paired with this modem
device = db.query(RosterUnit).filter(
RosterUnit.deployed_with_modem_id == modem_id,
RosterUnit.retired == False
).first()
return templates.TemplateResponse("partials/modem_paired_device.html", {
"request": request,
"modem_id": modem_id,
"device": device
})
@router.get("/{modem_id}/ping")
async def ping_modem(modem_id: str, db: Session = Depends(get_db)):
"""
Test modem connectivity with a simple ping.
Returns response time and connection status.
"""
# Get modem from database
modem = db.query(RosterUnit).filter_by(id=modem_id, device_type="modem").first()
if not modem:
return {"status": "error", "detail": f"Modem {modem_id} not found"}
if not modem.ip_address:
return {"status": "error", "detail": f"Modem {modem_id} has no IP address configured"}
try:
# Ping the modem (1 packet, 2 second timeout)
start_time = time.time()
result = subprocess.run(
["ping", "-c", "1", "-W", "2", modem.ip_address],
capture_output=True,
text=True,
timeout=3
)
response_time = int((time.time() - start_time) * 1000) # Convert to milliseconds
if result.returncode == 0:
return {
"status": "success",
"modem_id": modem_id,
"ip_address": modem.ip_address,
"response_time_ms": response_time,
"message": "Modem is responding"
}
else:
return {
"status": "error",
"modem_id": modem_id,
"ip_address": modem.ip_address,
"detail": "Modem not responding to ping"
}
except subprocess.TimeoutExpired:
return {
"status": "error",
"modem_id": modem_id,
"ip_address": modem.ip_address,
"detail": "Ping timeout"
}
except Exception as e:
logger.error(f"Failed to ping modem {modem_id}: {e}")
return {
"status": "error",
"modem_id": modem_id,
"detail": str(e)
}
@router.get("/{modem_id}/diagnostics")
async def get_modem_diagnostics(modem_id: str, db: Session = Depends(get_db)):
"""
Get modem diagnostics (signal strength, data usage, uptime).
Currently returns placeholders. When ModemManager is available,
this endpoint will query it for real diagnostics.
"""
modem = db.query(RosterUnit).filter_by(id=modem_id, device_type="modem").first()
if not modem:
return {"status": "error", "detail": f"Modem {modem_id} not found"}
# TODO: Query ModemManager backend when available
return {
"status": "unavailable",
"message": "ModemManager integration not yet available",
"modem_id": modem_id,
"signal_strength_dbm": None,
"data_usage_mb": None,
"uptime_seconds": None,
"carrier": None,
"connection_type": None # LTE, 5G, etc.
}

View File

@@ -11,7 +11,7 @@ Provides API endpoints for the Projects system:
from fastapi import APIRouter, Request, Depends, HTTPException, Query
from fastapi.responses import HTMLResponse, JSONResponse, StreamingResponse
from sqlalchemy.orm import Session
from sqlalchemy import func, and_
from sqlalchemy import func, and_, or_
from datetime import datetime, timedelta
from typing import Optional
from collections import OrderedDict
@@ -147,6 +147,107 @@ async def get_projects_stats(request: Request, db: Session = Depends(get_db)):
})
# ============================================================================
# Project Search (Smart Autocomplete)
# ============================================================================
def _build_project_display(project: Project) -> str:
"""Build display string from project fields: 'xxxx-YY - Client - Name'"""
parts = []
if project.project_number:
parts.append(project.project_number)
if project.client_name:
parts.append(project.client_name)
if project.name:
parts.append(project.name)
return " - ".join(parts) if parts else project.id
@router.get("/search", response_class=HTMLResponse)
async def search_projects(
request: Request,
q: str = Query("", description="Search term"),
db: Session = Depends(get_db),
limit: int = Query(10, le=50),
):
"""
Fuzzy search across project fields for autocomplete.
Searches: project_number, client_name, name (project/site name)
Returns HTML partial for HTMX dropdown.
"""
if not q.strip():
# Return recent active projects when no search term
projects = db.query(Project).filter(
Project.status != "archived"
).order_by(Project.updated_at.desc()).limit(limit).all()
else:
search_term = f"%{q}%"
projects = db.query(Project).filter(
and_(
Project.status != "archived",
or_(
Project.project_number.ilike(search_term),
Project.client_name.ilike(search_term),
Project.name.ilike(search_term),
)
)
).order_by(Project.updated_at.desc()).limit(limit).all()
# Build display data for each project
projects_data = [{
"id": p.id,
"project_number": p.project_number,
"client_name": p.client_name,
"name": p.name,
"display": _build_project_display(p),
"status": p.status,
} for p in projects]
return templates.TemplateResponse("partials/project_search_results.html", {
"request": request,
"projects": projects_data,
"query": q,
"show_create": len(projects) == 0 and q.strip(),
})
@router.get("/search-json")
async def search_projects_json(
q: str = Query("", description="Search term"),
db: Session = Depends(get_db),
limit: int = Query(10, le=50),
):
"""
Fuzzy search across project fields - JSON response.
For programmatic/API consumption.
"""
if not q.strip():
projects = db.query(Project).filter(
Project.status != "archived"
).order_by(Project.updated_at.desc()).limit(limit).all()
else:
search_term = f"%{q}%"
projects = db.query(Project).filter(
and_(
Project.status != "archived",
or_(
Project.project_number.ilike(search_term),
Project.client_name.ilike(search_term),
Project.name.ilike(search_term),
)
)
).order_by(Project.updated_at.desc()).limit(limit).all()
return [{
"id": p.id,
"project_number": p.project_number,
"client_name": p.client_name,
"name": p.name,
"display": _build_project_display(p),
"status": p.status,
} for p in projects]
# ============================================================================
# Project CRUD
# ============================================================================
@@ -161,6 +262,7 @@ async def create_project(request: Request, db: Session = Depends(get_db)):
project = Project(
id=str(uuid.uuid4()),
project_number=form_data.get("project_number"), # TMI ID: xxxx-YY format
name=form_data.get("name"),
description=form_data.get("description"),
project_type_id=form_data.get("project_type_id"),
@@ -197,6 +299,7 @@ async def get_project(project_id: str, db: Session = Depends(get_db)):
return {
"id": project.id,
"project_number": project.project_number,
"name": project.name,
"description": project.description,
"project_type_id": project.project_type_id,

View File

@@ -1,4 +1,4 @@
from fastapi import APIRouter, Depends, HTTPException, Form, UploadFile, File, Request
from fastapi import APIRouter, Depends, HTTPException, Form, UploadFile, File, Request, Query
from fastapi.exceptions import RequestValidationError
from sqlalchemy.orm import Session
from datetime import datetime, date
@@ -150,6 +150,8 @@ async def add_roster_unit(
ip_address: str = Form(None),
phone_number: str = Form(None),
hardware_model: str = Form(None),
deployment_type: str = Form(None), # "seismograph" | "slm" - what device type modem is deployed with
deployed_with_unit_id: str = Form(None), # ID of seismograph/SLM this modem is deployed with
# Sound Level Meter-specific fields
slm_host: str = Form(None),
slm_tcp_port: str = Form(None),
@@ -209,6 +211,7 @@ async def add_roster_unit(
ip_address=ip_address if ip_address else None,
phone_number=phone_number if phone_number else None,
hardware_model=hardware_model if hardware_model else None,
deployment_type=deployment_type if deployment_type else None,
# Sound Level Meter-specific fields
slm_host=slm_host if slm_host else None,
slm_tcp_port=slm_tcp_port_int,
@@ -219,6 +222,23 @@ async def add_roster_unit(
slm_time_weighting=slm_time_weighting if slm_time_weighting else None,
slm_measurement_range=slm_measurement_range if slm_measurement_range else None,
)
# Auto-fill location data from modem if pairing and fields are empty
if deployed_with_modem_id:
modem = db.query(RosterUnit).filter(
RosterUnit.id == deployed_with_modem_id,
RosterUnit.device_type == "modem"
).first()
if modem:
if not unit.location and modem.location:
unit.location = modem.location
if not unit.address and modem.address:
unit.address = modem.address
if not unit.coordinates and modem.coordinates:
unit.coordinates = modem.coordinates
if not unit.project_id and modem.project_id:
unit.project_id = modem.project_id
db.add(unit)
db.commit()
@@ -259,6 +279,145 @@ def get_modems_list(db: Session = Depends(get_db)):
]
@router.get("/search/modems")
def search_modems(
request: Request,
q: str = Query("", description="Search term"),
deployed_only: bool = Query(False, description="Only show deployed modems"),
exclude_retired: bool = Query(True, description="Exclude retired modems"),
limit: int = Query(10, le=50),
db: Session = Depends(get_db)
):
"""
Search modems by ID, IP address, or note. Returns HTML partial for HTMX dropdown.
Used by modem picker component to find modems to link with seismographs/SLMs.
"""
from fastapi.responses import HTMLResponse
from fastapi.templating import Jinja2Templates
templates = Jinja2Templates(directory="templates")
query = db.query(RosterUnit).filter(RosterUnit.device_type == "modem")
if deployed_only:
query = query.filter(RosterUnit.deployed == True)
if exclude_retired:
query = query.filter(RosterUnit.retired == False)
# Search by ID, IP address, or note
if q and q.strip():
search_term = f"%{q.strip()}%"
query = query.filter(
(RosterUnit.id.ilike(search_term)) |
(RosterUnit.ip_address.ilike(search_term)) |
(RosterUnit.note.ilike(search_term))
)
modems = query.order_by(RosterUnit.id).limit(limit).all()
# Build results
results = []
for modem in modems:
# Build display text: ID - IP - Note (if available)
display_parts = [modem.id]
if modem.ip_address:
display_parts.append(modem.ip_address)
if modem.note:
display_parts.append(modem.note)
display = " - ".join(display_parts)
results.append({
"id": modem.id,
"ip_address": modem.ip_address or "",
"phone_number": modem.phone_number or "",
"note": modem.note or "",
"deployed": modem.deployed,
"display": display
})
# Determine if we should show "no results" message
show_empty = len(results) == 0 and q and q.strip()
return templates.TemplateResponse(
"partials/modem_search_results.html",
{
"request": request,
"modems": results,
"query": q,
"show_empty": show_empty
}
)
@router.get("/search/units")
def search_units(
request: Request,
q: str = Query("", description="Search term"),
device_type: str = Query(None, description="Filter by device type: seismograph, modem, slm"),
deployed_only: bool = Query(False, description="Only show deployed units"),
exclude_retired: bool = Query(True, description="Exclude retired units"),
limit: int = Query(10, le=50),
db: Session = Depends(get_db)
):
"""
Search roster units by ID or note. Returns HTML partial for HTMX dropdown.
Used by unit picker component to find seismographs/SLMs to link with modems.
"""
from fastapi.responses import HTMLResponse
from fastapi.templating import Jinja2Templates
templates = Jinja2Templates(directory="templates")
query = db.query(RosterUnit)
# Apply filters
if device_type:
query = query.filter(RosterUnit.device_type == device_type)
if deployed_only:
query = query.filter(RosterUnit.deployed == True)
if exclude_retired:
query = query.filter(RosterUnit.retired == False)
# Search by ID or note
if q and q.strip():
search_term = f"%{q.strip()}%"
query = query.filter(
(RosterUnit.id.ilike(search_term)) |
(RosterUnit.note.ilike(search_term))
)
units = query.order_by(RosterUnit.id).limit(limit).all()
# Build results
results = []
for unit in units:
results.append({
"id": unit.id,
"device_type": unit.device_type or "seismograph",
"note": unit.note or "",
"deployed": unit.deployed,
"display": f"{unit.id}" + (f" - {unit.note}" if unit.note else "")
})
# Determine if we should show "no results" message
show_empty = len(results) == 0 and q and q.strip()
return templates.TemplateResponse(
"partials/unit_search_results.html",
{
"request": request,
"units": results,
"query": q,
"show_empty": show_empty
}
)
@router.get("/{unit_id}")
def get_roster_unit(unit_id: str, db: Session = Depends(get_db)):
"""Get a single roster unit by ID"""
@@ -283,6 +442,8 @@ def get_roster_unit(unit_id: str, db: Session = Depends(get_db)):
"ip_address": unit.ip_address or "",
"phone_number": unit.phone_number or "",
"hardware_model": unit.hardware_model or "",
"deployment_type": unit.deployment_type or "",
"deployed_with_unit_id": unit.deployed_with_unit_id or "",
"slm_host": unit.slm_host or "",
"slm_tcp_port": unit.slm_tcp_port or "",
"slm_ftp_port": unit.slm_ftp_port or "",
@@ -314,6 +475,8 @@ def edit_roster_unit(
ip_address: str = Form(None),
phone_number: str = Form(None),
hardware_model: str = Form(None),
deployment_type: str = Form(None),
deployed_with_unit_id: str = Form(None),
# Sound Level Meter-specific fields
slm_host: str = Form(None),
slm_tcp_port: str = Form(None),
@@ -323,6 +486,14 @@ def edit_roster_unit(
slm_frequency_weighting: str = Form(None),
slm_time_weighting: str = Form(None),
slm_measurement_range: str = Form(None),
# Cascade options - sync fields to paired device
cascade_to_unit_id: str = Form(None),
cascade_deployed: str = Form(None),
cascade_retired: str = Form(None),
cascade_project: str = Form(None),
cascade_location: str = Form(None),
cascade_coordinates: str = Form(None),
cascade_note: str = Form(None),
db: Session = Depends(get_db)
):
unit = db.query(RosterUnit).filter(RosterUnit.id == unit_id).first()
@@ -374,10 +545,29 @@ def edit_roster_unit(
unit.next_calibration_due = next_cal_date
unit.deployed_with_modem_id = deployed_with_modem_id if deployed_with_modem_id else None
# Auto-fill location data from modem if pairing and fields are empty
if deployed_with_modem_id:
modem = db.query(RosterUnit).filter(
RosterUnit.id == deployed_with_modem_id,
RosterUnit.device_type == "modem"
).first()
if modem:
# Only fill if the device field is empty
if not unit.location and modem.location:
unit.location = modem.location
if not unit.address and modem.address:
unit.address = modem.address
if not unit.coordinates and modem.coordinates:
unit.coordinates = modem.coordinates
if not unit.project_id and modem.project_id:
unit.project_id = modem.project_id
# Modem-specific fields
unit.ip_address = ip_address if ip_address else None
unit.phone_number = phone_number if phone_number else None
unit.hardware_model = hardware_model if hardware_model else None
unit.deployment_type = deployment_type if deployment_type else None
unit.deployed_with_unit_id = deployed_with_unit_id if deployed_with_unit_id else None
# Sound Level Meter-specific fields
unit.slm_host = slm_host if slm_host else None
@@ -403,8 +593,79 @@ def edit_roster_unit(
old_status_text = "retired" if old_retired else "active"
record_history(db, unit_id, "retired_change", "retired", old_status_text, status_text, "manual")
# Handle cascade to paired device
cascaded_unit_id = None
if cascade_to_unit_id and cascade_to_unit_id.strip():
paired_unit = db.query(RosterUnit).filter(RosterUnit.id == cascade_to_unit_id).first()
if paired_unit:
cascaded_unit_id = paired_unit.id
# Cascade deployed status
if cascade_deployed in ['true', 'True', '1', 'yes']:
old_paired_deployed = paired_unit.deployed
paired_unit.deployed = deployed_bool
paired_unit.last_updated = datetime.utcnow()
if old_paired_deployed != deployed_bool:
status_text = "deployed" if deployed_bool else "benched"
old_status_text = "deployed" if old_paired_deployed else "benched"
record_history(db, paired_unit.id, "deployed_change", "deployed",
old_status_text, status_text, f"cascade from {unit_id}")
# Cascade retired status
if cascade_retired in ['true', 'True', '1', 'yes']:
old_paired_retired = paired_unit.retired
paired_unit.retired = retired_bool
paired_unit.last_updated = datetime.utcnow()
if old_paired_retired != retired_bool:
status_text = "retired" if retired_bool else "active"
old_status_text = "retired" if old_paired_retired else "active"
record_history(db, paired_unit.id, "retired_change", "retired",
old_status_text, status_text, f"cascade from {unit_id}")
# Cascade project
if cascade_project in ['true', 'True', '1', 'yes']:
old_paired_project = paired_unit.project_id
paired_unit.project_id = project_id
paired_unit.last_updated = datetime.utcnow()
if old_paired_project != project_id:
record_history(db, paired_unit.id, "project_change", "project_id",
old_paired_project or "", project_id or "", f"cascade from {unit_id}")
# Cascade address/location
if cascade_location in ['true', 'True', '1', 'yes']:
old_paired_address = paired_unit.address
old_paired_location = paired_unit.location
paired_unit.address = address
paired_unit.location = location
paired_unit.last_updated = datetime.utcnow()
if old_paired_address != address:
record_history(db, paired_unit.id, "address_change", "address",
old_paired_address or "", address or "", f"cascade from {unit_id}")
# Cascade coordinates
if cascade_coordinates in ['true', 'True', '1', 'yes']:
old_paired_coords = paired_unit.coordinates
paired_unit.coordinates = coordinates
paired_unit.last_updated = datetime.utcnow()
if old_paired_coords != coordinates:
record_history(db, paired_unit.id, "coordinates_change", "coordinates",
old_paired_coords or "", coordinates or "", f"cascade from {unit_id}")
# Cascade note
if cascade_note in ['true', 'True', '1', 'yes']:
old_paired_note = paired_unit.note
paired_unit.note = note
paired_unit.last_updated = datetime.utcnow()
if old_paired_note != note:
record_history(db, paired_unit.id, "note_change", "note",
old_paired_note or "", note or "", f"cascade from {unit_id}")
db.commit()
return {"message": "Unit updated", "id": unit_id, "device_type": device_type}
response = {"message": "Unit updated", "id": unit_id, "device_type": device_type}
if cascaded_unit_id:
response["cascaded_to"] = cascaded_unit_id
return response
@router.post("/set-deployed/{unit_id}")
@@ -624,6 +885,40 @@ async def import_csv(
filtered_lines = [line for line in lines if not line.strip().startswith('#')]
csv_text = '\n'.join(filtered_lines)
# First pass: validate for duplicates and empty unit_ids
csv_reader = csv.DictReader(io.StringIO(csv_text))
seen_unit_ids = {} # unit_id -> list of row numbers
empty_unit_id_rows = []
for row_num, row in enumerate(csv_reader, start=2):
unit_id = row.get('unit_id', '').strip()
if not unit_id:
empty_unit_id_rows.append(row_num)
else:
if unit_id not in seen_unit_ids:
seen_unit_ids[unit_id] = []
seen_unit_ids[unit_id].append(row_num)
# Check for validation errors
validation_errors = []
# Report empty unit_ids
if empty_unit_id_rows:
validation_errors.append(f"Empty unit_id on row(s): {', '.join(map(str, empty_unit_id_rows))}")
# Report duplicates
duplicates = {uid: rows for uid, rows in seen_unit_ids.items() if len(rows) > 1}
if duplicates:
for uid, rows in duplicates.items():
validation_errors.append(f"Duplicate unit_id '{uid}' on rows: {', '.join(map(str, rows))}")
if validation_errors:
raise HTTPException(
status_code=400,
detail="CSV validation failed:\n" + "\n".join(validation_errors)
)
# Second pass: actually import the data
csv_reader = csv.DictReader(io.StringIO(csv_text))
results = {
@@ -682,6 +977,10 @@ async def import_csv(
existing_unit.phone_number = _get_csv_value(row, 'phone_number')
if row.get('hardware_model'):
existing_unit.hardware_model = _get_csv_value(row, 'hardware_model')
if row.get('deployment_type'):
existing_unit.deployment_type = _get_csv_value(row, 'deployment_type')
if row.get('deployed_with_unit_id'):
existing_unit.deployed_with_unit_id = _get_csv_value(row, 'deployed_with_unit_id')
# SLM-specific fields
if row.get('slm_host'):
@@ -724,6 +1023,8 @@ async def import_csv(
ip_address=_get_csv_value(row, 'ip_address'),
phone_number=_get_csv_value(row, 'phone_number'),
hardware_model=_get_csv_value(row, 'hardware_model'),
deployment_type=_get_csv_value(row, 'deployment_type'),
deployed_with_unit_id=_get_csv_value(row, 'deployed_with_unit_id'),
# SLM fields
slm_host=_get_csv_value(row, 'slm_host'),
slm_tcp_port=_parse_int(row.get('slm_tcp_port', '')),