diff --git a/backend/main.py b/backend/main.py index afa2a63..6b7c64f 100644 --- a/backend/main.py +++ b/backend/main.py @@ -98,6 +98,9 @@ app.middleware("http")(operator_gate) from backend.routers import operator_auth_routes app.include_router(operator_auth_routes.router) +from backend.routers import operator_users +app.include_router(operator_users.router) + # Override TemplateResponse to include environment and version in context original_template_response = templates.TemplateResponse def custom_template_response(name, context=None, *args, **kwargs): diff --git a/backend/routers/operator_users.py b/backend/routers/operator_users.py new file mode 100644 index 0000000..4a3531d --- /dev/null +++ b/backend/routers/operator_users.py @@ -0,0 +1,103 @@ +"""Operator account management — superadmin only. Temp passwords are returned in +the JSON response once (shown to the superadmin to hand off); only hashes persist.""" +from fastapi import APIRouter, Request, Depends, HTTPException +from fastapi.responses import JSONResponse +from pydantic import BaseModel +from sqlalchemy.orm import Session + +from backend.database import get_db +from backend.templates_config import templates +from backend.models import OperatorUser +from backend.operator_auth import ( + require_role, create_operator, reset_operator_password, + set_operator_active, set_operator_role, +) +from backend.utils.timezone import format_local_datetime + +router = APIRouter(tags=["operator-users"]) +_superadmin = require_role("superadmin") + + +class NewUser(BaseModel): + email: str + name: str + role: str = "admin" + + +class RoleChange(BaseModel): + role: str + + +def _serialize(u: OperatorUser) -> dict: + from datetime import datetime + return { + "id": u.id, "email": u.email, "display_name": u.display_name, "role": u.role, + "active": bool(u.active), "must_change_password": bool(u.must_change_password), + "locked": bool(u.locked_until and u.locked_until > datetime.utcnow()), + "last_login_at": format_local_datetime(u.last_login_at, "%Y-%m-%d %H:%M") if u.last_login_at else None, + } + + +@router.get("/admin/users") +async def users_page(request: Request, _=Depends(_superadmin)): + return templates.TemplateResponse("admin/users.html", {"request": request}) + + +@router.get("/api/admin/users") +async def list_users(_=Depends(_superadmin), db: Session = Depends(get_db)): + users = db.query(OperatorUser).order_by(OperatorUser.display_name).all() + return {"users": [_serialize(u) for u in users]} + + +@router.post("/api/admin/users") +async def add_user(body: NewUser, _=Depends(_superadmin), db: Session = Depends(get_db)): + if body.role not in ("admin", "superadmin"): + return JSONResponse(status_code=400, content={"detail": "role must be admin or superadmin"}) + try: + user, raw = create_operator(db, body.email, body.name, body.role) + except ValueError as e: + return JSONResponse(status_code=400, content={"detail": str(e)}) + return {"user": _serialize(user), "password": raw} + + +@router.post("/api/admin/users/{user_id}/reset-password") +async def reset_user_password(user_id: str, _=Depends(_superadmin), db: Session = Depends(get_db)): + user = db.query(OperatorUser).filter_by(id=user_id).first() + if not user: + raise HTTPException(status_code=404, detail="User not found") + raw = reset_operator_password(db, user) + return {"password": raw} + + +@router.post("/api/admin/users/{user_id}/disable") +async def disable_user(user_id: str, acting=Depends(_superadmin), db: Session = Depends(get_db)): + if acting and acting.id == user_id: + return JSONResponse(status_code=400, content={"detail": "Cannot disable your own account"}) + user = db.query(OperatorUser).filter_by(id=user_id).first() + if not user: + raise HTTPException(status_code=404, detail="User not found") + set_operator_active(db, user, False) + return {"active": False} + + +@router.post("/api/admin/users/{user_id}/enable") +async def enable_user(user_id: str, _=Depends(_superadmin), db: Session = Depends(get_db)): + user = db.query(OperatorUser).filter_by(id=user_id).first() + if not user: + raise HTTPException(status_code=404, detail="User not found") + set_operator_active(db, user, True) + return {"active": True} + + +@router.post("/api/admin/users/{user_id}/role") +async def change_user_role(user_id: str, body: RoleChange, + acting=Depends(_superadmin), db: Session = Depends(get_db)): + if acting and acting.id == user_id: + return JSONResponse(status_code=400, content={"detail": "Cannot change your own role"}) + if body.role not in ("admin", "superadmin"): + return JSONResponse(status_code=400, content={"detail": "role must be admin or superadmin"}) + user = db.query(OperatorUser).filter_by(id=user_id).first() + if not user: + raise HTTPException(status_code=404, detail="User not found") + set_operator_role(db, user, body.role) + return {"role": user.role} diff --git a/templates/admin/users.html b/templates/admin/users.html new file mode 100644 index 0000000..a16d42b --- /dev/null +++ b/templates/admin/users.html @@ -0,0 +1,71 @@ +{% extends "base.html" %} +{% block title %}Operator Accounts{% endblock %} +{% block content %} +
+
+

