Add Seismo Fleet Manager backend v0.1

Implemented FastAPI backend with SQLite database for tracking seismograph fleet status:
- database.py: SQLAlchemy setup with SQLite
- models.py: Emitter model with id, unit_type, last_seen, last_file, status, notes
- routes.py: POST /emitters/report and GET /fleet/status endpoints
- main.py: FastAPI app initialization with CORS support
- requirements.txt: Dependencies (FastAPI, SQLAlchemy, uvicorn)
This commit is contained in:
Claude
2025-11-20 18:14:29 +00:00
parent c1bdf17454
commit f976e4e893
5 changed files with 168 additions and 0 deletions

82
routes.py Normal file
View File

@@ -0,0 +1,82 @@
from fastapi import APIRouter, Depends, HTTPException
from sqlalchemy.orm import Session
from pydantic import BaseModel
from datetime import datetime
from typing import Optional, List
from database import get_db
from models import Emitter
router = APIRouter()
# Pydantic schemas for request/response validation
class EmitterReport(BaseModel):
unit: str
unit_type: str
timestamp: str
file: str
status: str
class EmitterResponse(BaseModel):
id: str
unit_type: str
last_seen: datetime
last_file: str
status: str
notes: Optional[str] = None
class Config:
from_attributes = True
@router.post("/emitters/report", status_code=200)
def report_emitter(report: EmitterReport, db: Session = Depends(get_db)):
"""
Endpoint for emitters to report their status.
Creates a new emitter if it doesn't exist, or updates an existing one.
"""
try:
# Parse the timestamp
timestamp = datetime.fromisoformat(report.timestamp.replace('Z', '+00:00'))
except ValueError:
raise HTTPException(status_code=400, detail="Invalid timestamp format")
# Check if emitter already exists
emitter = db.query(Emitter).filter(Emitter.id == report.unit).first()
if emitter:
# Update existing emitter
emitter.unit_type = report.unit_type
emitter.last_seen = timestamp
emitter.last_file = report.file
emitter.status = report.status
else:
# Create new emitter
emitter = Emitter(
id=report.unit,
unit_type=report.unit_type,
last_seen=timestamp,
last_file=report.file,
status=report.status
)
db.add(emitter)
db.commit()
db.refresh(emitter)
return {
"message": "Emitter report received",
"unit": emitter.id,
"status": emitter.status
}
@router.get("/fleet/status", response_model=List[EmitterResponse])
def get_fleet_status(db: Session = Depends(get_db)):
"""
Returns a list of all emitters and their current status.
"""
emitters = db.query(Emitter).all()
return emitters