726 lines
25 KiB
Python
726 lines
25 KiB
Python
"""
|
|
Fleet Calendar Service
|
|
|
|
Business logic for:
|
|
- Calculating unit availability on any given date
|
|
- Calibration status tracking (valid, expiring soon, expired)
|
|
- Job reservation management
|
|
- Conflict detection (calibration expires mid-job)
|
|
"""
|
|
|
|
from datetime import date, datetime, timedelta
|
|
from typing import Dict, List, Optional, Tuple
|
|
from sqlalchemy.orm import Session
|
|
from sqlalchemy import and_, or_
|
|
|
|
from backend.models import (
|
|
RosterUnit, JobReservation, JobReservationUnit,
|
|
UserPreferences, Project, DeploymentRecord
|
|
)
|
|
|
|
|
|
def get_calibration_status(
    unit: RosterUnit,
    check_date: date,
    warning_days: int = 30,
    validity_days: int = 365
) -> str:
    """
    Determine calibration status for a unit on a specific date.

    Args:
        unit: Roster unit; only its ``last_calibrated`` date is read.
        check_date: The date the status is evaluated for.
        warning_days: Size of the "expiring soon" window, in days before
            the expiry date.
        validity_days: Number of days a calibration stays valid after
            ``last_calibrated``. Defaults to 365 (one year).

    Returns:
        "needs_calibration" - No calibration date set
        "expired" - check_date is on or after the expiry date
        "expiring_soon" - Within warning_days of expiry
        "valid" - Calibration is good on this date
    """
    if not unit.last_calibrated:
        return "needs_calibration"

    expiry_date = unit.last_calibrated + timedelta(days=validity_days)

    # The expiry day itself already counts as expired (>=, not >).
    if check_date >= expiry_date:
        return "expired"
    if check_date >= expiry_date - timedelta(days=warning_days):
        return "expiring_soon"
    return "valid"
|
|
|
|
|
|
def get_unit_reservations_on_date(
    db: Session,
    unit_id: str,
    check_date: date
) -> List[JobReservation]:
    """
    Get all reservations that include this unit on the given date.

    A reservation matches when the unit is assigned to it, the reservation
    starts on or before ``check_date``, and it either ends on or after
    ``check_date`` or is open-ended (``end_date`` is NULL, end date TBD).

    Args:
        db: Database session used for the query.
        unit_id: ID of the unit to look up.
        check_date: The date to test against each reservation's range.

    Returns:
        The matching JobReservation rows (empty list if none).
    """
    # Reservation IDs that have this unit assigned
    assigned_reservation_ids = db.query(JobReservationUnit.reservation_id).filter(
        JobReservationUnit.unit_id == unit_id
    ).subquery()

    return db.query(JobReservation).filter(
        JobReservation.id.in_(assigned_reservation_ids),
        JobReservation.start_date <= check_date,
        or_(
            JobReservation.end_date >= check_date,
            # Open-ended (TBD) reservations have no end date. A NULL never
            # satisfies ">=", so without this branch the unit would look
            # free while it is still committed to the job.
            JobReservation.end_date.is_(None),
        ),
    ).all()
|
|
|
|
|
|
def get_active_deployment(db: Session, unit_id: str) -> Optional[DeploymentRecord]:
    """
    Return the active (unreturned) deployment record for a unit, or None.

    A deployment is treated as active while its ``actual_removal_date`` is
    NULL. If several such records exist for the unit, the one with the most
    recent ``created_at`` is returned.

    Args:
        db: Database session used for the query.
        unit_id: ID of the unit to look up.
    """
    return (
        db.query(DeploymentRecord)
        .filter(
            DeploymentRecord.unit_id == unit_id,
            # Explicit IS NULL test; same SQL as "== None" without the
            # comparison-to-None lint smell.
            DeploymentRecord.actual_removal_date.is_(None),
        )
        .order_by(DeploymentRecord.created_at.desc())
        .first()
    )
