Files
terra-view/backend/routes.py

355 lines
11 KiB
Python

from fastapi import APIRouter, Depends, HTTPException
from sqlalchemy.orm import Session
from pydantic import BaseModel
from datetime import datetime
from typing import Optional, List
from backend.database import get_db
from backend.models import Emitter, WatcherAgent
router = APIRouter()
# Helper function to detect unit type from unit ID
def detect_unit_type(unit_id: str) -> str:
"""
Automatically detect if a unit is Series 3 or Series 4 based on ID pattern.
Series 4 (Micromate) units have IDs starting with "UM" followed by digits (e.g., UM11719)
Series 3 units typically have other patterns
Returns:
"series4" if the unit ID matches Micromate pattern (UM#####)
"series3" otherwise
"""
if not unit_id:
return "unknown"
# Series 4 (Micromate) pattern: UM followed by digits
if unit_id.upper().startswith("UM") and len(unit_id) > 2:
# Check if remaining characters after "UM" are digits
rest = unit_id[2:]
if rest.isdigit():
return "series4"
# Default to series3 for other patterns
return "series3"
# Pydantic schemas for request/response validation
class EmitterReport(BaseModel):
unit: str
unit_type: str
timestamp: str
file: str
status: str
class EmitterResponse(BaseModel):
id: str
unit_type: str
last_seen: datetime
last_file: str
status: str
notes: Optional[str] = None
class Config:
from_attributes = True
@router.post("/emitters/report", status_code=200)
def report_emitter(report: EmitterReport, db: Session = Depends(get_db)):
"""
Endpoint for emitters to report their status.
Creates a new emitter if it doesn't exist, or updates an existing one.
"""
try:
# Parse the timestamp
timestamp = datetime.fromisoformat(report.timestamp.replace('Z', '+00:00'))
except ValueError:
raise HTTPException(status_code=400, detail="Invalid timestamp format")
# Check if emitter already exists
emitter = db.query(Emitter).filter(Emitter.id == report.unit).first()
if emitter:
# Update existing emitter
emitter.unit_type = report.unit_type
emitter.last_seen = timestamp
emitter.last_file = report.file
emitter.status = report.status
else:
# Create new emitter
emitter = Emitter(
id=report.unit,
unit_type=report.unit_type,
last_seen=timestamp,
last_file=report.file,
status=report.status
)
db.add(emitter)
db.commit()
db.refresh(emitter)
return {
"message": "Emitter report received",
"unit": emitter.id,
"status": emitter.status
}
@router.get("/fleet/status", response_model=List[EmitterResponse])
def get_fleet_status(db: Session = Depends(get_db)):
"""
Returns a list of all emitters and their current status.
"""
emitters = db.query(Emitter).all()
return emitters
# ── Watcher agent upsert helper ───────────────────────────────────────────────
def _upsert_watcher_agent(db: Session, source_id: str, source_type: str,
version: str, ip_address: str, log_tail: str,
status: str) -> None:
"""Create or update the WatcherAgent row for a given source_id."""
agent = db.query(WatcherAgent).filter(WatcherAgent.id == source_id).first()
if agent:
agent.source_type = source_type
agent.version = version
agent.last_seen = datetime.utcnow()
agent.status = status
if ip_address:
agent.ip_address = ip_address
if log_tail is not None:
agent.log_tail = log_tail
else:
agent = WatcherAgent(
id=source_id,
source_type=source_type,
version=version,
last_seen=datetime.utcnow(),
status=status,
ip_address=ip_address,
log_tail=log_tail,
)
db.add(agent)
# series3v1.1 Standardized Heartbeat Schema (multi-unit)
from fastapi import Request
@router.post("/api/series3/heartbeat", status_code=200)
async def series3_heartbeat(request: Request, db: Session = Depends(get_db)):
"""
Accepts a full telemetry payload from the Series3 emitter.
Updates or inserts each unit into the database.
"""
payload = await request.json()
source = payload.get("source_id")
units = payload.get("units", [])
version = payload.get("version")
log_tail = payload.get("log_tail") # list of strings or None
import json as _json
log_tail_str = _json.dumps(log_tail) if log_tail is not None else None
client_ip = request.client.host if request.client else None
print("\n=== Series 3 Heartbeat ===")
print("Source:", source)
print("Units received:", len(units))
print("==========================\n")
results = []
for u in units:
uid = u.get("unit_id")
last_event_time = u.get("last_event_time")
event_meta = u.get("event_metadata", {})
age_minutes = u.get("age_minutes")
try:
if last_event_time:
ts = datetime.fromisoformat(last_event_time.replace("Z", "+00:00"))
else:
ts = None
except:
ts = None
# Pull from DB
emitter = db.query(Emitter).filter(Emitter.id == uid).first()
# File name (from event_metadata)
last_file = event_meta.get("file_name")
status = "Unknown"
# Determine status based on age
if age_minutes is None:
status = "Missing"
elif age_minutes > 24 * 60:
status = "Missing"
elif age_minutes > 12 * 60:
status = "Pending"
else:
status = "OK"
if emitter:
# Update existing
emitter.last_seen = ts
emitter.last_file = last_file
emitter.status = status
# Update unit_type if it was incorrectly classified
detected_type = detect_unit_type(uid)
if emitter.unit_type != detected_type:
emitter.unit_type = detected_type
else:
# Insert new - auto-detect unit type from ID
detected_type = detect_unit_type(uid)
emitter = Emitter(
id=uid,
unit_type=detected_type,
last_seen=ts,
last_file=last_file,
status=status
)
db.add(emitter)
results.append({"unit": uid, "status": status})
if source:
_upsert_watcher_agent(db, source, "series3_watcher", version,
client_ip, log_tail_str, "ok")
db.commit()
# Check if an update has been triggered for this agent
update_available = False
if source:
agent = db.query(WatcherAgent).filter(WatcherAgent.id == source).first()
if agent and agent.update_pending:
update_available = True
agent.update_pending = False
db.commit()
return {
"message": "Heartbeat processed",
"source": source,
"units_processed": len(results),
"results": results,
"update_available": update_available,
}
# series4 (Micromate) Standardized Heartbeat Schema
@router.post("/api/series4/heartbeat", status_code=200)
async def series4_heartbeat(request: Request, db: Session = Depends(get_db)):
"""
Accepts a full telemetry payload from the Series4 (Micromate) emitter.
Updates or inserts each unit into the database.
Expected payload:
{
"source": "series4_emitter",
"generated_at": "2025-12-04T20:01:00",
"units": [
{
"unit_id": "UM11719",
"type": "micromate",
"project_hint": "Clearwater - ECMS 57940",
"last_call": "2025-12-04T19:30:42",
"status": "OK",
"age_days": 0.04,
"age_hours": 0.9,
"mlg_path": "C:\\THORDATA\\..."
}
]
}
"""
payload = await request.json()
# Accept source_id (new standard field) with fallback to legacy "source" key
source = payload.get("source_id") or payload.get("source", "series4_emitter")
units = payload.get("units", [])
version = payload.get("version")
log_tail = payload.get("log_tail")
import json as _json
log_tail_str = _json.dumps(log_tail) if log_tail is not None else None
client_ip = request.client.host if request.client else None
print("\n=== Series 4 Heartbeat ===")
print("Source:", source)
print("Units received:", len(units))
print("==========================\n")
results = []
for u in units:
uid = u.get("unit_id")
last_call_str = u.get("last_call")
status = u.get("status", "Unknown")
mlg_path = u.get("mlg_path")
project_hint = u.get("project_hint")
# Parse last_call timestamp
try:
if last_call_str:
ts = datetime.fromisoformat(last_call_str.replace("Z", "+00:00"))
else:
ts = None
except:
ts = None
# Pull from DB
emitter = db.query(Emitter).filter(Emitter.id == uid).first()
if emitter:
# Update existing
emitter.last_seen = ts
emitter.last_file = mlg_path
emitter.status = status
# Update unit_type if it was incorrectly classified
detected_type = detect_unit_type(uid)
if emitter.unit_type != detected_type:
emitter.unit_type = detected_type
# Optionally update notes with project hint if it exists
if project_hint and not emitter.notes:
emitter.notes = f"Project: {project_hint}"
else:
# Insert new - auto-detect unit type from ID
detected_type = detect_unit_type(uid)
notes = f"Project: {project_hint}" if project_hint else None
emitter = Emitter(
id=uid,
unit_type=detected_type,
last_seen=ts,
last_file=mlg_path,
status=status,
notes=notes
)
db.add(emitter)
results.append({"unit": uid, "status": status})
if source:
_upsert_watcher_agent(db, source, "series4_watcher", version,
client_ip, log_tail_str, "ok")
db.commit()
# Check if an update has been triggered for this agent
update_available = False
if source:
agent = db.query(WatcherAgent).filter(WatcherAgent.id == source).first()
if agent and agent.update_pending:
update_available = True
agent.update_pending = False
db.commit()
return {
"message": "Heartbeat processed",
"source": source,
"units_processed": len(results),
"results": results,
"update_available": update_available,
}