Files
terra-view/backend/routers/roster_edit.py

405 lines
14 KiB
Python

from fastapi import APIRouter, Depends, HTTPException, Form, UploadFile, File
from sqlalchemy.orm import Session
from datetime import datetime, date
import csv
import io
from backend.database import get_db
from backend.models import RosterUnit, IgnoredUnit, Emitter
router = APIRouter(prefix="/api/roster", tags=["roster-edit"])
def get_or_create_roster_unit(db: Session, unit_id: str):
unit = db.query(RosterUnit).filter(RosterUnit.id == unit_id).first()
if not unit:
unit = RosterUnit(id=unit_id)
db.add(unit)
db.commit()
db.refresh(unit)
return unit
@router.post("/add")
def add_roster_unit(
id: str = Form(...),
device_type: str = Form("seismograph"),
unit_type: str = Form("series3"),
deployed: bool = Form(False),
retired: bool = Form(False),
note: str = Form(""),
project_id: str = Form(None),
location: str = Form(None),
address: str = Form(None),
coordinates: str = Form(None),
# Seismograph-specific fields
last_calibrated: str = Form(None),
next_calibration_due: str = Form(None),
deployed_with_modem_id: str = Form(None),
# Modem-specific fields
ip_address: str = Form(None),
phone_number: str = Form(None),
hardware_model: str = Form(None),
db: Session = Depends(get_db)
):
if db.query(RosterUnit).filter(RosterUnit.id == id).first():
raise HTTPException(status_code=400, detail="Unit already exists")
# Parse date fields if provided
last_cal_date = None
if last_calibrated:
try:
last_cal_date = datetime.strptime(last_calibrated, "%Y-%m-%d").date()
except ValueError:
raise HTTPException(status_code=400, detail="Invalid last_calibrated date format. Use YYYY-MM-DD")
next_cal_date = None
if next_calibration_due:
try:
next_cal_date = datetime.strptime(next_calibration_due, "%Y-%m-%d").date()
except ValueError:
raise HTTPException(status_code=400, detail="Invalid next_calibration_due date format. Use YYYY-MM-DD")
unit = RosterUnit(
id=id,
device_type=device_type,
unit_type=unit_type,
deployed=deployed,
retired=retired,
note=note,
project_id=project_id,
location=location,
address=address,
coordinates=coordinates,
last_updated=datetime.utcnow(),
# Seismograph-specific fields
last_calibrated=last_cal_date,
next_calibration_due=next_cal_date,
deployed_with_modem_id=deployed_with_modem_id if deployed_with_modem_id else None,
# Modem-specific fields
ip_address=ip_address if ip_address else None,
phone_number=phone_number if phone_number else None,
hardware_model=hardware_model if hardware_model else None,
)
db.add(unit)
db.commit()
return {"message": "Unit added", "id": id, "device_type": device_type}
@router.get("/{unit_id}")
def get_roster_unit(unit_id: str, db: Session = Depends(get_db)):
"""Get a single roster unit by ID"""
unit = db.query(RosterUnit).filter(RosterUnit.id == unit_id).first()
if not unit:
raise HTTPException(status_code=404, detail="Unit not found")
return {
"id": unit.id,
"device_type": unit.device_type or "seismograph",
"unit_type": unit.unit_type,
"deployed": unit.deployed,
"retired": unit.retired,
"note": unit.note or "",
"project_id": unit.project_id or "",
"location": unit.location or "",
"address": unit.address or "",
"coordinates": unit.coordinates or "",
"last_calibrated": unit.last_calibrated.isoformat() if unit.last_calibrated else "",
"next_calibration_due": unit.next_calibration_due.isoformat() if unit.next_calibration_due else "",
"deployed_with_modem_id": unit.deployed_with_modem_id or "",
"ip_address": unit.ip_address or "",
"phone_number": unit.phone_number or "",
"hardware_model": unit.hardware_model or "",
}
@router.post("/edit/{unit_id}")
def edit_roster_unit(
unit_id: str,
device_type: str = Form("seismograph"),
unit_type: str = Form("series3"),
deployed: bool = Form(False),
retired: bool = Form(False),
note: str = Form(""),
project_id: str = Form(None),
location: str = Form(None),
address: str = Form(None),
coordinates: str = Form(None),
# Seismograph-specific fields
last_calibrated: str = Form(None),
next_calibration_due: str = Form(None),
deployed_with_modem_id: str = Form(None),
# Modem-specific fields
ip_address: str = Form(None),
phone_number: str = Form(None),
hardware_model: str = Form(None),
db: Session = Depends(get_db)
):
unit = db.query(RosterUnit).filter(RosterUnit.id == unit_id).first()
if not unit:
raise HTTPException(status_code=404, detail="Unit not found")
# Parse date fields if provided
last_cal_date = None
if last_calibrated:
try:
last_cal_date = datetime.strptime(last_calibrated, "%Y-%m-%d").date()
except ValueError:
raise HTTPException(status_code=400, detail="Invalid last_calibrated date format. Use YYYY-MM-DD")
next_cal_date = None
if next_calibration_due:
try:
next_cal_date = datetime.strptime(next_calibration_due, "%Y-%m-%d").date()
except ValueError:
raise HTTPException(status_code=400, detail="Invalid next_calibration_due date format. Use YYYY-MM-DD")
# Update all fields
unit.device_type = device_type
unit.unit_type = unit_type
unit.deployed = deployed
unit.retired = retired
unit.note = note
unit.project_id = project_id
unit.location = location
unit.address = address
unit.coordinates = coordinates
unit.last_updated = datetime.utcnow()
# Seismograph-specific fields
unit.last_calibrated = last_cal_date
unit.next_calibration_due = next_cal_date
unit.deployed_with_modem_id = deployed_with_modem_id if deployed_with_modem_id else None
# Modem-specific fields
unit.ip_address = ip_address if ip_address else None
unit.phone_number = phone_number if phone_number else None
unit.hardware_model = hardware_model if hardware_model else None
db.commit()
return {"message": "Unit updated", "id": unit_id, "device_type": device_type}
@router.post("/set-deployed/{unit_id}")
def set_deployed(unit_id: str, deployed: bool = Form(...), db: Session = Depends(get_db)):
unit = get_or_create_roster_unit(db, unit_id)
unit.deployed = deployed
unit.last_updated = datetime.utcnow()
db.commit()
return {"message": "Updated", "id": unit_id, "deployed": deployed}
@router.post("/set-retired/{unit_id}")
def set_retired(unit_id: str, retired: bool = Form(...), db: Session = Depends(get_db)):
unit = get_or_create_roster_unit(db, unit_id)
unit.retired = retired
unit.last_updated = datetime.utcnow()
db.commit()
return {"message": "Updated", "id": unit_id, "retired": retired}
@router.delete("/{unit_id}")
def delete_roster_unit(unit_id: str, db: Session = Depends(get_db)):
"""
Permanently delete a unit from the database.
Checks roster, emitters, and ignored_units tables and deletes from any table where the unit exists.
"""
deleted = False
# Try to delete from roster table
roster_unit = db.query(RosterUnit).filter(RosterUnit.id == unit_id).first()
if roster_unit:
db.delete(roster_unit)
deleted = True
# Try to delete from emitters table
emitter = db.query(Emitter).filter(Emitter.id == unit_id).first()
if emitter:
db.delete(emitter)
deleted = True
# Try to delete from ignored_units table
ignored_unit = db.query(IgnoredUnit).filter(IgnoredUnit.id == unit_id).first()
if ignored_unit:
db.delete(ignored_unit)
deleted = True
# If not found in any table, return error
if not deleted:
raise HTTPException(status_code=404, detail="Unit not found")
db.commit()
return {"message": "Unit deleted", "id": unit_id}
@router.post("/set-note/{unit_id}")
def set_note(unit_id: str, note: str = Form(""), db: Session = Depends(get_db)):
unit = get_or_create_roster_unit(db, unit_id)
unit.note = note
unit.last_updated = datetime.utcnow()
db.commit()
return {"message": "Updated", "id": unit_id, "note": note}
@router.post("/import-csv")
async def import_csv(
file: UploadFile = File(...),
update_existing: bool = Form(True),
db: Session = Depends(get_db)
):
"""
Import roster units from CSV file.
Expected CSV columns (unit_id is required, others are optional):
- unit_id: Unique identifier for the unit
- unit_type: Type of unit (default: "series3")
- deployed: Boolean for deployment status (default: False)
- retired: Boolean for retirement status (default: False)
- note: Notes about the unit
- project_id: Project identifier
- location: Location description
Args:
file: CSV file upload
update_existing: If True, update existing units; if False, skip them
"""
if not file.filename.endswith('.csv'):
raise HTTPException(status_code=400, detail="File must be a CSV")
# Read file content
contents = await file.read()
csv_text = contents.decode('utf-8')
csv_reader = csv.DictReader(io.StringIO(csv_text))
results = {
"added": [],
"updated": [],
"skipped": [],
"errors": []
}
for row_num, row in enumerate(csv_reader, start=2): # Start at 2 to account for header
try:
# Validate required field
unit_id = row.get('unit_id', '').strip()
if not unit_id:
results["errors"].append({
"row": row_num,
"error": "Missing required field: unit_id"
})
continue
# Check if unit exists
existing_unit = db.query(RosterUnit).filter(RosterUnit.id == unit_id).first()
if existing_unit:
if not update_existing:
results["skipped"].append(unit_id)
continue
# Update existing unit
existing_unit.unit_type = row.get('unit_type', existing_unit.unit_type or 'series3')
existing_unit.deployed = row.get('deployed', '').lower() in ('true', '1', 'yes') if row.get('deployed') else existing_unit.deployed
existing_unit.retired = row.get('retired', '').lower() in ('true', '1', 'yes') if row.get('retired') else existing_unit.retired
existing_unit.note = row.get('note', existing_unit.note or '')
existing_unit.project_id = row.get('project_id', existing_unit.project_id)
existing_unit.location = row.get('location', existing_unit.location)
existing_unit.address = row.get('address', existing_unit.address)
existing_unit.coordinates = row.get('coordinates', existing_unit.coordinates)
existing_unit.last_updated = datetime.utcnow()
results["updated"].append(unit_id)
else:
# Create new unit
new_unit = RosterUnit(
id=unit_id,
unit_type=row.get('unit_type', 'series3'),
deployed=row.get('deployed', '').lower() in ('true', '1', 'yes'),
retired=row.get('retired', '').lower() in ('true', '1', 'yes'),
note=row.get('note', ''),
project_id=row.get('project_id'),
location=row.get('location'),
address=row.get('address'),
coordinates=row.get('coordinates'),
last_updated=datetime.utcnow()
)
db.add(new_unit)
results["added"].append(unit_id)
except Exception as e:
results["errors"].append({
"row": row_num,
"unit_id": row.get('unit_id', 'unknown'),
"error": str(e)
})
# Commit all changes
try:
db.commit()
except Exception as e:
db.rollback()
raise HTTPException(status_code=500, detail=f"Database error: {str(e)}")
return {
"message": "CSV import completed",
"summary": {
"added": len(results["added"]),
"updated": len(results["updated"]),
"skipped": len(results["skipped"]),
"errors": len(results["errors"])
},
"details": results
}
@router.post("/ignore/{unit_id}")
def ignore_unit(unit_id: str, reason: str = Form(""), db: Session = Depends(get_db)):
"""
Add a unit to the ignore list to suppress it from unknown emitters.
"""
# Check if already ignored
if db.query(IgnoredUnit).filter(IgnoredUnit.id == unit_id).first():
raise HTTPException(status_code=400, detail="Unit already ignored")
ignored = IgnoredUnit(
id=unit_id,
reason=reason,
ignored_at=datetime.utcnow()
)
db.add(ignored)
db.commit()
return {"message": "Unit ignored", "id": unit_id}
@router.delete("/ignore/{unit_id}")
def unignore_unit(unit_id: str, db: Session = Depends(get_db)):
"""
Remove a unit from the ignore list.
"""
ignored = db.query(IgnoredUnit).filter(IgnoredUnit.id == unit_id).first()
if not ignored:
raise HTTPException(status_code=404, detail="Unit not in ignore list")
db.delete(ignored)
db.commit()
return {"message": "Unit unignored", "id": unit_id}
@router.get("/ignored")
def list_ignored_units(db: Session = Depends(get_db)):
"""
Get list of all ignored units.
"""
ignored_units = db.query(IgnoredUnit).all()
return {
"ignored": [
{
"id": unit.id,
"reason": unit.reason,
"ignored_at": unit.ignored_at.isoformat()
}
for unit in ignored_units
]
}