|
|
|
|
|
|
def is_unit_available_on_date(
    db: Session,
    unit: RosterUnit,
    check_date: date,
    warning_days: int = 30
) -> Tuple[bool, str, Optional[str]]:
    """
    Decide whether a unit can take new work on ``check_date``.

    The checks run in a fixed order and the first one that blocks wins:
    retired, calibration (expired / never calibrated), active field
    deployment, then existing reservations.

    Returns:
        (is_available, status, reservation_name)
        - is_available: True if unit can be assigned to new work
        - status: "available", "reserved", "expired", "retired",
          "needs_calibration", "in_field"
        - reservation_name: Name of the blocking reservation, or the
          deployment's project ref (falling back to "Field deployment");
          None when nothing of that kind blocks the unit
    """
    # Retired units are never available.
    if unit.retired:
        return False, "retired", None

    # A unit without usable calibration is blocked; the calibration status
    # string doubles as the availability status in both cases.
    calibration = get_calibration_status(unit, check_date, warning_days)
    if calibration in ("expired", "needs_calibration"):
        return False, calibration, None

    # An open deployment record means the unit is physically in the field.
    deployment = get_active_deployment(db, unit.id)
    if deployment:
        return False, "in_field", deployment.project_ref or "Field deployment"

    # Any reservation covering the date blocks the unit; report the first.
    blocking = get_unit_reservations_on_date(db, unit.id, check_date)
    if blocking:
        return False, "reserved", blocking[0].name

    # "expiring_soon" is only a warning, so such a unit is still available.
    return True, "available", None
|
|
|
|
|
|
def get_day_summary(
    db: Session,
    check_date: date,
    device_type: str = "seismograph"
) -> Dict:
    """
    Get a complete summary of fleet status for a specific day.

    Args:
        db: Database session used for all queries.
        check_date: The day to summarize.
        device_type: Only units and reservations of this device type are
            considered.

    Returns dict with:
        - date, device_type: Echo of the inputs (date as ISO string)
        - available_units: List of available units with calibration info
        - in_field_units: Units with an active deployment (adds "project_ref")
        - reserved_units: Reserved units (adds "reservation_name")
        - expired_units: List of units with expired calibration
        - expiring_soon_units: Available or reserved units whose calibration
          is within the warning period (these also appear in their own list)
        - needs_calibration_units: Units with no calibration date
        - cal_expiring_today: Units whose calibration expires ON this day
        - reservations: List of active reservations on this date
        - counts: Summary counts (length of each list, plus "total")
    """
    # Get user preferences for warning days
    prefs = db.query(UserPreferences).filter_by(id=1).first()
    warning_days = prefs.calibration_warning_days if prefs else 30

    # Get all non-retired units of the specified device type
    units = db.query(RosterUnit).filter(
        RosterUnit.device_type == device_type,
        RosterUnit.retired == False
    ).all()

    available_units = []
    reserved_units = []
    expired_units = []
    expiring_soon_units = []
    needs_calibration_units = []
    in_field_units = []
    cal_expiring_today = []  # Units whose calibration expires ON this day

    for unit in units:
        _, status, blocking_name = is_unit_available_on_date(
            db, unit, check_date, warning_days
        )
        cal_status = get_calibration_status(unit, check_date, warning_days)

        # Computed once; used for both the serialized value and the
        # "expires today" check below.
        expiry = None
        if unit.last_calibrated:
            expiry = unit.last_calibrated + timedelta(days=365)

        unit_info = {
            "id": unit.id,
            "last_calibrated": unit.last_calibrated.isoformat() if unit.last_calibrated else None,
            "expiry_date": expiry.isoformat() if expiry else None,
            "calibration_status": cal_status,
            "deployed": unit.deployed,
            "note": unit.note or ""
        }

        # Check if calibration expires ON this specific day
        if expiry is not None and expiry == check_date:
            cal_expiring_today.append(unit_info)

        if status == "available":
            available_units.append(unit_info)
            if cal_status == "expiring_soon":
                expiring_soon_units.append(unit_info)
        elif status == "in_field":
            # For in-field units the third tuple element is the project ref.
            unit_info["project_ref"] = blocking_name
            in_field_units.append(unit_info)
        elif status == "reserved":
            unit_info["reservation_name"] = blocking_name
            reserved_units.append(unit_info)
            if cal_status == "expiring_soon":
                expiring_soon_units.append(unit_info)
        elif status == "expired":
            expired_units.append(unit_info)
        elif status == "needs_calibration":
            needs_calibration_units.append(unit_info)

    # Get active reservations on this date
    # NOTE(review): reservations with a NULL end_date (open-ended / TBD) are
    # not matched by this filter, so they are absent from the list - confirm
    # whether the day view should show them.
    reservations = db.query(JobReservation).filter(
        JobReservation.device_type == device_type,
        JobReservation.start_date <= check_date,
        JobReservation.end_date >= check_date
    ).all()

    # Count assigned units for all listed reservations with one query
    # instead of one COUNT query per reservation.
    assigned_counts = {}
    reservation_ids = [res.id for res in reservations]
    if reservation_ids:
        assignment_rows = db.query(JobReservationUnit.reservation_id).filter(
            JobReservationUnit.reservation_id.in_(reservation_ids)
        ).all()
        for row in assignment_rows:
            assigned_counts[row.reservation_id] = assigned_counts.get(row.reservation_id, 0) + 1

    reservation_list = [
        {
            "id": res.id,
            "name": res.name,
            "start_date": res.start_date.isoformat(),
            "end_date": res.end_date.isoformat(),
            "assignment_type": res.assignment_type,
            "quantity_needed": res.quantity_needed,
            "assigned_count": assigned_counts.get(res.id, 0),
            "color": res.color,
            "project_id": res.project_id
        }
        for res in reservations
    ]

    return {
        "date": check_date.isoformat(),
        "device_type": device_type,
        "available_units": available_units,
        "in_field_units": in_field_units,
        "reserved_units": reserved_units,
        "expired_units": expired_units,
        "expiring_soon_units": expiring_soon_units,
        "needs_calibration_units": needs_calibration_units,
        "cal_expiring_today": cal_expiring_today,
        "reservations": reservation_list,
        "counts": {
            "available": len(available_units),
            "in_field": len(in_field_units),
            "reserved": len(reserved_units),
            "expired": len(expired_units),
            "expiring_soon": len(expiring_soon_units),
            "needs_calibration": len(needs_calibration_units),
            "cal_expiring_today": len(cal_expiring_today),
            "total": len(units)
        }
    }
