feat(auth): superadmin user-management page + CRUD

/admin/users page and /api/admin/users/* JSON CRUD endpoints, all behind
require_role("superadmin"). Temp passwords are returned once on create/reset
and never stored in plaintext. Admins get 403; password_hash is never leaked.

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
This commit is contained in:
2026-06-17 19:42:05 +00:00
parent 41ab900c33
commit bff9a4af4a
4 changed files with 297 additions and 0 deletions
+3
View File
@@ -98,6 +98,9 @@ app.middleware("http")(operator_gate)
from backend.routers import operator_auth_routes
app.include_router(operator_auth_routes.router)
from backend.routers import operator_users
app.include_router(operator_users.router)
# Override TemplateResponse to include environment and version in context
original_template_response = templates.TemplateResponse
def custom_template_response(name, context=None, *args, **kwargs):
+103
View File
@@ -0,0 +1,103 @@
"""Operator account management — superadmin only. Temp passwords are returned in
the JSON response once (shown to the superadmin to hand off); only hashes persist."""
from fastapi import APIRouter, Request, Depends, HTTPException
from fastapi.responses import JSONResponse
from pydantic import BaseModel
from sqlalchemy.orm import Session
from backend.database import get_db
from backend.templates_config import templates
from backend.models import OperatorUser
from backend.operator_auth import (
require_role, create_operator, reset_operator_password,
set_operator_active, set_operator_role,
)
from backend.utils.timezone import format_local_datetime
router = APIRouter(tags=["operator-users"])
_superadmin = require_role("superadmin")
class NewUser(BaseModel):
email: str
name: str
role: str = "admin"
class RoleChange(BaseModel):
role: str
def _serialize(u: OperatorUser) -> dict:
from datetime import datetime
return {
"id": u.id, "email": u.email, "display_name": u.display_name, "role": u.role,
"active": bool(u.active), "must_change_password": bool(u.must_change_password),
"locked": bool(u.locked_until and u.locked_until > datetime.utcnow()),
"last_login_at": format_local_datetime(u.last_login_at, "%Y-%m-%d %H:%M") if u.last_login_at else None,
}
@router.get("/admin/users")
async def users_page(request: Request, _=Depends(_superadmin)):
return templates.TemplateResponse("admin/users.html", {"request": request})
@router.get("/api/admin/users")
async def list_users(_=Depends(_superadmin), db: Session = Depends(get_db)):
users = db.query(OperatorUser).order_by(OperatorUser.display_name).all()
return {"users": [_serialize(u) for u in users]}
@router.post("/api/admin/users")
async def add_user(body: NewUser, _=Depends(_superadmin), db: Session = Depends(get_db)):
if body.role not in ("admin", "superadmin"):
return JSONResponse(status_code=400, content={"detail": "role must be admin or superadmin"})
try:
user, raw = create_operator(db, body.email, body.name, body.role)
except ValueError as e:
return JSONResponse(status_code=400, content={"detail": str(e)})
return {"user": _serialize(user), "password": raw}
@router.post("/api/admin/users/{user_id}/reset-password")
async def reset_user_password(user_id: str, _=Depends(_superadmin), db: Session = Depends(get_db)):
user = db.query(OperatorUser).filter_by(id=user_id).first()
if not user:
raise HTTPException(status_code=404, detail="User not found")
raw = reset_operator_password(db, user)
return {"password": raw}
@router.post("/api/admin/users/{user_id}/disable")
async def disable_user(user_id: str, acting=Depends(_superadmin), db: Session = Depends(get_db)):
if acting and acting.id == user_id:
return JSONResponse(status_code=400, content={"detail": "Cannot disable your own account"})
user = db.query(OperatorUser).filter_by(id=user_id).first()
if not user:
raise HTTPException(status_code=404, detail="User not found")
set_operator_active(db, user, False)
return {"active": False}
@router.post("/api/admin/users/{user_id}/enable")
async def enable_user(user_id: str, _=Depends(_superadmin), db: Session = Depends(get_db)):
user = db.query(OperatorUser).filter_by(id=user_id).first()
if not user:
raise HTTPException(status_code=404, detail="User not found")
set_operator_active(db, user, True)
return {"active": True}
@router.post("/api/admin/users/{user_id}/role")
async def change_user_role(user_id: str, body: RoleChange,
acting=Depends(_superadmin), db: Session = Depends(get_db)):
if acting and acting.id == user_id:
return JSONResponse(status_code=400, content={"detail": "Cannot change your own role"})
if body.role not in ("admin", "superadmin"):
return JSONResponse(status_code=400, content={"detail": "role must be admin or superadmin"})
user = db.query(OperatorUser).filter_by(id=user_id).first()
if not user:
raise HTTPException(status_code=404, detail="User not found")
set_operator_role(db, user, body.role)
return {"role": user.role}
+71
View File
@@ -0,0 +1,71 @@
{% extends "base.html" %}
{% block title %}Operator Accounts{% endblock %}
{% block content %}
<div class="max-w-4xl mx-auto p-4">
<div class="flex items-center justify-between mb-4">
<h1 class="text-2xl font-semibold">Operator Accounts</h1>
<button id="add-user-btn" class="px-3 py-2 rounded bg-orange-500 hover:bg-orange-600 text-white text-sm">+ Add operator</button>
</div>
<div id="temp-pw-banner" class="hidden mb-4 px-3 py-2 rounded bg-emerald-900/60 text-emerald-100 text-sm"></div>
<table class="w-full text-sm">
<thead><tr class="text-left border-b border-slate-600">
<th class="py-2">Name</th><th>Email</th><th>Role</th><th>Status</th><th>Last login</th><th></th>
</tr></thead>
<tbody id="user-rows"></tbody>
</table>
</div>
<script>
const $ = (s) => document.querySelector(s);
function showTemp(email, pw) {
const b = $("#temp-pw-banner");
b.textContent = `Temporary password for ${email}: ${pw} — copy it now, it won't be shown again.`;
b.classList.remove("hidden");
}
async function load() {
const r = await fetch("/api/admin/users");
const { users } = await r.json();
$("#user-rows").innerHTML = users.map(u => `
<tr class="border-b border-slate-700">
<td class="py-2">${u.display_name}</td>
<td>${u.email}</td>
<td>
<select data-role="${u.id}" class="bg-slate-700 rounded px-1 py-0.5">
<option value="admin"${u.role==='admin'?' selected':''}>admin</option>
<option value="superadmin"${u.role==='superadmin'?' selected':''}>superadmin</option>
</select>
</td>
<td>${u.active ? 'active' : 'disabled'}${u.locked ? ' (locked)' : ''}</td>
<td>${u.last_login_at || '—'}</td>
<td class="text-right space-x-2">
<button data-reset="${u.id}" data-email="${u.email}" class="text-orange-400 hover:underline">Reset pw</button>
<button data-toggle="${u.id}" data-active="${u.active}" class="text-slate-300 hover:underline">${u.active ? 'Disable' : 'Enable'}</button>
</td>
</tr>`).join("");
}
document.addEventListener("click", async (e) => {
if (e.target.dataset.reset) {
const r = await fetch(`/api/admin/users/${e.target.dataset.reset}/reset-password`, {method:"POST"});
const d = await r.json(); showTemp(e.target.dataset.email, d.password); load();
} else if (e.target.dataset.toggle) {
const action = e.target.dataset.active === "true" ? "disable" : "enable";
await fetch(`/api/admin/users/${e.target.dataset.toggle}/${action}`, {method:"POST"}); load();
} else if (e.target.id === "add-user-btn") {
const email = prompt("Email?"); if (!email) return;
const name = prompt("Display name?") || email;
const role = prompt("Role (admin / superadmin)?", "admin") || "admin";
const r = await fetch("/api/admin/users", {method:"POST", headers:{"Content-Type":"application/json"},
body: JSON.stringify({email, name, role})});
if (r.ok) { const d = await r.json(); showTemp(email, d.password); load(); }
else { alert((await r.json()).detail || "Failed"); }
}
});
document.addEventListener("change", async (e) => {
if (e.target.dataset.role) {
await fetch(`/api/admin/users/${e.target.dataset.role}/role`, {method:"POST",
headers:{"Content-Type":"application/json"}, body: JSON.stringify({role: e.target.value})});
load();
}
});
load();
</script>
{% endblock %}
+120
View File
@@ -0,0 +1,120 @@
# tests/test_operator_users.py
import uuid
from tests.conftest import wire_operator_auth
from backend.operator_auth import create_operator, make_operator_cookie, COOKIE_NAME
from backend.models import OperatorUser
def _login_as(client, user):
client.cookies.set(COOKIE_NAME, make_operator_cookie(user.id))
def test_admin_cannot_reach_user_management(client, db_session, monkeypatch):
admin, _ = create_operator(db_session, "admin@x.com", "Admin", "admin", password="pw-123456")
wire_operator_auth(monkeypatch, db_session, enabled=True)
_login_as(client, admin)
assert client.get("/admin/users", follow_redirects=False).status_code == 403
def test_superadmin_sees_user_management(client, db_session, monkeypatch):
su, _ = create_operator(db_session, "su@x.com", "Su", "superadmin", password="pw-123456")
wire_operator_auth(monkeypatch, db_session, enabled=True)
_login_as(client, su)
assert client.get("/admin/users", follow_redirects=False).status_code == 200
def test_superadmin_lists_users_json(client, db_session, monkeypatch):
su, _ = create_operator(db_session, "su@x.com", "Su", "superadmin", password="pw-123456")
wire_operator_auth(monkeypatch, db_session, enabled=True)
_login_as(client, su)
r = client.get("/api/admin/users")
assert r.status_code == 200
emails = [u["email"] for u in r.json()["users"]]
assert "su@x.com" in emails
assert all("password_hash" not in u for u in r.json()["users"]) # never leak hashes
def test_create_user_returns_temp_once(client, db_session, monkeypatch):
su, _ = create_operator(db_session, "su@x.com", "Su", "superadmin", password="pw-123456")
wire_operator_auth(monkeypatch, db_session, enabled=True)
_login_as(client, su)
r = client.post("/api/admin/users",
json={"email": "dad@x.com", "name": "Dad", "role": "admin"})
assert r.status_code == 200
assert len(r.json()["password"]) >= 12
made = db_session.query(OperatorUser).filter_by(email="dad@x.com").first()
assert made.must_change_password is True
def test_reset_password_returns_temp_once(client, db_session, monkeypatch):
su, _ = create_operator(db_session, "su@x.com", "Su", "superadmin", password="pw-123456")
target, _ = create_operator(db_session, "t@x.com", "T", "admin", password="pw-123456")
wire_operator_auth(monkeypatch, db_session, enabled=True)
_login_as(client, su)
r = client.post(f"/api/admin/users/{target.id}/reset-password")
assert r.status_code == 200 and len(r.json()["password"]) >= 12
db_session.refresh(target)
assert target.must_change_password is True
def test_disable_and_enable(client, db_session, monkeypatch):
su, _ = create_operator(db_session, "su@x.com", "Su", "superadmin", password="pw-123456")
target, _ = create_operator(db_session, "t@x.com", "T", "admin", password="pw-123456")
wire_operator_auth(monkeypatch, db_session, enabled=True)
_login_as(client, su)
assert client.post(f"/api/admin/users/{target.id}/disable").status_code == 200
db_session.refresh(target); assert target.active is False
assert client.post(f"/api/admin/users/{target.id}/enable").status_code == 200
db_session.refresh(target); assert target.active is True
def test_change_role(client, db_session, monkeypatch):
su, _ = create_operator(db_session, "su@x.com", "Su", "superadmin", password="pw-123456")
target, _ = create_operator(db_session, "t@x.com", "T", "admin", password="pw-123456")
wire_operator_auth(monkeypatch, db_session, enabled=True)
_login_as(client, su)
r = client.post(f"/api/admin/users/{target.id}/role", json={"role": "superadmin"})
assert r.status_code == 200
db_session.refresh(target); assert target.role == "superadmin"
def test_admin_cannot_reach_json_endpoints(client, db_session, monkeypatch):
admin, _ = create_operator(db_session, "a@x.com", "A", "admin", password="pw-123456")
target, _ = create_operator(db_session, "t@x.com", "T", "admin", password="pw-123456")
wire_operator_auth(monkeypatch, db_session, enabled=True)
_login_as(client, admin)
assert client.get("/api/admin/users").status_code == 403
assert client.post("/api/admin/users", json={"email": "x@x.com", "name": "X", "role": "admin"}).status_code == 403
assert client.post(f"/api/admin/users/{target.id}/reset-password").status_code == 403
assert client.post(f"/api/admin/users/{target.id}/disable").status_code == 403
assert client.post(f"/api/admin/users/{target.id}/enable").status_code == 403
assert client.post(f"/api/admin/users/{target.id}/role", json={"role": "superadmin"}).status_code == 403
def test_cannot_disable_own_account(client, db_session, monkeypatch):
su, _ = create_operator(db_session, "su@x.com", "Su", "superadmin", password="pw-123456")
wire_operator_auth(monkeypatch, db_session, enabled=True)
_login_as(client, su)
r = client.post(f"/api/admin/users/{su.id}/disable")
assert r.status_code == 400
db_session.refresh(su)
assert su.active is True
def test_cannot_change_own_role(client, db_session, monkeypatch):
su, _ = create_operator(db_session, "su@x.com", "Su", "superadmin", password="pw-123456")
wire_operator_auth(monkeypatch, db_session, enabled=True)
_login_as(client, su)
r = client.post(f"/api/admin/users/{su.id}/role", json={"role": "admin"})
assert r.status_code == 400
db_session.refresh(su)
assert su.role == "superadmin"
def test_deferred_operator_role_rejected_by_api(client, db_session, monkeypatch):
su, _ = create_operator(db_session, "su@x.com", "Su", "superadmin", password="pw-123456")
target, _ = create_operator(db_session, "t@x.com", "T", "admin", password="pw-123456")
wire_operator_auth(monkeypatch, db_session, enabled=True)
_login_as(client, su)
assert client.post("/api/admin/users", json={"email": "op@x.com", "name": "Op", "role": "operator"}).status_code == 400
assert client.post(f"/api/admin/users/{target.id}/role", json={"role": "operator"}).status_code == 400