Merge dev into reservation branch to pick up v0.8.0 watcher manager changes

This commit is contained in:
2026-03-18 20:13:19 +00:00
10 changed files with 612 additions and 7 deletions

View File

@@ -30,7 +30,7 @@ Base.metadata.create_all(bind=engine)
ENVIRONMENT = os.getenv("ENVIRONMENT", "production")
# Initialize FastAPI app
VERSION = "0.7.1"
VERSION = "0.8.0"
if ENVIRONMENT == "development":
_build = os.getenv("BUILD_NUMBER", "0")
if _build and _build != "0":
@@ -102,6 +102,9 @@ app.include_router(modem_dashboard.router)
from backend.routers import settings
app.include_router(settings.router)
from backend.routers import watcher_manager
app.include_router(watcher_manager.router)
# Projects system routers
app.include_router(projects.router)
app.include_router(project_locations.router)

View File

@@ -66,6 +66,26 @@ class RosterUnit(Base):
slm_last_check = Column(DateTime, nullable=True) # Last communication check
class WatcherAgent(Base):
"""
Watcher agents: tracks the watcher processes (series3-watcher, thor-watcher)
that run on field machines and report unit heartbeats.
Updated on every heartbeat received from each source_id.
"""
__tablename__ = "watcher_agents"
id = Column(String, primary_key=True, index=True) # source_id (hostname)
source_type = Column(String, nullable=False) # series3_watcher | series4_watcher
version = Column(String, nullable=True) # e.g. "1.4.0"
last_seen = Column(DateTime, default=datetime.utcnow)
status = Column(String, nullable=False, default="unknown") # ok | pending | missing | error | unknown
ip_address = Column(String, nullable=True)
log_tail = Column(Text, nullable=True) # last N log lines (JSON array of strings)
update_pending = Column(Boolean, default=False) # set True to trigger remote update
update_version = Column(String, nullable=True) # target version to update to
class IgnoredUnit(Base):
"""
Ignored units: units that report but should be filtered out from unknown emitters.

View File

@@ -0,0 +1,133 @@
"""
Watcher Manager — admin API for series3-watcher and thor-watcher agents.
Endpoints:
GET /api/admin/watchers — list all watcher agents
GET /api/admin/watchers/{agent_id} — get single agent detail
POST /api/admin/watchers/{agent_id}/trigger-update — flag agent for update
POST /api/admin/watchers/{agent_id}/clear-update — clear update flag
GET /api/admin/watchers/{agent_id}/update-check — polled by watcher on heartbeat
Page:
GET /admin/watchers — HTML admin page
"""
from datetime import datetime, timezone
from fastapi import APIRouter, Depends, HTTPException, Request
from fastapi.responses import HTMLResponse
from pydantic import BaseModel
from sqlalchemy.orm import Session
from typing import Optional
from backend.database import get_db
from backend.models import WatcherAgent
from backend.templates_config import templates
router = APIRouter(tags=["admin"])
# ── helpers ──────────────────────────────────────────────────────────────────
def _agent_to_dict(agent: WatcherAgent) -> dict:
last_seen = agent.last_seen
if last_seen:
now_utc = datetime.utcnow()
age_minutes = int((now_utc - last_seen).total_seconds() // 60)
if age_minutes > 60:
status = "missing"
else:
status = "ok"
else:
age_minutes = None
status = "missing"
return {
"id": agent.id,
"source_type": agent.source_type,
"version": agent.version,
"last_seen": last_seen.isoformat() if last_seen else None,
"age_minutes": age_minutes,
"status": status,
"ip_address": agent.ip_address,
"log_tail": agent.log_tail,
"update_pending": bool(agent.update_pending),
"update_version": agent.update_version,
}
# ── API routes ────────────────────────────────────────────────────────────────
@router.get("/api/admin/watchers")
def list_watchers(db: Session = Depends(get_db)):
agents = db.query(WatcherAgent).order_by(WatcherAgent.last_seen.desc()).all()
return [_agent_to_dict(a) for a in agents]
@router.get("/api/admin/watchers/{agent_id}")
def get_watcher(agent_id: str, db: Session = Depends(get_db)):
agent = db.query(WatcherAgent).filter(WatcherAgent.id == agent_id).first()
if not agent:
raise HTTPException(status_code=404, detail="Watcher agent not found")
return _agent_to_dict(agent)
class TriggerUpdateRequest(BaseModel):
version: Optional[str] = None # target version label (informational)
@router.post("/api/admin/watchers/{agent_id}/trigger-update")
def trigger_update(agent_id: str, body: TriggerUpdateRequest, db: Session = Depends(get_db)):
agent = db.query(WatcherAgent).filter(WatcherAgent.id == agent_id).first()
if not agent:
raise HTTPException(status_code=404, detail="Watcher agent not found")
agent.update_pending = True
agent.update_version = body.version
db.commit()
return {"ok": True, "agent_id": agent_id, "update_pending": True}
@router.post("/api/admin/watchers/{agent_id}/clear-update")
def clear_update(agent_id: str, db: Session = Depends(get_db)):
agent = db.query(WatcherAgent).filter(WatcherAgent.id == agent_id).first()
if not agent:
raise HTTPException(status_code=404, detail="Watcher agent not found")
agent.update_pending = False
agent.update_version = None
db.commit()
return {"ok": True, "agent_id": agent_id, "update_pending": False}
@router.get("/api/admin/watchers/{agent_id}/update-check")
def update_check(agent_id: str, db: Session = Depends(get_db)):
"""
Polled by watcher agents on each heartbeat cycle.
Returns update_available=True when an update has been triggered via the UI.
Automatically clears the flag after the watcher acknowledges it.
"""
agent = db.query(WatcherAgent).filter(WatcherAgent.id == agent_id).first()
if not agent:
return {"update_available": False}
pending = bool(agent.update_pending)
if pending:
# Clear the flag — the watcher will now self-update
agent.update_pending = False
db.commit()
return {
"update_available": pending,
"version": agent.update_version,
}
# ── HTML page ─────────────────────────────────────────────────────────────────
@router.get("/admin/watchers", response_class=HTMLResponse)
def admin_watchers_page(request: Request, db: Session = Depends(get_db)):
agents = db.query(WatcherAgent).order_by(WatcherAgent.last_seen.desc()).all()
agents_data = [_agent_to_dict(a) for a in agents]
return templates.TemplateResponse("admin_watchers.html", {
"request": request,
"agents": agents_data,
})

View File

@@ -5,7 +5,7 @@ from datetime import datetime
from typing import Optional, List
from backend.database import get_db
from backend.models import Emitter
from backend.models import Emitter, WatcherAgent
router = APIRouter()
@@ -107,6 +107,35 @@ def get_fleet_status(db: Session = Depends(get_db)):
emitters = db.query(Emitter).all()
return emitters
# ── Watcher agent upsert helper ───────────────────────────────────────────────
def _upsert_watcher_agent(db: Session, source_id: str, source_type: str,
version: str, ip_address: str, log_tail: str,
status: str) -> None:
"""Create or update the WatcherAgent row for a given source_id."""
agent = db.query(WatcherAgent).filter(WatcherAgent.id == source_id).first()
if agent:
agent.source_type = source_type
agent.version = version
agent.last_seen = datetime.utcnow()
agent.status = status
if ip_address:
agent.ip_address = ip_address
if log_tail is not None:
agent.log_tail = log_tail
else:
agent = WatcherAgent(
id=source_id,
source_type=source_type,
version=version,
last_seen=datetime.utcnow(),
status=status,
ip_address=ip_address,
log_tail=log_tail,
)
db.add(agent)
# series3v1.1 Standardized Heartbeat Schema (multi-unit)
from fastapi import Request
@@ -120,6 +149,11 @@ async def series3_heartbeat(request: Request, db: Session = Depends(get_db)):
source = payload.get("source_id")
units = payload.get("units", [])
version = payload.get("version")
log_tail = payload.get("log_tail") # list of strings or None
import json as _json
log_tail_str = _json.dumps(log_tail) if log_tail is not None else None
client_ip = request.client.host if request.client else None
print("\n=== Series 3 Heartbeat ===")
print("Source:", source)
@@ -182,13 +216,27 @@ async def series3_heartbeat(request: Request, db: Session = Depends(get_db)):
results.append({"unit": uid, "status": status})
if source:
_upsert_watcher_agent(db, source, "series3_watcher", version,
client_ip, log_tail_str, "ok")
db.commit()
# Check if an update has been triggered for this agent
update_available = False
if source:
agent = db.query(WatcherAgent).filter(WatcherAgent.id == source).first()
if agent and agent.update_pending:
update_available = True
agent.update_pending = False
db.commit()
return {
"message": "Heartbeat processed",
"source": source,
"units_processed": len(results),
"results": results
"results": results,
"update_available": update_available,
}
@@ -221,6 +269,11 @@ async def series4_heartbeat(request: Request, db: Session = Depends(get_db)):
source = payload.get("source", "series4_emitter")
units = payload.get("units", [])
version = payload.get("version")
log_tail = payload.get("log_tail")
import json as _json
log_tail_str = _json.dumps(log_tail) if log_tail is not None else None
client_ip = request.client.host if request.client else None
print("\n=== Series 4 Heartbeat ===")
print("Source:", source)
@@ -276,11 +329,25 @@ async def series4_heartbeat(request: Request, db: Session = Depends(get_db)):
results.append({"unit": uid, "status": status})
if source:
_upsert_watcher_agent(db, source, "series4_watcher", version,
client_ip, log_tail_str, "ok")
db.commit()
# Check if an update has been triggered for this agent
update_available = False
if source:
agent = db.query(WatcherAgent).filter(WatcherAgent.id == source).first()
if agent and agent.update_pending:
update_available = True
agent.update_pending = False
db.commit()
return {
"message": "Heartbeat processed",
"source": source,
"units_processed": len(results),
"results": results
"results": results,
"update_available": update_available,
}

View File

@@ -60,6 +60,19 @@ def jinja_same_date(dt1, dt2) -> bool:
return False
def jinja_log_tail_display(s):
"""Jinja filter: decode a JSON-encoded log tail array into a plain-text string."""
if not s:
return ""
try:
lines = _json.loads(s)
if isinstance(lines, list):
return "\n".join(str(l) for l in lines)
return str(s)
except Exception:
return str(s)
# Register Jinja filters and globals
templates.env.filters["local_datetime"] = jinja_local_datetime
templates.env.filters["local_time"] = jinja_local_time
@@ -68,3 +81,4 @@ templates.env.filters["fromjson"] = jinja_fromjson
templates.env.globals["timezone_abbr"] = jinja_timezone_abbr
templates.env.globals["get_user_timezone"] = get_user_timezone
templates.env.globals["same_date"] = jinja_same_date
templates.env.filters["log_tail_display"] = jinja_log_tail_display