# Seismo Fleet Manager application module.
#
# Builds the FastAPI ``app``: logging, database tables, middleware, static
# files, templates, API routers, startup/shutdown hooks, the server-rendered
# HTML pages and the PWA support routes.
import os
import logging
from fastapi import FastAPI, Request, Depends, HTTPException
from fastapi.encoders import jsonable_encoder
from fastapi.middleware.cors import CORSMiddleware
from fastapi.staticfiles import StaticFiles
from fastapi.templating import Jinja2Templates
from fastapi.responses import HTMLResponse, FileResponse, JSONResponse
from fastapi.exceptions import RequestValidationError
from sqlalchemy.orm import Session
from typing import List, Dict, Optional
from pydantic import BaseModel

# Configure logging
# NOTE(review): this runs before the backend imports below, presumably so that
# records emitted while those modules import already use this format - keep
# the ordering unless that is confirmed unnecessary.
logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(name)s - %(levelname)s - %(message)s'
)
logger = logging.getLogger(__name__)

from backend.database import engine, Base, get_db
from backend.routers import roster, units, photos, roster_edit, roster_rename, dashboard, dashboard_tabs, activity, slmm, slm_ui, slm_dashboard, seismo_dashboard, projects, project_locations, scheduler
from backend.services.snapshot import emit_status_snapshot
from backend.models import IgnoredUnit

# Create database tables
Base.metadata.create_all(bind=engine)

# Read environment (development or production)
ENVIRONMENT = os.getenv("ENVIRONMENT", "production")

# Initialize FastAPI app
VERSION = "0.4.3"
app = FastAPI(
    title="Seismo Fleet Manager",
    description="Backend API for managing seismograph fleet status",
    version=VERSION
)


# Add validation error handler to log details
@app.exception_handler(RequestValidationError)
async def validation_exception_handler(request: Request, exc: RequestValidationError):
    """Log request-validation failures and answer with HTTP 400.

    Args:
        request: The request that failed validation.
        exc: The validation error raised by FastAPI.

    Returns:
        A JSON response with status 400 whose ``detail`` is the list of
        validation errors.
    """
    logger.error(f"Validation error on {request.url}: {exc.errors()}")
    logger.error(f"Body: {await request.body()}")
    return JSONResponse(
        status_code=400,
        # The error entries may carry values that json.dumps cannot handle;
        # jsonable_encoder converts them so the handler itself cannot fail
        # while serialising the response.
        content={"detail": jsonable_encoder(exc.errors())}
    )


# Configure CORS
# NOTE(review): wildcard origins together with allow_credentials=True is very
# permissive - confirm this is intended outside development.
app.add_middleware(
    CORSMiddleware,
    allow_origins=["*"],
    allow_credentials=True,
    allow_methods=["*"],
    allow_headers=["*"],
)

# Mount static files
app.mount("/static", StaticFiles(directory="backend/static"), name="static")

# Setup Jinja2 templates
templates = Jinja2Templates(directory="templates")


# Add custom context processor to inject environment variable into all templates
@app.middleware("http")
async def add_environment_to_context(request: Request, call_next):
    """Middleware to add environment variable to request state"""
    request.state.environment = ENVIRONMENT
    response = await call_next(request)
    return response


# Override TemplateResponse to include environment and version in context
original_template_response = templates.TemplateResponse


def custom_template_response(name, context=None, *args, **kwargs):
    """Render a template with ``environment`` and ``version`` added to its context.

    Args:
        name: Template name, forwarded to the original ``TemplateResponse``.
        context: Optional template context. It is copied, so the caller's
            dict is left untouched.
        *args: Extra positional arguments forwarded unchanged.
        **kwargs: Extra keyword arguments forwarded unchanged.

    Returns:
        Whatever the original ``templates.TemplateResponse`` returns.
    """
    merged_context = {} if context is None else dict(context)
    merged_context["environment"] = ENVIRONMENT
    merged_context["version"] = VERSION
    return original_template_response(name, merged_context, *args, **kwargs)


templates.TemplateResponse = custom_template_response