|
|
|
|
|
|
def get_calendar_year_data(
    db: Session,
    year: int,
    device_type: str = "seismograph"
) -> Dict:
    """
    Get calendar data for an entire year.

    For performance, this returns summary counts per day rather than
    full unit lists. Use get_day_summary() for detailed day data.

    Args:
        db: Database session used for all queries.
        year: Calendar year to build (January through December).
        device_type: Only units and reservations of this device type are
            considered.

    Returns dict with:
        - year, device_type: Echo of the inputs
        - months: {month number: {"name", "short_name", "days",
          "first_weekday", "num_days"}}, where "days" maps day-of-month to
          the per-day counters
        - reservations: Summary of reservations overlapping the year;
          "end_date" is None for open-ended (TBD) reservations
        - total_units: Number of non-retired units of this device type
    """
    # Get user preferences
    prefs = db.query(UserPreferences).filter_by(id=1).first()
    warning_days = prefs.calibration_warning_days if prefs else 30

    # Get all units
    units = db.query(RosterUnit).filter(
        RosterUnit.device_type == device_type,
        RosterUnit.retired == False
    ).all()

    # Get all reservations that overlap with this year
    # Include TBD reservations (end_date is null) that started before year end
    year_start = date(year, 1, 1)
    year_end = date(year, 12, 31)

    reservations = db.query(JobReservation).filter(
        JobReservation.device_type == device_type,
        JobReservation.start_date <= year_end,
        or_(
            JobReservation.end_date >= year_start,
            JobReservation.end_date.is_(None)  # TBD reservations
        )
    ).all()

    # Get all unit assignments for these reservations
    reservation_ids = [r.id for r in reservations]
    assignments = db.query(JobReservationUnit).filter(
        JobReservationUnit.reservation_id.in_(reservation_ids)
    ).all() if reservation_ids else []

    # Group assignments by reservation once, instead of rescanning the whole
    # assignment list for every reservation.
    assignments_by_reservation = {}
    for assignment in assignments:
        assignments_by_reservation.setdefault(assignment.reservation_id, []).append(assignment)

    # Build a lookup: unit_id -> list of (start_date, end_date, reservation_name)
    # For TBD reservations, use estimated_end_date if available, or a far future date
    far_future = date(year + 5, 12, 31)
    unit_reservations = {}
    for res in reservations:
        for assignment in assignments_by_reservation.get(res.id, []):
            # Use unit-specific dates if set, otherwise use reservation dates
            start_d = assignment.unit_start_date or res.start_date
            if assignment.unit_end_tbd or (assignment.unit_end_date is None and res.end_date_tbd):
                # TBD: use estimated date or far future for availability calculation
                end_d = res.estimated_end_date or far_future
            else:
                end_d = assignment.unit_end_date or res.end_date or far_future

            unit_reservations.setdefault(assignment.unit_id, []).append(
                (start_d, end_d, res.name)
            )

    # Build set of unit IDs that have an active deployment record (still in the field).
    # The set carries no dates, so such a unit counts as in_field on every day.
    unit_ids = [u.id for u in units]
    active_deployments = db.query(DeploymentRecord.unit_id).filter(
        DeploymentRecord.unit_id.in_(unit_ids),
        DeploymentRecord.actual_removal_date.is_(None)
    ).all() if unit_ids else []
    unit_in_field = {row.unit_id for row in active_deployments}

    # Expiry dates do not depend on the day being examined; compute them once.
    one_day = timedelta(days=1)
    units_with_expiry = [
        (unit, unit.last_calibrated + timedelta(days=365) if unit.last_calibrated else None)
        for unit in units
    ]

    # Generate data for each month
    months_data = {}

    for month in range(1, 13):
        # Get first and last day of month
        first_day = date(year, month, 1)
        if month == 12:
            last_day = date(year, 12, 31)
        else:
            last_day = date(year, month + 1, 1) - one_day

        days_data = {}
        current_day = first_day

        while current_day <= last_day:
            available = 0
            in_field = 0
            reserved = 0
            expired = 0
            expiring_soon = 0
            needs_cal = 0
            cal_expiring_on_day = 0  # Units whose calibration expires ON this day
            cal_expired_on_day = 0  # Units whose calibration expired ON this day

            for unit, unit_expiry in units_with_expiry:
                # Check calibration
                cal_status = get_calibration_status(unit, current_day, warning_days)

                # Check if calibration expires/expired ON this specific day
                if unit_expiry is not None:
                    if unit_expiry == current_day:
                        cal_expiring_on_day += 1
                    # Check if expired yesterday (first day of being expired)
                    elif unit_expiry == current_day - one_day:
                        cal_expired_on_day += 1

                if cal_status == "expired":
                    expired += 1
                    continue
                if cal_status == "needs_calibration":
                    needs_cal += 1
                    continue

                # Check active deployment record (in field)
                if unit.id in unit_in_field:
                    in_field += 1
                    continue

                # Check if reserved
                is_reserved = any(
                    start_d <= current_day <= end_d
                    for start_d, end_d, _ in unit_reservations.get(unit.id, ())
                )

                if is_reserved:
                    reserved += 1
                else:
                    available += 1

                # Counted for both reserved and available units
                if cal_status == "expiring_soon":
                    expiring_soon += 1

            days_data[current_day.day] = {
                "available": available,
                "in_field": in_field,
                "reserved": reserved,
                "expired": expired,
                "expiring_soon": expiring_soon,
                "needs_calibration": needs_cal,
                "cal_expiring_on_day": cal_expiring_on_day,
                "cal_expired_on_day": cal_expired_on_day
            }

            current_day += one_day

        months_data[month] = {
            "name": first_day.strftime("%B"),
            "short_name": first_day.strftime("%b"),
            "days": days_data,
            "first_weekday": first_day.weekday(),  # 0=Monday, 6=Sunday
            "num_days": last_day.day
        }

    # Also include reservation summary for the year
    reservation_list = []
    for res in reservations:
        reservation_list.append({
            "id": res.id,
            "name": res.name,
            "start_date": res.start_date.isoformat(),
            # The query above deliberately includes reservations whose
            # end_date is NULL (TBD); calling isoformat() on None would raise.
            "end_date": res.end_date.isoformat() if res.end_date else None,
            "quantity_needed": res.quantity_needed,
            "assigned_count": len(assignments_by_reservation.get(res.id, [])),
            "color": res.color
        })

    return {
        "year": year,
        "device_type": device_type,
        "months": months_data,
        "reservations": reservation_list,
        "total_units": len(units)
    }
