BIG update: Update to 0.5.1. Added:

-Project management
-Modem Managerment
-Modem/unit pairing

and more
This commit is contained in:
serversdwn
2026-01-28 03:26:52 +00:00
parent 38c600aca3
commit 44d7841852
2 changed files with 166 additions and 29 deletions

View File

@@ -531,6 +531,37 @@ def set_note(unit_id: str, note: str = Form(""), db: Session = Depends(get_db)):
return {"message": "Updated", "id": unit_id, "note": note}
def _parse_bool(value: str) -> bool:
"""Parse boolean from CSV string value."""
return value.lower() in ('true', '1', 'yes') if value else False
def _parse_int(value: str) -> int | None:
"""Parse integer from CSV string value, return None if empty or invalid."""
if not value or not value.strip():
return None
try:
return int(value.strip())
except ValueError:
return None
def _parse_date(value: str) -> date | None:
"""Parse date from CSV string value (YYYY-MM-DD format)."""
if not value or not value.strip():
return None
try:
return datetime.strptime(value.strip(), '%Y-%m-%d').date()
except ValueError:
return None
def _get_csv_value(row: dict, key: str, default=None):
"""Get value from CSV row, return default if empty."""
value = row.get(key, '').strip() if row.get(key) else ''
return value if value else default
@router.post("/import-csv")
async def import_csv(
file: UploadFile = File(...),
@@ -541,13 +572,40 @@ async def import_csv(
Import roster units from CSV file.
Expected CSV columns (unit_id is required, others are optional):
- unit_id: Unique identifier for the unit
- unit_type: Type of unit (default: "series3")
- deployed: Boolean for deployment status (default: False)
- retired: Boolean for retirement status (default: False)
Common fields (all device types):
- unit_id: Unique identifier for the unit (REQUIRED)
- device_type: "seismograph", "modem", or "slm" (default: "seismograph")
- unit_type: Sub-type (e.g., "series3", "series4" for seismographs)
- deployed: Boolean (true/false/yes/no/1/0)
- retired: Boolean
- note: Notes about the unit
- project_id: Project identifier
- location: Location description
- address: Street address
- coordinates: GPS coordinates (lat;lon or lat,lon)
Seismograph-specific:
- last_calibrated: Date (YYYY-MM-DD)
- next_calibration_due: Date (YYYY-MM-DD)
- deployed_with_modem_id: ID of paired modem
Modem-specific:
- ip_address: Device IP address
- phone_number: SIM card phone number
- hardware_model: Hardware model (e.g., IBR900, RV55)
SLM-specific:
- slm_host: Device IP or hostname
- slm_tcp_port: TCP control port (default 2255)
- slm_ftp_port: FTP port (default 21)
- slm_model: Device model (NL-43, NL-53)
- slm_serial_number: Serial number
- slm_frequency_weighting: A, C, or Z
- slm_time_weighting: F (Fast), S (Slow), I (Impulse)
- slm_measurement_range: e.g., "30-130 dB"
Lines starting with # are treated as comments and skipped.
Args:
file: CSV file upload
@@ -560,6 +618,12 @@ async def import_csv(
# Read file content
contents = await file.read()
csv_text = contents.decode('utf-8')
# Filter out comment lines (starting with #)
lines = csv_text.split('\n')
filtered_lines = [line for line in lines if not line.strip().startswith('#')]
csv_text = '\n'.join(filtered_lines)
csv_reader = csv.DictReader(io.StringIO(csv_text))
results = {
@@ -580,6 +644,9 @@ async def import_csv(
})
continue
# Determine device type
device_type = _get_csv_value(row, 'device_type', 'seismograph')
# Check if unit exists
existing_unit = db.query(RosterUnit).filter(RosterUnit.id == unit_id).first()
@@ -588,31 +655,84 @@ async def import_csv(
results["skipped"].append(unit_id)
continue
# Update existing unit
existing_unit.unit_type = row.get('unit_type', existing_unit.unit_type or 'series3')
existing_unit.deployed = row.get('deployed', '').lower() in ('true', '1', 'yes') if row.get('deployed') else existing_unit.deployed
existing_unit.retired = row.get('retired', '').lower() in ('true', '1', 'yes') if row.get('retired') else existing_unit.retired
existing_unit.note = row.get('note', existing_unit.note or '')
existing_unit.project_id = row.get('project_id', existing_unit.project_id)
existing_unit.location = row.get('location', existing_unit.location)
existing_unit.address = row.get('address', existing_unit.address)
existing_unit.coordinates = row.get('coordinates', existing_unit.coordinates)
# Update existing unit - common fields
existing_unit.device_type = device_type
existing_unit.unit_type = _get_csv_value(row, 'unit_type', existing_unit.unit_type or 'series3')
existing_unit.deployed = _parse_bool(row.get('deployed', '')) if row.get('deployed') else existing_unit.deployed
existing_unit.retired = _parse_bool(row.get('retired', '')) if row.get('retired') else existing_unit.retired
existing_unit.note = _get_csv_value(row, 'note', existing_unit.note)
existing_unit.project_id = _get_csv_value(row, 'project_id', existing_unit.project_id)
existing_unit.location = _get_csv_value(row, 'location', existing_unit.location)
existing_unit.address = _get_csv_value(row, 'address', existing_unit.address)
existing_unit.coordinates = _get_csv_value(row, 'coordinates', existing_unit.coordinates)
existing_unit.last_updated = datetime.utcnow()
# Seismograph-specific fields
if row.get('last_calibrated'):
existing_unit.last_calibrated = _parse_date(row.get('last_calibrated'))
if row.get('next_calibration_due'):
existing_unit.next_calibration_due = _parse_date(row.get('next_calibration_due'))
if row.get('deployed_with_modem_id'):
existing_unit.deployed_with_modem_id = _get_csv_value(row, 'deployed_with_modem_id')
# Modem-specific fields
if row.get('ip_address'):
existing_unit.ip_address = _get_csv_value(row, 'ip_address')
if row.get('phone_number'):
existing_unit.phone_number = _get_csv_value(row, 'phone_number')
if row.get('hardware_model'):
existing_unit.hardware_model = _get_csv_value(row, 'hardware_model')
# SLM-specific fields
if row.get('slm_host'):
existing_unit.slm_host = _get_csv_value(row, 'slm_host')
if row.get('slm_tcp_port'):
existing_unit.slm_tcp_port = _parse_int(row.get('slm_tcp_port'))
if row.get('slm_ftp_port'):
existing_unit.slm_ftp_port = _parse_int(row.get('slm_ftp_port'))
if row.get('slm_model'):
existing_unit.slm_model = _get_csv_value(row, 'slm_model')
if row.get('slm_serial_number'):
existing_unit.slm_serial_number = _get_csv_value(row, 'slm_serial_number')
if row.get('slm_frequency_weighting'):
existing_unit.slm_frequency_weighting = _get_csv_value(row, 'slm_frequency_weighting')
if row.get('slm_time_weighting'):
existing_unit.slm_time_weighting = _get_csv_value(row, 'slm_time_weighting')
if row.get('slm_measurement_range'):
existing_unit.slm_measurement_range = _get_csv_value(row, 'slm_measurement_range')
results["updated"].append(unit_id)
else:
# Create new unit
# Create new unit with all fields
new_unit = RosterUnit(
id=unit_id,
unit_type=row.get('unit_type', 'series3'),
deployed=row.get('deployed', '').lower() in ('true', '1', 'yes'),
retired=row.get('retired', '').lower() in ('true', '1', 'yes'),
note=row.get('note', ''),
project_id=row.get('project_id'),
location=row.get('location'),
address=row.get('address'),
coordinates=row.get('coordinates'),
last_updated=datetime.utcnow()
device_type=device_type,
unit_type=_get_csv_value(row, 'unit_type', 'series3'),
deployed=_parse_bool(row.get('deployed', '')),
retired=_parse_bool(row.get('retired', '')),
note=_get_csv_value(row, 'note', ''),
project_id=_get_csv_value(row, 'project_id'),
location=_get_csv_value(row, 'location'),
address=_get_csv_value(row, 'address'),
coordinates=_get_csv_value(row, 'coordinates'),
last_updated=datetime.utcnow(),
# Seismograph fields
last_calibrated=_parse_date(row.get('last_calibrated', '')),
next_calibration_due=_parse_date(row.get('next_calibration_due', '')),
deployed_with_modem_id=_get_csv_value(row, 'deployed_with_modem_id'),
# Modem fields
ip_address=_get_csv_value(row, 'ip_address'),
phone_number=_get_csv_value(row, 'phone_number'),
hardware_model=_get_csv_value(row, 'hardware_model'),
# SLM fields
slm_host=_get_csv_value(row, 'slm_host'),
slm_tcp_port=_parse_int(row.get('slm_tcp_port', '')),
slm_ftp_port=_parse_int(row.get('slm_ftp_port', '')),
slm_model=_get_csv_value(row, 'slm_model'),
slm_serial_number=_get_csv_value(row, 'slm_serial_number'),
slm_frequency_weighting=_get_csv_value(row, 'slm_frequency_weighting'),
slm_time_weighting=_get_csv_value(row, 'slm_time_weighting'),
slm_measurement_range=_get_csv_value(row, 'slm_measurement_range'),
)
db.add(new_unit)
results["added"].append(unit_id)