# Include API routers
app.include_router(roster.router)
app.include_router(units.router)
app.include_router(photos.router)
app.include_router(roster_edit.router)
app.include_router(roster_rename.router)
app.include_router(dashboard.router)
app.include_router(dashboard_tabs.router)
app.include_router(activity.router)
app.include_router(slmm.router)
app.include_router(slm_ui.router)
app.include_router(slm_dashboard.router)
app.include_router(seismo_dashboard.router)

from backend.routers import settings
app.include_router(settings.router)

# Projects system routers
app.include_router(projects.router)
app.include_router(project_locations.router)
app.include_router(scheduler.router)

# Start scheduler service on application startup
from backend.services.scheduler import start_scheduler, stop_scheduler


@app.on_event("startup")
async def startup_event():
    """Initialize services on app startup.

    Starts the scheduler, then syncs SLM devices to SLMM and cleans up
    orphaned SLMM devices. A failure in the sync/cleanup step is logged and
    swallowed so that it does not abort application startup.
    """
    logger.info("Starting scheduler service...")
    await start_scheduler()
    logger.info("Scheduler service started")

    # Sync all SLMs to SLMM on startup
    logger.info("Syncing SLM devices to SLMM...")
    try:
        from backend.services.slmm_sync import sync_all_slms_to_slmm, cleanup_orphaned_slmm_devices
        from backend.database import SessionLocal

        db = SessionLocal()
        try:
            # Sync all SLMs from roster to SLMM
            sync_results = await sync_all_slms_to_slmm(db)
            logger.info(f"SLM sync complete: {sync_results}")

            # Clean up orphaned devices in SLMM
            cleanup_results = await cleanup_orphaned_slmm_devices(db)
            logger.info(f"SLMM cleanup complete: {cleanup_results}")
        finally:
            # Always release the session, even when the sync fails.
            db.close()
    except Exception as e:
        # Best-effort step: keep the app starting, but record the traceback.
        logger.exception(f"Error syncing SLMs to SLMM on startup: {e}")


@app.on_event("shutdown")
def shutdown_event():
    """Clean up services on app shutdown"""
    logger.info("Stopping scheduler service...")
    stop_scheduler()
    logger.info("Scheduler service stopped")


# Legacy routes from the original backend
from backend import routes as legacy_routes
app.include_router(legacy_routes.router)


# HTML page routes
# NOTE: from here on, some route functions reuse names of router modules
# imported above (e.g. ``dashboard``). This is safe only because the routers
# were already registered before the names are rebound.
@app.get("/", response_class=HTMLResponse)
async def dashboard(request: Request):
    """Dashboard home page"""
    return templates.TemplateResponse("dashboard.html", {"request": request})


@app.get("/roster", response_class=HTMLResponse)
async def roster_page(request: Request):
    """Fleet roster page"""
    return templates.TemplateResponse("roster.html", {"request": request})


@app.get("/unit/{unit_id}", response_class=HTMLResponse)
async def unit_detail_page(request: Request, unit_id: str):
    """Unit detail page"""
    return templates.TemplateResponse("unit_detail.html", {
        "request": request,
        "unit_id": unit_id
    })


@app.get("/settings", response_class=HTMLResponse)
async def settings_page(request: Request):
    """Settings page for roster management"""
    return templates.TemplateResponse("settings.html", {"request": request})


@app.get("/sound-level-meters", response_class=HTMLResponse)
async def sound_level_meters_page(request: Request):
    """Sound Level Meters management dashboard"""
    return templates.TemplateResponse("sound_level_meters.html", {"request": request})


