from fastapi import APIRouter, HTTPException, UploadFile, File, Depends from fastapi.responses import FileResponse, JSONResponse from pathlib import Path from typing import List, Optional from datetime import datetime import os import shutil from PIL import Image from PIL.ExifTags import TAGS, GPSTAGS from sqlalchemy.orm import Session from backend.database import get_db from backend.models import RosterUnit router = APIRouter(prefix="/api", tags=["photos"]) PHOTOS_BASE_DIR = Path("data/photos") def extract_exif_data(image_path: Path) -> dict: """ Extract EXIF metadata from an image file. Returns dict with timestamp, GPS coordinates, and other metadata. """ try: image = Image.open(image_path) exif_data = image._getexif() if not exif_data: return {} metadata = {} # Extract standard EXIF tags for tag_id, value in exif_data.items(): tag = TAGS.get(tag_id, tag_id) # Extract datetime if tag == "DateTime" or tag == "DateTimeOriginal": try: metadata["timestamp"] = datetime.strptime(str(value), "%Y:%m:%d %H:%M:%S") except: pass # Extract GPS data if tag == "GPSInfo": gps_data = {} for gps_tag_id in value: gps_tag = GPSTAGS.get(gps_tag_id, gps_tag_id) gps_data[gps_tag] = value[gps_tag_id] # Convert GPS data to decimal degrees lat = gps_data.get("GPSLatitude") lat_ref = gps_data.get("GPSLatitudeRef") lon = gps_data.get("GPSLongitude") lon_ref = gps_data.get("GPSLongitudeRef") if lat and lon and lat_ref and lon_ref: # Convert to decimal degrees lat_decimal = convert_to_degrees(lat) if lat_ref == "S": lat_decimal = -lat_decimal lon_decimal = convert_to_degrees(lon) if lon_ref == "W": lon_decimal = -lon_decimal metadata["latitude"] = lat_decimal metadata["longitude"] = lon_decimal metadata["coordinates"] = f"{lat_decimal},{lon_decimal}" return metadata except Exception as e: print(f"Error extracting EXIF data: {e}") return {} def convert_to_degrees(value): """ Convert GPS coordinates from degrees/minutes/seconds to decimal degrees. """ d, m, s = value return float(d) + (float(m) / 60.0) + (float(s) / 3600.0) @router.post("/unit/{unit_id}/upload-photo") async def upload_photo( unit_id: str, photo: UploadFile = File(...), auto_populate_coords: bool = True, db: Session = Depends(get_db) ): """ Upload a photo for a unit and extract EXIF metadata. If GPS data exists and auto_populate_coords is True, update the unit's coordinates. """ # Validate file type allowed_extensions = {".jpg", ".jpeg", ".png", ".gif", ".webp"} file_ext = Path(photo.filename).suffix.lower() if file_ext not in allowed_extensions: raise HTTPException( status_code=400, detail=f"Invalid file type. Allowed: {', '.join(allowed_extensions)}" ) # Create photos directory for this unit unit_photo_dir = PHOTOS_BASE_DIR / unit_id unit_photo_dir.mkdir(parents=True, exist_ok=True) # Generate filename with timestamp to avoid collisions timestamp = datetime.now().strftime("%Y%m%d_%H%M%S") filename = f"{timestamp}_{photo.filename}" file_path = unit_photo_dir / filename # Save the file try: with open(file_path, "wb") as buffer: shutil.copyfileobj(photo.file, buffer) except Exception as e: raise HTTPException(status_code=500, detail=f"Failed to save photo: {str(e)}") # Extract EXIF metadata metadata = extract_exif_data(file_path) # Update unit coordinates if GPS data exists and auto_populate_coords is True coordinates_updated = False if auto_populate_coords and "coordinates" in metadata: roster_unit = db.query(RosterUnit).filter(RosterUnit.id == unit_id).first() if roster_unit: roster_unit.coordinates = metadata["coordinates"] roster_unit.last_updated = datetime.utcnow() db.commit() coordinates_updated = True return JSONResponse(content={ "success": True, "filename": filename, "file_path": f"/api/unit/{unit_id}/photo/{filename}", "metadata": { "timestamp": metadata.get("timestamp").isoformat() if metadata.get("timestamp") else None, "latitude": metadata.get("latitude"), "longitude": metadata.get("longitude"), "coordinates": metadata.get("coordinates") }, "coordinates_updated": coordinates_updated }) @router.get("/unit/{unit_id}/photos") def get_unit_photos(unit_id: str): """ Reads /data/photos// and returns list of image filenames. Primary photo = most recent file. """ unit_photo_dir = PHOTOS_BASE_DIR / unit_id if not unit_photo_dir.exists(): # Return empty list if no photos directory exists return { "unit_id": unit_id, "photos": [], "primary_photo": None } # Get all image files image_extensions = {".jpg", ".jpeg", ".png", ".gif", ".webp"} photos = [] for file_path in unit_photo_dir.iterdir(): if file_path.is_file() and file_path.suffix.lower() in image_extensions: photos.append({ "filename": file_path.name, "path": f"/api/unit/{unit_id}/photo/{file_path.name}", "modified": file_path.stat().st_mtime }) # Sort by modification time (most recent first) photos.sort(key=lambda x: x["modified"], reverse=True) # Primary photo is the most recent primary_photo = photos[0]["filename"] if photos else None return { "unit_id": unit_id, "photos": [p["filename"] for p in photos], "primary_photo": primary_photo, "photo_urls": [p["path"] for p in photos] } @router.get("/recent-photos") def get_recent_photos(limit: int = 12): """ Get the most recently uploaded photos across all units. Returns photos sorted by modification time (newest first). """ if not PHOTOS_BASE_DIR.exists(): return {"photos": []} all_photos = [] image_extensions = {".jpg", ".jpeg", ".png", ".gif", ".webp"} # Scan all unit directories for unit_dir in PHOTOS_BASE_DIR.iterdir(): if not unit_dir.is_dir(): continue unit_id = unit_dir.name # Get all photos in this unit's directory for file_path in unit_dir.iterdir(): if file_path.is_file() and file_path.suffix.lower() in image_extensions: all_photos.append({ "unit_id": unit_id, "filename": file_path.name, "path": f"/api/unit/{unit_id}/photo/{file_path.name}", "modified": file_path.stat().st_mtime, "modified_iso": datetime.fromtimestamp(file_path.stat().st_mtime).isoformat() }) # Sort by modification time (most recent first) and limit all_photos.sort(key=lambda x: x["modified"], reverse=True) recent_photos = all_photos[:limit] return { "photos": recent_photos, "total": len(all_photos) } @router.get("/unit/{unit_id}/photo/{filename}") def get_photo(unit_id: str, filename: str): """ Serves a specific photo file. """ file_path = PHOTOS_BASE_DIR / unit_id / filename if not file_path.exists() or not file_path.is_file(): raise HTTPException(status_code=404, detail="Photo not found") return FileResponse(file_path)