Operator Accounts

+ +
+ + + + + + +
NameEmailRoleStatusLast login
+
+ +{% endblock %} diff --git a/tests/test_operator_users.py b/tests/test_operator_users.py new file mode 100644 index 0000000..1d149aa --- /dev/null +++ b/tests/test_operator_users.py @@ -0,0 +1,120 @@ +# tests/test_operator_users.py +import uuid +from tests.conftest import wire_operator_auth +from backend.operator_auth import create_operator, make_operator_cookie, COOKIE_NAME +from backend.models import OperatorUser + + +def _login_as(client, user): + client.cookies.set(COOKIE_NAME, make_operator_cookie(user.id)) + + +def test_admin_cannot_reach_user_management(client, db_session, monkeypatch): + admin, _ = create_operator(db_session, "admin@x.com", "Admin", "admin", password="pw-123456") + wire_operator_auth(monkeypatch, db_session, enabled=True) + _login_as(client, admin) + assert client.get("/admin/users", follow_redirects=False).status_code == 403 + + +def test_superadmin_sees_user_management(client, db_session, monkeypatch): + su, _ = create_operator(db_session, "su@x.com", "Su", "superadmin", password="pw-123456") + wire_operator_auth(monkeypatch, db_session, enabled=True) + _login_as(client, su) + assert client.get("/admin/users", follow_redirects=False).status_code == 200 + + +def test_superadmin_lists_users_json(client, db_session, monkeypatch): + su, _ = create_operator(db_session, "su@x.com", "Su", "superadmin", password="pw-123456") + wire_operator_auth(monkeypatch, db_session, enabled=True) + _login_as(client, su) + r = client.get("/api/admin/users") + assert r.status_code == 200 + emails = [u["email"] for u in r.json()["users"]] + assert "su@x.com" in emails + assert all("password_hash" not in u for u in r.json()["users"]) # never leak hashes + + +def test_create_user_returns_temp_once(client, db_session, monkeypatch): + su, _ = create_operator(db_session, "su@x.com", "Su", "superadmin", password="pw-123456") + wire_operator_auth(monkeypatch, db_session, enabled=True) + _login_as(client, su) + r = client.post("/api/admin/users", + json={"email": "dad@x.com", "name": "Dad", "role": "admin"}) + assert r.status_code == 200 + assert len(r.json()["password"]) >= 12 + made = db_session.query(OperatorUser).filter_by(email="dad@x.com").first() + assert made.must_change_password is True + + +def test_reset_password_returns_temp_once(client, db_session, monkeypatch): + su, _ = create_operator(db_session, "su@x.com", "Su", "superadmin", password="pw-123456") + target, _ = create_operator(db_session, "t@x.com", "T", "admin", password="pw-123456") + wire_operator_auth(monkeypatch, db_session, enabled=True) + _login_as(client, su) + r = client.post(f"/api/admin/users/{target.id}/reset-password") + assert r.status_code == 200 and len(r.json()["password"]) >= 12 + db_session.refresh(target) + assert target.must_change_password is True + + +def test_disable_and_enable(client, db_session, monkeypatch): + su, _ = create_operator(db_session, "su@x.com", "Su", "superadmin", password="pw-123456") + target, _ = create_operator(db_session, "t@x.com", "T", "admin", password="pw-123456") + wire_operator_auth(monkeypatch, db_session, enabled=True) + _login_as(client, su) + assert client.post(f"/api/admin/users/{target.id}/disable").status_code == 200 + db_session.refresh(target); assert target.active is False + assert client.post(f"/api/admin/users/{target.id}/enable").status_code == 200 + db_session.refresh(target); assert target.active is True + + +def test_change_role(client, db_session, monkeypatch): + su, _ = create_operator(db_session, "su@x.com", "Su", "superadmin", password="pw-123456") + target, _ = create_operator(db_session, "t@x.com", "T", "admin", password="pw-123456") + wire_operator_auth(monkeypatch, db_session, enabled=True) + _login_as(client, su) + r = client.post(f"/api/admin/users/{target.id}/role", json={"role": "superadmin"}) + assert r.status_code == 200 + db_session.refresh(target); assert target.role == "superadmin" + + +def test_admin_cannot_reach_json_endpoints(client, db_session, monkeypatch): + admin, _ = create_operator(db_session, "a@x.com", "A", "admin", password="pw-123456") + target, _ = create_operator(db_session, "t@x.com", "T", "admin", password="pw-123456") + wire_operator_auth(monkeypatch, db_session, enabled=True) + _login_as(client, admin) + assert client.get("/api/admin/users").status_code == 403 + assert client.post("/api/admin/users", json={"email": "x@x.com", "name": "X", "role": "admin"}).status_code == 403 + assert client.post(f"/api/admin/users/{target.id}/reset-password").status_code == 403 + assert client.post(f"/api/admin/users/{target.id}/disable").status_code == 403 + assert client.post(f"/api/admin/users/{target.id}/enable").status_code == 403 + assert client.post(f"/api/admin/users/{target.id}/role", json={"role": "superadmin"}).status_code == 403 + + +def test_cannot_disable_own_account(client, db_session, monkeypatch): + su, _ = create_operator(db_session, "su@x.com", "Su", "superadmin", password="pw-123456") + wire_operator_auth(monkeypatch, db_session, enabled=True) + _login_as(client, su) + r = client.post(f"/api/admin/users/{su.id}/disable") + assert r.status_code == 400 + db_session.refresh(su) + assert su.active is True + + +def test_cannot_change_own_role(client, db_session, monkeypatch): + su, _ = create_operator(db_session, "su@x.com", "Su", "superadmin", password="pw-123456") + wire_operator_auth(monkeypatch, db_session, enabled=True) + _login_as(client, su) + r = client.post(f"/api/admin/users/{su.id}/role", json={"role": "admin"}) + assert r.status_code == 400 + db_session.refresh(su) + assert su.role == "superadmin" + + +def test_deferred_operator_role_rejected_by_api(client, db_session, monkeypatch): + su, _ = create_operator(db_session, "su@x.com", "Su", "superadmin", password="pw-123456") + target, _ = create_operator(db_session, "t@x.com", "T", "admin", password="pw-123456") + wire_operator_auth(monkeypatch, db_session, enabled=True) + _login_as(client, su) + assert client.post("/api/admin/users", json={"email": "op@x.com", "name": "Op", "role": "operator"}).status_code == 400 + assert client.post(f"/api/admin/users/{target.id}/role", json={"role": "operator"}).status_code == 400