@app.get("/slm/{unit_id}", response_class=HTMLResponse)
async def slm_legacy_dashboard(
    request: Request,
    unit_id: str,
    from_project: Optional[str] = None,
    from_nrl: Optional[str] = None,
    db: Session = Depends(get_db)
):
    """Legacy SLM control center dashboard for a specific unit.

    Args:
        request: Incoming request, passed to the template.
        unit_id: Identifier of the unit taken from the URL path.
        from_project: Optional project id; when given, the matching
            ``Project`` row (or ``None``) is passed to the template.
        from_nrl: Optional NRL location id; when given, the matching
            ``NRLLocation`` row (or ``None``) is passed to the template.
        db: Database session.

    Returns:
        The rendered ``slm_legacy_dashboard.html`` template.
    """
    # Get project details if from_project is provided
    project = None
    if from_project:
        from backend.models import Project
        project = db.query(Project).filter_by(id=from_project).first()

    # Get NRL location details if from_nrl is provided
    nrl_location = None
    if from_nrl:
        from backend.models import NRLLocation
        nrl_location = db.query(NRLLocation).filter_by(id=from_nrl).first()

    return templates.TemplateResponse("slm_legacy_dashboard.html", {
        "request": request,
        "unit_id": unit_id,
        "from_project": from_project,
        "from_nrl": from_nrl,
        "project": project,
        "nrl_location": nrl_location
    })


@app.get("/seismographs", response_class=HTMLResponse)
async def seismographs_page(request: Request):
    """Seismographs management dashboard"""
    return templates.TemplateResponse("seismographs.html", {"request": request})


@app.get("/projects", response_class=HTMLResponse)
async def projects_page(request: Request):
    """Projects management and overview"""
    return templates.TemplateResponse("projects/overview.html", {"request": request})


@app.get("/projects/{project_id}", response_class=HTMLResponse)
async def project_detail_page(request: Request, project_id: str):
    """Project detail dashboard"""
    return templates.TemplateResponse("projects/detail.html", {
        "request": request,
        "project_id": project_id
    })


@app.get("/projects/{project_id}/nrl/{location_id}", response_class=HTMLResponse)
async def nrl_detail_page(
    request: Request,
    project_id: str,
    location_id: str,
    db: Session = Depends(get_db)
):
    """NRL (Noise Recording Location) detail page with tabs.

    Args:
        request: Incoming request, passed to the template.
        project_id: Project identifier from the URL path.
        location_id: Monitoring-location identifier from the URL path.
        db: Database session.

    Returns:
        The rendered ``nrl_detail.html`` template, or ``404.html`` with
        status 404 when the project, or the location within that project,
        does not exist.
    """
    from backend.models import Project, MonitoringLocation, UnitAssignment, RosterUnit, RecordingSession, DataFile
    from sqlalchemy import and_

    # Get project
    project = db.query(Project).filter_by(id=project_id).first()
    if not project:
        return templates.TemplateResponse("404.html", {
            "request": request,
            "message": "Project not found"
        }, status_code=404)

    # Get location (must belong to the requested project)
    location = db.query(MonitoringLocation).filter_by(
        id=location_id,
        project_id=project_id
    ).first()
    if not location:
        return templates.TemplateResponse("404.html", {
            "request": request,
            "message": "Location not found"
        }, status_code=404)

    # Get active assignment
    assignment = db.query(UnitAssignment).filter(
        and_(
            UnitAssignment.location_id == location_id,
            UnitAssignment.status == "active"
        )
    ).first()

    assigned_unit = None
    if assignment:
        assigned_unit = db.query(RosterUnit).filter_by(id=assignment.unit_id).first()

    # Get session count
    session_count = db.query(RecordingSession).filter_by(location_id=location_id).count()

    # Get file count (DataFile links to session, not directly to location)
    file_count = db.query(DataFile).join(
        RecordingSession,
        DataFile.session_id == RecordingSession.id
    ).filter(RecordingSession.location_id == location_id).count()

    # Check for active session
    active_session = db.query(RecordingSession).filter(
        and_(
            RecordingSession.location_id == location_id,
            RecordingSession.status == "recording"
        )
    ).first()

    return templates.TemplateResponse("nrl_detail.html", {
        "request": request,
        "project_id": project_id,
        "location_id": location_id,
        "project": project,
        "location": location,
        "assignment": assignment,
        "assigned_unit": assigned_unit,
        "session_count": session_count,
        "file_count": file_count,
        "active_session": active_session,
    })


# ===== PWA ROUTES =====

@app.get("/sw.js")
async def service_worker():
    """Serve service worker with proper headers for PWA"""
    return FileResponse(
        "backend/static/sw.js",
        media_type="application/javascript",
        headers={
            "Service-Worker-Allowed": "/",
            "Cache-Control": "no-cache"
        }
    )