|
|
|
|
|
|
def get_rolling_calendar_data(
    db: Session,
    start_year: int,
    start_month: int,
    device_type: str = "seismograph"
) -> Dict:
    """
    Get calendar data for 12 months starting from a specific month/year.

    This supports the rolling calendar view where users can scroll through
    months one at a time, viewing any 12-month window.

    Args:
        db: Database session used for all queries.
        start_year: Year of the first month in the window.
        start_month: Month (1-12) of the first month in the window.
        device_type: Only units and reservations of this device type are
            considered.

    Returns dict with:
        - start_year, start_month, device_type: Echo of the inputs
        - months: List of 12 month dicts in chronological order, each with
          "year", "month", "name", "short_name", "year_short", "days",
          "first_weekday" and "num_days"; "days" maps day-of-month to the
          per-day counters
        - total_units: Number of non-retired units of this device type
    """
    # Get user preferences
    prefs = db.query(UserPreferences).filter_by(id=1).first()
    warning_days = prefs.calibration_warning_days if prefs else 30

    # Get all units
    units = db.query(RosterUnit).filter(
        RosterUnit.device_type == device_type,
        RosterUnit.retired == False
    ).all()

    # Calculate the date range for 12 months: the last month of the window
    # is 11 months after the first one.
    first_date = date(start_year, start_month, 1)
    end_year = start_year + ((start_month + 10) // 12)
    end_month = ((start_month + 10) % 12) + 1
    if end_month == 12:
        last_date = date(end_year, 12, 31)
    else:
        last_date = date(end_year, end_month + 1, 1) - timedelta(days=1)

    # Get all reservations that overlap with this 12-month range
    reservations = db.query(JobReservation).filter(
        JobReservation.device_type == device_type,
        JobReservation.start_date <= last_date,
        or_(
            JobReservation.end_date >= first_date,
            JobReservation.end_date.is_(None)  # TBD reservations
        )
    ).all()

    # Get all unit assignments for these reservations
    reservation_ids = [r.id for r in reservations]
    assignments = db.query(JobReservationUnit).filter(
        JobReservationUnit.reservation_id.in_(reservation_ids)
    ).all() if reservation_ids else []

    # Group assignments by reservation once, instead of rescanning the whole
    # assignment list for every reservation.
    assignments_by_reservation = {}
    for assignment in assignments:
        assignments_by_reservation.setdefault(assignment.reservation_id, []).append(assignment)

    # Build a lookup: unit_id -> list of (start_date, end_date, reservation_name)
    # For TBD reservations, use estimated_end_date if available, or a far future date
    far_future = date(start_year + 5, 12, 31)
    unit_reservations = {}
    for res in reservations:
        for assignment in assignments_by_reservation.get(res.id, []):
            # Use unit-specific dates if set, otherwise use reservation dates
            start_d = assignment.unit_start_date or res.start_date
            if assignment.unit_end_tbd or (assignment.unit_end_date is None and res.end_date_tbd):
                end_d = res.estimated_end_date or far_future
            else:
                end_d = assignment.unit_end_date or res.end_date or far_future

            unit_reservations.setdefault(assignment.unit_id, []).append(
                (start_d, end_d, res.name)
            )

    # Build set of unit IDs that have an active deployment record (still in the field).
    # The set carries no dates, so such a unit counts as in_field on every day.
    unit_ids = [u.id for u in units]
    active_deployments = db.query(DeploymentRecord.unit_id).filter(
        DeploymentRecord.unit_id.in_(unit_ids),
        DeploymentRecord.actual_removal_date.is_(None)
    ).all() if unit_ids else []
    unit_in_field = {row.unit_id for row in active_deployments}

    # Expiry dates do not depend on the day being examined; compute them once.
    one_day = timedelta(days=1)
    units_with_expiry = [
        (unit, unit.last_calibrated + timedelta(days=365) if unit.last_calibrated else None)
        for unit in units
    ]

    # Generate data for each of the 12 months
    months_data = []

    for i in range(12):
        # Calculate this month's year and month
        m_year = start_year + ((start_month - 1 + i) // 12)
        m_month = ((start_month - 1 + i) % 12) + 1

        first_day = date(m_year, m_month, 1)
        if m_month == 12:
            last_day = date(m_year, 12, 31)
        else:
            last_day = date(m_year, m_month + 1, 1) - one_day

        days_data = {}
        current_day = first_day

        while current_day <= last_day:
            available = 0
            in_field = 0
            reserved = 0
            expired = 0
            expiring_soon = 0
            needs_cal = 0
            cal_expiring_on_day = 0  # Units whose calibration expires ON this day
            cal_expired_on_day = 0  # Units whose calibration expired ON this day

            for unit, unit_expiry in units_with_expiry:
                cal_status = get_calibration_status(unit, current_day, warning_days)

                if unit_expiry is not None:
                    if unit_expiry == current_day:
                        cal_expiring_on_day += 1
                    # Expired yesterday (first day of being expired)
                    elif unit_expiry == current_day - one_day:
                        cal_expired_on_day += 1

                if cal_status == "expired":
                    expired += 1
                    continue
                if cal_status == "needs_calibration":
                    needs_cal += 1
                    continue

                # Units with an active deployment are in the field, not
                # available or reserved. The in-field set was previously
                # built but never consulted, so these units were
                # miscounted; this matches get_calendar_year_data().
                if unit.id in unit_in_field:
                    in_field += 1
                    continue

                is_reserved = any(
                    start_d <= current_day <= end_d
                    for start_d, end_d, _ in unit_reservations.get(unit.id, ())
                )

                if is_reserved:
                    reserved += 1
                else:
                    available += 1

                # Counted for both reserved and available units
                if cal_status == "expiring_soon":
                    expiring_soon += 1

            days_data[current_day.day] = {
                "available": available,
                "in_field": in_field,
                "reserved": reserved,
                "expired": expired,
                "expiring_soon": expiring_soon,
                "needs_calibration": needs_cal,
                "cal_expiring_on_day": cal_expiring_on_day,
                "cal_expired_on_day": cal_expired_on_day
            }

            current_day += one_day

        months_data.append({
            "year": m_year,
            "month": m_month,
            "name": first_day.strftime("%B"),
            "short_name": first_day.strftime("%b"),
            "year_short": first_day.strftime("%y"),
            "days": days_data,
            "first_weekday": first_day.weekday(),  # 0=Monday, 6=Sunday
            "num_days": last_day.day
        })

    return {
        "start_year": start_year,
        "start_month": start_month,
        "device_type": device_type,
        "months": months_data,
        "total_units": len(units)
    }
|
|
|
|
|
|
def check_calibration_conflicts(
    db: Session,
    reservation_id: str
) -> List[Dict]:
    """
    Check if any units assigned to a reservation will have their
    calibration expire during the reservation period.

    The period runs from the reservation's start_date (exclusive) to its
    end_date (inclusive). When end_date is not set (open-ended / TBD
    reservation), estimated_end_date is used instead; if that is missing
    too, the period is treated as unbounded, so any expiry after the start
    date is reported.

    Args:
        db: Database session used for all queries.
        reservation_id: ID of the reservation to check.

    Returns:
        List of conflicts, one dict per affected unit with "unit_id",
        "last_calibrated", "expiry_date", "reservation_name" and
        "days_into_job" (days from reservation start to expiry). Empty when
        the reservation does not exist or nothing conflicts. Units that are
        missing or have no calibration date are skipped.
    """
    reservation = db.query(JobReservation).filter_by(id=reservation_id).first()
    if not reservation:
        return []

    # Get assigned units
    assigned = db.query(JobReservationUnit).filter_by(
        reservation_id=reservation_id
    ).all()
    if not assigned:
        return []

    # Load every assigned unit with one query instead of one per assignment.
    assigned_unit_ids = [assignment.unit_id for assignment in assigned]
    units_by_id = {
        unit.id: unit
        for unit in db.query(RosterUnit).filter(
            RosterUnit.id.in_(assigned_unit_ids)
        ).all()
    }

    # end_date is NULL for TBD reservations; comparing a date against None
    # would raise TypeError, so fall back to the estimate (may still be None).
    period_end = reservation.end_date or reservation.estimated_end_date

    conflicts = []
    for assignment in assigned:
        unit = units_by_id.get(assignment.unit_id)
        if not unit or not unit.last_calibrated:
            continue

        expiry_date = unit.last_calibrated + timedelta(days=365)

        # Check if expiry falls within reservation period
        if expiry_date <= reservation.start_date:
            continue
        if period_end is not None and expiry_date > period_end:
            continue

        conflicts.append({
            "unit_id": unit.id,
            "last_calibrated": unit.last_calibrated.isoformat(),
            "expiry_date": expiry_date.isoformat(),
            "reservation_name": reservation.name,
            "days_into_job": (expiry_date - reservation.start_date).days
        })

    return conflicts
|
|
|
|
|
|
def get_available_units_for_period(
    db: Session,
    start_date: date,
    end_date: date,
    device_type: str = "seismograph",
    exclude_reservation_id: Optional[str] = None
) -> List[Dict]:
    """
    Get units that are available for the entire specified period.

    A unit is returned if:
    - Not retired
    - Not assigned to any other reservation that overlaps the period.
      Open-ended reservations (end_date is NULL / TBD) that start on or
      before the period's end count as overlapping.
    - It has no active deployment record (not currently in the field)

    Calibration is NOT used as a filter: every returned unit carries its
    calibration status as of ``end_date`` so the caller can warn about or
    reject units that are expired or expiring.

    Args:
        db: Database session used for all queries.
        start_date: First day of the period.
        end_date: Last day of the period.
        device_type: Only units and reservations of this device type are
            considered.
        exclude_reservation_id: Reservation to ignore when looking for
            overlaps (e.g. the reservation currently being edited).

    Returns:
        List of dicts with "id", "last_calibrated", "expiry_date",
        "calibration_status", "deployed", "out_for_calibration" and "note".
    """
    prefs = db.query(UserPreferences).filter_by(id=1).first()
    warning_days = prefs.calibration_warning_days if prefs else 30

    units = db.query(RosterUnit).filter(
        RosterUnit.device_type == device_type,
        RosterUnit.retired == False
    ).all()

    # Get reservations that overlap with this period
    overlapping_query = db.query(JobReservation).filter(
        JobReservation.device_type == device_type,
        JobReservation.start_date <= end_date,
        or_(
            JobReservation.end_date >= start_date,
            # TBD reservations have no end date. A NULL never satisfies
            # ">=", so without this branch their units would be offered
            # again and could be double-booked.
            JobReservation.end_date.is_(None),
        ),
    )

    if exclude_reservation_id:
        overlapping_query = overlapping_query.filter(
            JobReservation.id != exclude_reservation_id
        )

    overlapping_ids = [res.id for res in overlapping_query.all()]

    # Get all units assigned to overlapping reservations (one query for all
    # reservations rather than one per reservation).
    reserved_unit_ids = set()
    if overlapping_ids:
        assignment_rows = db.query(JobReservationUnit.unit_id).filter(
            JobReservationUnit.reservation_id.in_(overlapping_ids)
        ).all()
        reserved_unit_ids = {row.unit_id for row in assignment_rows}

    # Get units with active deployment records (still in the field)
    unit_ids = [u.id for u in units]
    active_deps = db.query(DeploymentRecord.unit_id).filter(
        DeploymentRecord.unit_id.in_(unit_ids),
        DeploymentRecord.actual_removal_date.is_(None)
    ).all() if unit_ids else []
    in_field_unit_ids = {row.unit_id for row in active_deps}

    available_units = []
    for unit in units:
        # Skip units that are already reserved or currently in the field
        if unit.id in reserved_unit_ids or unit.id in in_field_unit_ids:
            continue

        expiry_date = None
        if unit.last_calibrated:
            expiry_date = unit.last_calibrated + timedelta(days=365)

        available_units.append({
            "id": unit.id,
            "last_calibrated": unit.last_calibrated.isoformat() if unit.last_calibrated else None,
            "expiry_date": expiry_date.isoformat() if expiry_date else None,
            # Evaluated at the END of the period; yields "needs_calibration"
            # by itself when the unit has no calibration date.
            "calibration_status": get_calibration_status(unit, end_date, warning_days),
            "deployed": unit.deployed,
            "out_for_calibration": unit.out_for_calibration or False,
            "note": unit.note or ""
        })

    return available_units
|