Files
terra-view/backend/routers/admin_modules.py
T

219 lines
8.6 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
"""
Admin / diagnostic pages for the device modules (SFM, SLMM).
These pages live under /admin/{module} and exist purely so an operator can
peek under the hood and confirm the module is reachable, what data it's
holding, and whether the proxy from terra-view is healthy.
Routes:
GET /admin/sfm — SFM diagnostic page
GET /admin/events — SFM Event DB Manager page
GET /admin/slmm — SLMM diagnostic page
API helpers (called by the HTML pages via fetch):
GET /api/admin/sfm/overview — aggregated SFM health + db stats in one call
GET /api/admin/slmm/overview — aggregated SLMM health + device count
The pages are intentionally read-only. Any actual administration of SFM
or SLMM happens in those modules directly.
"""
import logging
import os
from datetime import datetime, timezone
from typing import Any, Dict
import httpx
from fastapi import APIRouter, Depends, Request
from fastapi.responses import HTMLResponse, JSONResponse
from sqlalchemy.orm import Session
from backend.database import get_db
from backend.templates_config import templates
# Module-level logger, named after this module.
log = logging.getLogger(__name__)

# Router that collects the admin page and API routes declared in this file.
router = APIRouter()

# Base URLs of the SFM and SLMM services. Each can be overridden through the
# environment variable of the same name; otherwise a localhost URL is used.
SFM_BASE_URL = os.environ.get("SFM_BASE_URL", "http://localhost:8200")
SLMM_BASE_URL = os.environ.get("SLMM_BASE_URL", "http://localhost:8100")
# ── SFM ───────────────────────────────────────────────────────────────────────
@router.get("/admin/sfm", response_class=HTMLResponse)
def admin_sfm_page(request: Request):
    # Render the SFM diagnostic page. The configured SFM base URL is handed
    # to the template alongside the request object.
    context = {"request": request, "sfm_base_url": SFM_BASE_URL}
    return templates.TemplateResponse("admin_sfm.html", context)
@router.get("/admin/events", response_class=HTMLResponse)
def admin_events_page(request: Request):
    """SFM Event DB Manager — browse, flag, and delete events across all units."""
    # The template receives the request object and the configured SFM base URL.
    context = {"request": request, "sfm_base_url": SFM_BASE_URL}
    return templates.TemplateResponse("admin_events.html", context)
def _error_text(exc: Exception) -> str:
    """Return a non-empty description of *exc* for the ``errors`` section."""
    # An exception raised without a message stringifies to "", which would
    # leave a blank error entry; fall back to the exception class name.
    return str(exc) or type(exc).__name__


def _parse_utc(value: Any) -> datetime:
    """Parse an ISO-8601 string into an aware datetime.

    A trailing ``Z`` is accepted, and a value without an offset is treated
    as UTC. Raises ``ValueError`` for malformed text and ``AttributeError``
    or ``TypeError`` when *value* is not a string.
    """
    parsed = datetime.fromisoformat(value.replace("Z", "+00:00"))
    if parsed.tzinfo is None:
        parsed = parsed.replace(tzinfo=timezone.utc)
    return parsed


def _forwarding_latency_seconds(event: Dict[str, Any]):
    """Return ``created_at - timestamp`` in seconds for *event*, or ``None``.

    ``None`` is returned when either field is missing/empty or cannot be
    parsed, so that one malformed event never invalidates the whole list.
    """
    timestamp = event.get("timestamp")
    created_at = event.get("created_at")
    if not (timestamp and created_at):
        return None
    try:
        return (_parse_utc(created_at) - _parse_utc(timestamp)).total_seconds()
    except (ValueError, TypeError, AttributeError):
        # Malformed or non-string timestamps: latency is simply unknown.
        return None