@app.get("/offline-db.js")
async def offline_db_script():
    """Serve offline database script"""
    return FileResponse(
        "backend/static/offline-db.js",
        media_type="application/javascript",
        headers={"Cache-Control": "no-cache"}
    )


# Pydantic model for sync edits
class EditItem(BaseModel):
    """One queued offline edit, as submitted to ``/api/sync-edits``."""

    id: int  # identifier of the edit; echoed back in the sync results
    unitId: str  # id of the roster unit the edit applies to
    changes: Dict  # attribute name -> new value
    timestamp: int  # NOTE(review): presumably client-side epoch time - confirm units


class SyncEditsRequest(BaseModel):
    """Request body for ``/api/sync-edits``: the list of queued edits."""

    edits: List[EditItem]

def _unit_detail_fields(unit_data, *, include_location=True):
    """Return the descriptive fields shared by every roster table row.

    Args:
        unit_data: Snapshot entry (a dict) for one unit.
        include_location: When true, also emit ``address``, ``coordinates``
            and ``project_id`` (each defaulting to ``""``).

    Returns:
        A new dict with ``note``, ``device_type``, the optional location
        fields, and the calibration / modem / network / hardware fields.
    """
    fields = {
        "note": unit_data.get("note", ""),
        "device_type": unit_data.get("device_type", "seismograph"),
    }
    if include_location:
        fields["address"] = unit_data.get("address", "")
        fields["coordinates"] = unit_data.get("coordinates", "")
        fields["project_id"] = unit_data.get("project_id", "")
    for key in (
        "last_calibrated",
        "next_calibration_due",
        "deployed_with_modem_id",
        "ip_address",
        "phone_number",
        "hardware_model",
    ):
        fields[key] = unit_data.get(key)
    return fields


def _render_timestamp():
    """Return the current local time formatted as ``HH:MM:SS`` for templates."""
    from datetime import datetime
    return datetime.now().strftime("%H:%M:%S")


@app.post("/api/sync-edits")
async def sync_edits(request: SyncEditsRequest, db: Session = Depends(get_db)):
    """Process offline edit queue and sync to database.

    Each edit is applied and committed on its own, so one failing edit does
    not discard the others.

    Args:
        request: The queued edits to apply.
        db: Database session.

    Returns:
        A JSON response with ``synced`` (number of edits applied), ``total``
        (number received), ``synced_ids`` and a per-edit ``results`` list
        whose entries have ``status`` ``"success"`` or ``"error"``.
    """
    from sqlalchemy import inspect as sa_inspect
    from backend.models import RosterUnit

    # Only mapped column attributes may be written. The keys come from the
    # client, and a bare hasattr() check would also accept ORM internals,
    # methods and relationships (mass assignment).
    editable_fields = {prop.key for prop in sa_inspect(RosterUnit).column_attrs}
    boolean_fields = ('deployed', 'retired')
    truthy_values = ['true', True, 'True', '1', 1]

    results = []
    synced_ids = []

    for edit in request.edits:
        try:
            # Find the unit
            unit = db.query(RosterUnit).filter_by(id=edit.unitId).first()
            if not unit:
                results.append({
                    "id": edit.id,
                    "status": "error",
                    "reason": f"Unit {edit.unitId} not found"
                })
                continue

            # Apply changes; keys that are not editable columns are ignored
            for key, value in edit.changes.items():
                if key not in editable_fields:
                    continue
                if key in boolean_fields:
                    # Handle boolean conversions
                    setattr(unit, key, value in truthy_values)
                else:
                    # An empty string means "clear the field"
                    setattr(unit, key, value if value != '' else None)

            db.commit()
            results.append({
                "id": edit.id,
                "status": "success"
            })
            synced_ids.append(edit.id)
        except Exception as e:
            # Undo this edit only, report it, and carry on with the rest.
            db.rollback()
            logger.exception(f"Failed to sync edit {edit.id} for unit {edit.unitId}")
            results.append({
                "id": edit.id,
                "status": "error",
                "reason": str(e)
            })

    synced_count = len(synced_ids)
    return JSONResponse({
        "synced": synced_count,
        "total": len(request.edits),
        "synced_ids": synced_ids,
        "results": results
    })


