feat: watcher agent management system implemented.
This commit is contained in:
@@ -5,7 +5,7 @@ from datetime import datetime
|
||||
from typing import Optional, List
|
||||
|
||||
from backend.database import get_db
|
||||
from backend.models import Emitter
|
||||
from backend.models import Emitter, WatcherAgent
|
||||
|
||||
router = APIRouter()
|
||||
|
||||
@@ -107,6 +107,35 @@ def get_fleet_status(db: Session = Depends(get_db)):
|
||||
emitters = db.query(Emitter).all()
|
||||
return emitters
|
||||
|
||||
# ── Watcher agent upsert helper ───────────────────────────────────────────────
|
||||
|
||||
def _upsert_watcher_agent(db: Session, source_id: str, source_type: str,
|
||||
version: str, ip_address: str, log_tail: str,
|
||||
status: str) -> None:
|
||||
"""Create or update the WatcherAgent row for a given source_id."""
|
||||
agent = db.query(WatcherAgent).filter(WatcherAgent.id == source_id).first()
|
||||
if agent:
|
||||
agent.source_type = source_type
|
||||
agent.version = version
|
||||
agent.last_seen = datetime.utcnow()
|
||||
agent.status = status
|
||||
if ip_address:
|
||||
agent.ip_address = ip_address
|
||||
if log_tail is not None:
|
||||
agent.log_tail = log_tail
|
||||
else:
|
||||
agent = WatcherAgent(
|
||||
id=source_id,
|
||||
source_type=source_type,
|
||||
version=version,
|
||||
last_seen=datetime.utcnow(),
|
||||
status=status,
|
||||
ip_address=ip_address,
|
||||
log_tail=log_tail,
|
||||
)
|
||||
db.add(agent)
|
||||
|
||||
|
||||
# series3v1.1 Standardized Heartbeat Schema (multi-unit)
|
||||
from fastapi import Request
|
||||
|
||||
@@ -120,6 +149,11 @@ async def series3_heartbeat(request: Request, db: Session = Depends(get_db)):
|
||||
|
||||
source = payload.get("source_id")
|
||||
units = payload.get("units", [])
|
||||
version = payload.get("version")
|
||||
log_tail = payload.get("log_tail") # list of strings or None
|
||||
import json as _json
|
||||
log_tail_str = _json.dumps(log_tail) if log_tail is not None else None
|
||||
client_ip = request.client.host if request.client else None
|
||||
|
||||
print("\n=== Series 3 Heartbeat ===")
|
||||
print("Source:", source)
|
||||
@@ -182,13 +216,38 @@ async def series3_heartbeat(request: Request, db: Session = Depends(get_db)):
|
||||
|
||||
results.append({"unit": uid, "status": status})
|
||||
|
||||
# Determine overall worst status for the watcher agent row
|
||||
statuses = [r["status"] for r in results]
|
||||
if "Missing" in statuses:
|
||||
agent_status = "missing"
|
||||
elif "Pending" in statuses:
|
||||
agent_status = "pending"
|
||||
elif statuses:
|
||||
agent_status = "ok"
|
||||
else:
|
||||
agent_status = "unknown"
|
||||
|
||||
if source:
|
||||
_upsert_watcher_agent(db, source, "series3_watcher", version,
|
||||
client_ip, log_tail_str, agent_status)
|
||||
|
||||
db.commit()
|
||||
|
||||
# Check if an update has been triggered for this agent
|
||||
update_available = False
|
||||
if source:
|
||||
agent = db.query(WatcherAgent).filter(WatcherAgent.id == source).first()
|
||||
if agent and agent.update_pending:
|
||||
update_available = True
|
||||
agent.update_pending = False
|
||||
db.commit()
|
||||
|
||||
return {
|
||||
"message": "Heartbeat processed",
|
||||
"source": source,
|
||||
"units_processed": len(results),
|
||||
"results": results
|
||||
"results": results,
|
||||
"update_available": update_available,
|
||||
}
|
||||
|
||||
|
||||
@@ -221,6 +280,11 @@ async def series4_heartbeat(request: Request, db: Session = Depends(get_db)):
|
||||
|
||||
source = payload.get("source", "series4_emitter")
|
||||
units = payload.get("units", [])
|
||||
version = payload.get("version")
|
||||
log_tail = payload.get("log_tail")
|
||||
import json as _json
|
||||
log_tail_str = _json.dumps(log_tail) if log_tail is not None else None
|
||||
client_ip = request.client.host if request.client else None
|
||||
|
||||
print("\n=== Series 4 Heartbeat ===")
|
||||
print("Source:", source)
|
||||
@@ -276,11 +340,36 @@ async def series4_heartbeat(request: Request, db: Session = Depends(get_db)):
|
||||
|
||||
results.append({"unit": uid, "status": status})
|
||||
|
||||
# Determine overall worst status for the watcher agent row
|
||||
statuses = [r["status"] for r in results]
|
||||
if any(s.lower() == "stale" for s in statuses):
|
||||
agent_status = "missing"
|
||||
elif any(s.lower() == "late" for s in statuses):
|
||||
agent_status = "pending"
|
||||
elif statuses:
|
||||
agent_status = "ok"
|
||||
else:
|
||||
agent_status = "unknown"
|
||||
|
||||
if source:
|
||||
_upsert_watcher_agent(db, source, "series4_watcher", version,
|
||||
client_ip, log_tail_str, agent_status)
|
||||
|
||||
db.commit()
|
||||
|
||||
# Check if an update has been triggered for this agent
|
||||
update_available = False
|
||||
if source:
|
||||
agent = db.query(WatcherAgent).filter(WatcherAgent.id == source).first()
|
||||
if agent and agent.update_pending:
|
||||
update_available = True
|
||||
agent.update_pending = False
|
||||
db.commit()
|
||||
|
||||
return {
|
||||
"message": "Heartbeat processed",
|
||||
"source": source,
|
||||
"units_processed": len(results),
|
||||
"results": results
|
||||
"results": results,
|
||||
"update_available": update_available,
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user