@router.get("/api/admin/sfm/overview")
async def admin_sfm_overview() -> JSONResponse:
    """Aggregated SFM diagnostic snapshot.

    Returns health, per-unit summary, recent events with forwarding
    latency, stale-table counts, cache stats and aggregate totals.

    Tolerant of partial failures: any individual sub-fetch error is captured
    into ``errors`` under that section's key, so a flaky sub-endpoint doesn't
    break the whole page. If the health probe fails or does not report
    ``status == "ok"``, the snapshot is returned immediately with
    ``reachable`` set to ``False`` and the remaining sections left at their
    defaults (no ``totals`` key in that case).
    """
    overview: Dict[str, Any] = {
        "sfm_base_url": SFM_BASE_URL,
        "checked_at": datetime.now(timezone.utc).isoformat(),
        "health": None,
        "reachable": False,
        "units": [],
        "events": [],
        "stale": {
            "monitor_log": None,
            "sessions": None,
        },
        "cache_stats": None,
        "errors": {},
    }

    async with httpx.AsyncClient(timeout=5.0) as client:
        # Health
        try:
            r = await client.get(f"{SFM_BASE_URL}/health")
            r.raise_for_status()
            overview["health"] = r.json()
            overview["reachable"] = overview["health"].get("status") == "ok"
        except Exception as e:  # noqa: BLE001
            overview["errors"]["health"] = _error_text(e)
            overview["reachable"] = False

        # If SFM is down, no point hitting the rest.
        if not overview["reachable"]:
            return JSONResponse(overview)

        # Units
        try:
            r = await client.get(f"{SFM_BASE_URL}/db/units")
            r.raise_for_status()
            units = r.json() or []
            if not isinstance(units, list):
                # Report an unexpected shape as a section error instead of
                # letting the totals computation below fail the request.
                raise TypeError(
                    f"expected a JSON list of units, got {type(units).__name__}"
                )
            overview["units"] = units
        except Exception as e:  # noqa: BLE001
            overview["errors"]["units"] = _error_text(e)

        # Recent events (newest 25 — bigger sample of the call-home stream)
        try:
            r = await client.get(f"{SFM_BASE_URL}/db/events", params={"limit": 25})
            r.raise_for_status()
            payload = r.json() or {}
            events = payload.get("events", []) or []
            # Forwarding latency: created_at (SFM ingest) - timestamp (event).
            for ev in events:
                # These two fields are removed from every event before it is
                # returned to the page.
                ev.pop("waveform_blob", None)
                ev.pop("a5_pickle_filename", None)
                ev["forwarding_latency_seconds"] = _forwarding_latency_seconds(ev)
            overview["events"] = events
        except Exception as e:  # noqa: BLE001
            overview["errors"]["events"] = _error_text(e)

        # Stale tables (deprecated by the watcher-forward pipeline but still
        # present in SFM's SQLite). Surface as counts only.
        for key, path in (("monitor_log", "/db/monitor_log"),
                          ("sessions", "/db/sessions")):
            try:
                # Cheap probe first: limit=1 tells us whether the endpoint
                # reports a count at all before asking for the real total.
                r = await client.get(f"{SFM_BASE_URL}{path}", params={"limit": 1})
                r.raise_for_status()
                payload = r.json() or {}
                if payload.get("count") is None:
                    overview["stale"][key] = None
                    continue
                # SFM returns count = total when limit covers all rows, so
                # re-query with a high limit to get the true total.
                r2 = await client.get(
                    f"{SFM_BASE_URL}{path}", params={"limit": 100000}
                )
                r2.raise_for_status()
                overview["stale"][key] = (r2.json() or {}).get("count")
            except Exception as e:  # noqa: BLE001
                overview["errors"][f"stale_{key}"] = _error_text(e)

        # Cache stats (in-memory device cache on SFM)
        try:
            r = await client.get(f"{SFM_BASE_URL}/cache/stats")
            r.raise_for_status()
            overview["cache_stats"] = r.json()
        except Exception as e:  # noqa: BLE001
            overview["errors"]["cache_stats"] = _error_text(e)

    # Aggregate counts the UI can render without re-walking arrays. This runs
    # outside the per-section try blocks, so it must not raise: units that
    # are not dicts and non-numeric "total_events" values are skipped.
    event_counts = (
        unit.get("total_events")
        for unit in overview["units"]
        if isinstance(unit, dict)
    )
    overview["totals"] = {
        "units": len(overview["units"]),
        "events_total": sum(
            count for count in event_counts if isinstance(count, (int, float))
        ),
        "stale_monitor_log": overview["stale"]["monitor_log"],
        "stale_sessions": overview["stale"]["sessions"],
    }
    return JSONResponse(overview)
# ── SLMM ──────────────────────────────────────────────────────────────────────
@router.get("/admin/slmm", response_class=HTMLResponse)
def admin_slmm_page(request: Request):
    # Render the SLMM diagnostic page. The configured SLMM base URL is handed
    # to the template alongside the request object.
    context = {"request": request, "slmm_base_url": SLMM_BASE_URL}
    return templates.TemplateResponse("admin_slmm.html", context)
@router.get("/api/admin/slmm/overview")
async def admin_slmm_overview() -> JSONResponse:
    """Aggregated SLMM diagnostic snapshot.

    Probes ``{SLMM_BASE_URL}/health`` and returns the payload under
    ``health`` with ``reachable`` set to ``True`` when the request succeeds.
    On any failure the error text is stored under ``errors["health"]`` and
    ``reachable`` stays ``False``. ``devices`` is currently always returned
    empty; no device roster is collected here yet.
    """
    overview: Dict[str, Any] = {
        "slmm_base_url": SLMM_BASE_URL,
        "checked_at": datetime.now(timezone.utc).isoformat(),
        "health": None,
        "reachable": False,
        "devices": [],
        "errors": {},
    }

    async with httpx.AsyncClient(timeout=5.0) as client:
        try:
            r = await client.get(f"{SLMM_BASE_URL}/health")
            r.raise_for_status()
            overview["health"] = r.json()
            overview["reachable"] = True
        except Exception as e:  # noqa: BLE001
            # An exception raised without a message stringifies to "", which
            # would leave a blank error entry; fall back to the class name.
            overview["errors"]["health"] = str(e) or type(e).__name__
            return JSONResponse(overview)

    # Pull a roster of configured devices (SLMM exposes per-unit
    # config + status under /api/nl43/*). This is a best-effort probe
    # — SLMM doesn't expose a "list all devices" endpoint, so we would ask
    # terra-view's RosterUnit table what serials it knows about for
    # SLMs and just check each one. For now, just surface the health
    # payload and let the operator click through to /sound-level-meters
    # for the per-device details.
    return JSONResponse(overview)