@app.get("/partials/roster-deployed", response_class=HTMLResponse)
async def roster_deployed_partial(request: Request):
    """Partial template for deployed units tab.

    Rows come from the snapshot's ``active`` section and are ordered by
    status (Missing, Pending, OK, then anything else) and then by id.
    """
    snapshot = emit_status_snapshot()

    units_list = [
        {
            "id": unit_id,
            "status": unit_data.get("status", "Unknown"),
            "age": unit_data.get("age", "N/A"),
            "last_seen": unit_data.get("last", "Never"),
            "deployed": unit_data.get("deployed", False),
            **_unit_detail_fields(unit_data),
        }
        for unit_id, unit_data in snapshot["active"].items()
    ]

    # Sort by status priority (Missing > Pending > OK) then by ID
    status_priority = {"Missing": 0, "Pending": 1, "OK": 2}
    units_list.sort(key=lambda x: (status_priority.get(x["status"], 3), x["id"]))

    return templates.TemplateResponse("partials/roster_table.html", {
        "request": request,
        "units": units_list,
        "timestamp": _render_timestamp()
    })


@app.get("/partials/roster-benched", response_class=HTMLResponse)
async def roster_benched_partial(request: Request):
    """Partial template for benched units tab.

    Rows come from the snapshot's ``benched`` section, ordered by id.
    """
    snapshot = emit_status_snapshot()

    units_list = [
        {
            "id": unit_id,
            "status": unit_data.get("status", "N/A"),
            "age": unit_data.get("age", "N/A"),
            "last_seen": unit_data.get("last", "Never"),
            "deployed": unit_data.get("deployed", False),
            **_unit_detail_fields(unit_data),
        }
        for unit_id, unit_data in snapshot["benched"].items()
    ]

    # Sort by ID
    units_list.sort(key=lambda x: x["id"])

    return templates.TemplateResponse("partials/roster_table.html", {
        "request": request,
        "units": units_list,
        "timestamp": _render_timestamp()
    })


@app.get("/partials/roster-retired", response_class=HTMLResponse)
async def roster_retired_partial(request: Request):
    """Partial template for retired units tab.

    Rows come from the snapshot's ``retired`` section, ordered by id. Status
    fields missing from a snapshot entry fall back to placeholder values
    instead of failing the whole request.
    """
    snapshot = emit_status_snapshot()

    units_list = [
        {
            "id": unit_id,
            "status": unit_data.get("status", "Retired"),
            "age": unit_data.get("age", "N/A"),
            "last_seen": unit_data.get("last", "N/A"),
            "deployed": unit_data.get("deployed", False),
            # The retired table carries no address/coordinates/project fields.
            **_unit_detail_fields(unit_data, include_location=False),
        }
        for unit_id, unit_data in snapshot["retired"].items()
    ]

    # Sort by ID
    units_list.sort(key=lambda x: x["id"])

    return templates.TemplateResponse("partials/retired_table.html", {
        "request": request,
        "units": units_list,
        "timestamp": _render_timestamp()
    })


@app.get("/partials/roster-ignored", response_class=HTMLResponse)
async def roster_ignored_partial(request: Request, db: Session = Depends(get_db)):
    """Partial template for ignored units tab.

    Rows are read from the ``IgnoredUnit`` table and ordered by id.
    """
    ignored_list = [
        {
            "id": unit.id,
            "reason": unit.reason or "",
            "ignored_at": unit.ignored_at.strftime("%Y-%m-%d %H:%M:%S") if unit.ignored_at else "Unknown"
        }
        for unit in db.query(IgnoredUnit).all()
    ]

    # Sort by ID
    ignored_list.sort(key=lambda x: x["id"])

    return templates.TemplateResponse("partials/ignored_table.html", {
        "request": request,
        "ignored_units": ignored_list,
        "timestamp": _render_timestamp()
    })


