Change: user sets date of previous calibration, not upcoming expire dates.

- seismograph list page enhanced with better visabilty, filtering, sorting, and calibration dates color coded.
This commit is contained in:
serversdwn
2026-02-06 21:17:14 +00:00
parent eb0a99796d
commit 89662d2fa5
8 changed files with 408 additions and 86 deletions

View File

@@ -9,12 +9,20 @@ import httpx
import os
from backend.database import get_db
from backend.models import RosterUnit, IgnoredUnit, Emitter, UnitHistory
from backend.models import RosterUnit, IgnoredUnit, Emitter, UnitHistory, UserPreferences
from backend.services.slmm_sync import sync_slm_to_slmm
router = APIRouter(prefix="/api/roster", tags=["roster-edit"])
logger = logging.getLogger(__name__)
def get_calibration_interval(db: Session) -> int:
"""Get calibration interval from user preferences, default 365 days."""
prefs = db.query(UserPreferences).first()
if prefs and prefs.calibration_interval_days:
return prefs.calibration_interval_days
return 365
# SLMM backend URL for syncing device configs to cache
SLMM_BASE_URL = os.getenv("SLMM_BASE_URL", "http://localhost:8100")
@@ -185,11 +193,11 @@ async def add_roster_unit(
except ValueError:
raise HTTPException(status_code=400, detail="Invalid last_calibrated date format. Use YYYY-MM-DD")
# Auto-calculate next_calibration_due (1 year from last_calibrated)
# This is calculated internally but not shown to user (they just see last_calibrated)
# Auto-calculate next_calibration_due from last_calibrated using calibration interval
next_cal_date = None
if last_cal_date:
next_cal_date = last_cal_date + timedelta(days=365)
cal_interval = get_calibration_interval(db)
next_cal_date = last_cal_date + timedelta(days=cal_interval)
elif next_calibration_due:
# Fallback: allow explicit setting if no last_calibrated
try:
@@ -522,11 +530,11 @@ async def edit_roster_unit(
except ValueError:
raise HTTPException(status_code=400, detail="Invalid last_calibrated date format. Use YYYY-MM-DD")
# Auto-calculate next_calibration_due (1 year from last_calibrated)
# This is calculated internally but not shown to user (they just see last_calibrated)
# Auto-calculate next_calibration_due from last_calibrated using calibration interval
next_cal_date = None
if last_cal_date:
next_cal_date = last_cal_date + timedelta(days=365)
cal_interval = get_calibration_interval(db)
next_cal_date = last_cal_date + timedelta(days=cal_interval)
elif next_calibration_due:
# Fallback: allow explicit setting if no last_calibrated
try:
@@ -1007,9 +1015,10 @@ async def import_csv(
if row.get('last_calibrated'):
last_cal = _parse_date(row.get('last_calibrated'))
existing_unit.last_calibrated = last_cal
# Auto-calculate next_calibration_due (1 year from last_calibrated)
# Auto-calculate next_calibration_due using calibration interval
if last_cal:
existing_unit.next_calibration_due = last_cal + timedelta(days=365)
cal_interval = get_calibration_interval(db)
existing_unit.next_calibration_due = last_cal + timedelta(days=cal_interval)
elif row.get('next_calibration_due'):
# Only use explicit next_calibration_due if no last_calibrated
existing_unit.next_calibration_due = _parse_date(row.get('next_calibration_due'))
@@ -1048,6 +1057,14 @@ async def import_csv(
results["updated"].append(unit_id)
else:
# Calculate next_calibration_due from last_calibrated
last_cal = _parse_date(row.get('last_calibrated', ''))
if last_cal:
cal_interval = get_calibration_interval(db)
next_cal = last_cal + timedelta(days=cal_interval)
else:
next_cal = _parse_date(row.get('next_calibration_due', ''))
# Create new unit with all fields
new_unit = RosterUnit(
id=unit_id,
@@ -1062,12 +1079,8 @@ async def import_csv(
coordinates=_get_csv_value(row, 'coordinates'),
last_updated=datetime.utcnow(),
# Seismograph fields - auto-calc next_calibration_due from last_calibrated
last_calibrated=_parse_date(row.get('last_calibrated', '')),
next_calibration_due=(
_parse_date(row.get('last_calibrated', '')) + timedelta(days=365)
if _parse_date(row.get('last_calibrated', ''))
else _parse_date(row.get('next_calibration_due', ''))
),
last_calibrated=last_cal,
next_calibration_due=next_cal,
deployed_with_modem_id=_get_csv_value(row, 'deployed_with_modem_id'),
# Modem fields
ip_address=_get_csv_value(row, 'ip_address'),

View File

@@ -3,6 +3,8 @@ Seismograph Dashboard API Router
Provides endpoints for the seismograph-specific dashboard
"""
from datetime import date
from fastapi import APIRouter, Request, Depends, Query
from fastapi.responses import HTMLResponse
from sqlalchemy.orm import Session
@@ -49,10 +51,14 @@ async def get_seismo_stats(request: Request, db: Session = Depends(get_db)):
async def get_seismo_units(
request: Request,
db: Session = Depends(get_db),
search: str = Query(None)
search: str = Query(None),
sort: str = Query("id"),
order: str = Query("asc"),
status: str = Query(None),
modem: str = Query(None)
):
"""
Returns HTML partial with filterable seismograph unit list
Returns HTML partial with filterable and sortable seismograph unit list
"""
query = db.query(RosterUnit).filter_by(
device_type="seismograph",
@@ -61,20 +67,52 @@ async def get_seismo_units(
# Apply search filter
if search:
search_lower = search.lower()
query = query.filter(
(RosterUnit.id.ilike(f"%{search}%")) |
(RosterUnit.note.ilike(f"%{search}%")) |
(RosterUnit.address.ilike(f"%{search}%"))
)
seismos = query.order_by(RosterUnit.id).all()
# Apply status filter
if status == "deployed":
query = query.filter(RosterUnit.deployed == True)
elif status == "benched":
query = query.filter(RosterUnit.deployed == False)
# Apply modem filter
if modem == "with":
query = query.filter(RosterUnit.deployed_with_modem_id.isnot(None))
elif modem == "without":
query = query.filter(RosterUnit.deployed_with_modem_id.is_(None))
# Apply sorting
sort_column_map = {
"id": RosterUnit.id,
"status": RosterUnit.deployed,
"modem": RosterUnit.deployed_with_modem_id,
"location": RosterUnit.address,
"last_calibrated": RosterUnit.last_calibrated,
"notes": RosterUnit.note
}
sort_column = sort_column_map.get(sort, RosterUnit.id)
if order == "desc":
query = query.order_by(sort_column.desc())
else:
query = query.order_by(sort_column.asc())
seismos = query.all()
return templates.TemplateResponse(
"partials/seismo_unit_list.html",
{
"request": request,
"units": seismos,
"search": search or ""
"search": search or "",
"sort": sort,
"order": order,
"status": status or "",
"modem": modem or "",
"today": date.today()
}
)