@app.get("/partials/unknown-emitters", response_class=HTMLResponse)
async def unknown_emitters_partial(request: Request):
    """Partial template for unknown emitters (HTMX).

    Rows come from the snapshot's optional ``unknown`` section, ordered by
    id. Missing ``status``/``age`` values fall back to placeholders.
    """
    snapshot = emit_status_snapshot()

    unknown_list = [
        {
            "id": unit_id,
            "status": unit_data.get("status", "Unknown"),
            "age": unit_data.get("age", "N/A"),
            "fname": unit_data.get("fname", ""),
        }
        for unit_id, unit_data in snapshot.get("unknown", {}).items()
    ]

    # Sort by ID
    unknown_list.sort(key=lambda x: x["id"])

    return templates.TemplateResponse("partials/unknown_emitters.html", {
        "request": request,
        "unknown_units": unknown_list
    })


@app.get("/partials/devices-all", response_class=HTMLResponse)
async def devices_all_partial(request: Request):
    """Unified partial template for ALL devices with comprehensive filtering support.

    Merges the snapshot's ``active``, ``benched``, ``retired`` and optional
    ``ignored`` sections into one list. Every row carries ``deployed``,
    ``retired`` and ``ignored`` flags, and rows are ordered by category
    (deployed, benched, retired, ignored) and then by id.
    """
    snapshot = emit_status_snapshot()

    units_list = []

    # Add deployed/active units
    for unit_id, unit_data in snapshot["active"].items():
        units_list.append({
            "id": unit_id,
            "status": unit_data.get("status", "Unknown"),
            "age": unit_data.get("age", "N/A"),
            "last_seen": unit_data.get("last", "Never"),
            "deployed": True,
            "retired": False,
            "ignored": False,
            **_unit_detail_fields(unit_data),
        })

    # Add benched units
    for unit_id, unit_data in snapshot["benched"].items():
        units_list.append({
            "id": unit_id,
            "status": unit_data.get("status", "N/A"),
            "age": unit_data.get("age", "N/A"),
            "last_seen": unit_data.get("last", "Never"),
            "deployed": False,
            "retired": False,
            "ignored": False,
            **_unit_detail_fields(unit_data),
        })

    # Add retired units (status fields are fixed labels here)
    for unit_id, unit_data in snapshot["retired"].items():
        units_list.append({
            "id": unit_id,
            "status": "Retired",
            "age": "N/A",
            "last_seen": "N/A",
            "deployed": False,
            "retired": True,
            "ignored": False,
            **_unit_detail_fields(unit_data),
        })

    # Add ignored units (only note and device type are taken from the snapshot)
    for unit_id, unit_data in snapshot.get("ignored", {}).items():
        units_list.append({
            "id": unit_id,
            "status": "Ignored",
            "age": "N/A",
            "last_seen": "N/A",
            "deployed": False,
            "retired": False,
            "ignored": True,
            "note": unit_data.get("note", unit_data.get("reason", "")),
            "device_type": unit_data.get("device_type", "unknown"),
            "address": "",
            "coordinates": "",
            "project_id": "",
            "last_calibrated": None,
            "next_calibration_due": None,
            "deployed_with_modem_id": None,
            "ip_address": None,
            "phone_number": None,
            "hardware_model": None,
        })

    # Sort by status category, then by ID
    def sort_key(unit):
        # Priority: deployed (active) -> benched -> retired -> ignored
        if unit["deployed"]:
            return (0, unit["id"])
        elif not unit["retired"] and not unit["ignored"]:
            return (1, unit["id"])
        elif unit["retired"]:
            return (2, unit["id"])
        else:
            return (3, unit["id"])

    units_list.sort(key=sort_key)

    return templates.TemplateResponse("partials/devices_table.html", {
        "request": request,
        "units": units_list,
        "timestamp": _render_timestamp()
    })


@app.get("/health")
def health_check():
    """Health check endpoint"""
    return {
        "message": f"Seismo Fleet Manager v{VERSION}",
        "status": "running",
        "version": VERSION
    }


if __name__ == "__main__":
    import uvicorn
    uvicorn.run(app, host="0.0.0.0", port=8001)