Merge pull request 'Operator-Auth full implementation.' (#70) from feat/operator-auth into dev

Reviewed-on: #70
This commit was merged in pull request #70.
This commit is contained in:
2026-06-18 16:35:59 -04:00
25 changed files with 3581 additions and 0 deletions
+64
View File
@@ -0,0 +1,64 @@
# backend/auth_cookies.py
"""Generic HMAC-signed cookie payloads, shared by operator auth (and, optionally
later, the portal). A signed value is f"{b64url(json)}.{hmac_sha256(b64)}"; read()
verifies the signature in constant time and enforces a server-side iat expiry.
The signing secret is the same SECRET_KEY the portal already reads, so a single
env var protects both cookies. Never store or log raw secrets."""
import os
import hmac
import json
import time
import base64
import hashlib
import logging
logger = logging.getLogger(__name__)
# Same env var the portal cookie uses — one secret protects both. The insecure
# default only exists so dev/test boots without config; set a real SECRET_KEY in prod.
SECRET_KEY = os.getenv("SECRET_KEY", "dev-insecure-change-me")
# Set COOKIE_SECURE=true once served over HTTPS; leave false on plain HTTP or the
# browser won't send the cookie.
COOKIE_SECURE = os.getenv("COOKIE_SECURE", "false").lower() in ("1", "true", "yes")
def _sign(body: str) -> str:
return hmac.new(SECRET_KEY.encode(), body.encode(), hashlib.sha256).hexdigest()
def sign(payload: dict) -> str:
"""Serialize + sign a payload dict into a cookie-safe string."""
body = base64.urlsafe_b64encode(json.dumps(payload).encode()).decode()
return f"{body}.{_sign(body)}"
def read(raw, max_age: int):
"""Verify a signed value and return its payload dict, or None if missing,
tampered, or older than max_age seconds (by its own `iat`)."""
if not raw or not isinstance(raw, str):
return None
try:
body, sig = raw.rsplit(".", 1)
except (ValueError, AttributeError):
return None
if not hmac.compare_digest(sig, _sign(body)):
return None
try:
data = json.loads(base64.urlsafe_b64decode(body.encode()))
except Exception:
return None
if not isinstance(data, dict):
return None
iat = data.get("iat")
if not isinstance(iat, (int, float)):
return None
now = time.time()
# Reject implausibly future-dated tokens: the same server signs and verifies,
# so there's no real clock skew — a far-future iat (e.g. to dodge max_age or
# outlive a sessions_valid_from bump) is bogus. 60s of slack is generous.
if iat - now > 60:
return None
if (now - iat) > max_age:
return None
return data
+12
View File
@@ -89,6 +89,18 @@ async def add_environment_to_context(request: Request, call_next):
response = await call_next(request)
return response
# Operator auth — deny-by-default gate over the whole internal app. Governed by
# OPERATOR_AUTH_ENABLED (default off → behaves exactly as today). See
# docs/superpowers/specs/2026-06-17-operator-auth-design.md.
from backend.operator_auth import operator_gate
app.middleware("http")(operator_gate)
from backend.routers import operator_auth_routes
app.include_router(operator_auth_routes.router)
from backend.routers import operator_users
app.include_router(operator_users.router)
# Override TemplateResponse to include environment and version in context
original_template_response = templates.TemplateResponse
def custom_template_response(name, context=None, *args, **kwargs):
+31
View File
@@ -3,6 +3,13 @@ from datetime import datetime
from backend.database import Base
def _utcnow_seconds():
"""utcnow truncated to whole seconds — used as the default for
sessions_valid_from so a freshly-issued cookie (whose iat is a whole-second
epoch) never falls a few microseconds before it and self-invalidates."""
return datetime.utcnow().replace(microsecond=0)
class Emitter(Base):
__tablename__ = "emitters"
@@ -772,3 +779,27 @@ class ClientAccessToken(Base):
created_at = Column(DateTime, default=datetime.utcnow)
last_used_at = Column(DateTime, nullable=True)
revoked_at = Column(DateTime, nullable=True) # set = link no longer works
# ============================================================================
# OPERATOR AUTH — internal operator logins (see backend/operator_auth.py)
# ============================================================================
class OperatorUser(Base):
"""An internal operator login. Roles: 'superadmin' (Brian, + account mgmt) and
'admin' (parents, full app). 'operator' is reserved (deferred). Brand-new table
→ create_all builds it, no migration. Never store or log raw passwords."""
__tablename__ = "operator_users"
id = Column(String, primary_key=True, index=True) # UUID
email = Column(String, nullable=False, unique=True, index=True) # login handle, lowercased
display_name = Column(String, nullable=False) # "Brian", "Dad"
password_hash = Column(String, nullable=False) # argon2id
role = Column(String, nullable=False, default="admin") # superadmin | admin
active = Column(Boolean, default=True) # False = login disabled
must_change_password = Column(Boolean, default=False) # forces a change next login
sessions_valid_from = Column(DateTime, default=_utcnow_seconds) # bump = log out everywhere
failed_login_count = Column(Integer, default=0) # lockout counter
locked_until = Column(DateTime, nullable=True) # set after too many bad tries
created_at = Column(DateTime, default=datetime.utcnow)
last_login_at = Column(DateTime, nullable=True)
+137
View File
@@ -0,0 +1,137 @@
#!/usr/bin/env python3
"""Operator-account admin CLI — the bootstrap and the break-glass. Run inside the
terra-view container against the live DB. Temp/raw passwords are printed ONCE; only
hashes persist.
# first superadmin (before any UI is reachable) — prompts for a password, or --generate
python3 backend/operator_admin.py create-superadmin --email you@x.com --name "Brian"
# a parent's account — generates a temp password, must-change on first login
python3 backend/operator_admin.py create-user --email dad@x.com --name "Dad" --role admin
python3 backend/operator_admin.py reset-password --email dad@x.com
python3 backend/operator_admin.py list
python3 backend/operator_admin.py disable --email dad@x.com
python3 backend/operator_admin.py enable --email dad@x.com
"""
import os
import sys
import getpass
import argparse
from datetime import datetime
sys.path.insert(0, os.path.dirname(os.path.dirname(os.path.abspath(__file__))))
from backend.database import SessionLocal
from backend.models import OperatorUser
from backend.operator_auth import (
create_operator, reset_operator_password, set_operator_active, _norm_email,
)
def _get(db, email):
u = db.query(OperatorUser).filter_by(email=_norm_email(email)).first()
if not u:
sys.exit(f"No operator with email '{email}'.")
return u
def cmd_create_superadmin(email, name, password=None, generate=False):
db = SessionLocal()
try:
if password is None and not generate:
password = getpass.getpass("Password for new superadmin: ")
if not password or len(password) < 8:
sys.exit("Password must be at least 8 characters.")
user, raw = create_operator(db, email, name, "superadmin",
password=None if generate else password)
if generate:
print(f"✓ Superadmin {user.email} created. Temp password (shown once): {raw}")
else:
print(f"✓ Superadmin {user.email} created.")
except ValueError as e:
sys.exit(str(e))
finally:
db.close()
def cmd_create_user(email, name, role="admin"):
db = SessionLocal()
try:
user, raw = create_operator(db, email, name, role)
print(f"{role} {user.email} created. Temp password (shown once): {raw}")
print(" They'll be required to change it on first login.")
except ValueError as e:
sys.exit(str(e))
finally:
db.close()
def cmd_reset_password(email):
db = SessionLocal()
try:
user = _get(db, email)
raw = reset_operator_password(db, user)
print(f"✓ Reset {user.email}. Temp password (shown once): {raw}")
finally:
db.close()
def cmd_set_active(email, active):
db = SessionLocal()
try:
user = _get(db, email)
set_operator_active(db, user, active)
print(f"{user.email} {'enabled' if active else 'disabled'}.")
finally:
db.close()
def cmd_list():
db = SessionLocal()
try:
users = db.query(OperatorUser).order_by(OperatorUser.display_name).all()
if not users:
print("No operators yet. Run create-superadmin first.")
return
for u in users:
locked = " [LOCKED]" if (u.locked_until and u.locked_until > datetime.utcnow()) else ""
state = "active" if u.active else "DISABLED"
last = u.last_login_at.strftime("%Y-%m-%d %H:%M") if u.last_login_at else "never"
print(f" {u.display_name:<12} {u.email:<28} {u.role:<11} {state}{locked} last: {last}")
finally:
db.close()
def main():
ap = argparse.ArgumentParser(description="Operator-account admin")
sub = ap.add_subparsers(dest="cmd", required=True)
p = sub.add_parser("create-superadmin")
p.add_argument("--email", required=True); p.add_argument("--name", required=True)
p.add_argument("--generate", action="store_true", help="generate a temp password instead of prompting")
p.set_defaults(fn=lambda a: cmd_create_superadmin(a.email, a.name, generate=a.generate))
p = sub.add_parser("create-user")
p.add_argument("--email", required=True); p.add_argument("--name", required=True)
p.add_argument("--role", default="admin", choices=["admin", "superadmin"])
p.set_defaults(fn=lambda a: cmd_create_user(a.email, a.name, a.role))
p = sub.add_parser("reset-password")
p.add_argument("--email", required=True)
p.set_defaults(fn=lambda a: cmd_reset_password(a.email))
p = sub.add_parser("disable"); p.add_argument("--email", required=True)
p.set_defaults(fn=lambda a: cmd_set_active(a.email, False))
p = sub.add_parser("enable"); p.add_argument("--email", required=True)
p.set_defaults(fn=lambda a: cmd_set_active(a.email, True))
p = sub.add_parser("list"); p.set_defaults(fn=lambda a: cmd_list())
args = ap.parse_args()
args.fn(args)
if __name__ == "__main__":
main()
+231
View File
@@ -0,0 +1,231 @@
# backend/operator_auth.py
"""Operator authentication: the deny-by-default gate, session cookie, login +
lockout, and the small data helpers shared by the routes and the CLI. Reuses the
argon2 hasher (auth_passwords) and the HMAC signer (auth_cookies).
The flag and SessionLocal are read as module globals at call time so tests can
monkeypatch them."""
import os
import time
import uuid
from datetime import datetime, timedelta
from urllib.parse import quote
from fastapi import Request, HTTPException
from fastapi.responses import JSONResponse, RedirectResponse
from backend.models import OperatorUser
from backend.auth_passwords import hash_password, verify_password, generate_password
from backend.auth_cookies import sign, read, COOKIE_SECURE
from backend.database import SessionLocal
# Feature flag — OFF by default. When off, the gate and require_role both pass
# everything through and the app behaves exactly as it does today.
OPERATOR_AUTH_ENABLED = os.getenv("OPERATOR_AUTH_ENABLED", "false").lower() in ("1", "true", "yes")
COOKIE_NAME = "tv_session"
COOKIE_MAX_AGE = 60 * 60 * 24 * 30 # 30 days ("remember this device")
MAX_LOGIN_FAILURES = 5
LOCK_MINUTES = 15
# Role ladder — a rank map so checks read naturally and 'operator' slots in later.
_ROLE_RANK = {"operator": 10, "admin": 20, "superadmin": 30}
# A throwaway hash used only to equalize verify time on the unknown-email path,
# so a missing account can't be distinguished from a wrong password by timing
# (no user-enumeration). The value never authenticates anything.
_DUMMY_PASSWORD_HASH = hash_password("operator-auth-timing-equalizer")
def role_at_least(role: str, minimum: str) -> bool:
"""True iff `role` ranks at or above `minimum`. Unknown roles rank as 0."""
return _ROLE_RANK.get(role, 0) >= _ROLE_RANK[minimum]
def _norm_email(email: str) -> str:
return (email or "").strip().lower()
def make_operator_cookie(uid: str, iat: int = None) -> str:
"""Sign a tv_session value for a user id. iat defaults to now; pass an explicit
iat when you bump sessions_valid_from to that same instant (change-password)."""
return sign({"uid": uid, "iat": int(iat if iat is not None else time.time())})
def current_operator(request, db):
"""Resolve the OperatorUser for a request's tv_session cookie, or None.
Re-validated against the DB every call: a disabled / locked / password-changed
user drops on the next request. Used by the gate middleware (with its own
session) — does not raise."""
data = read(request.cookies.get(COOKIE_NAME), COOKIE_MAX_AGE)
if not data:
return None
uid, iat = data.get("uid"), data.get("iat")
if not uid or not isinstance(iat, (int, float)):
return None
user = db.query(OperatorUser).filter_by(id=uid).first()
if not user or not user.active:
return None
if user.locked_until and user.locked_until > datetime.utcnow():
return None
if user.sessions_valid_from and datetime.utcfromtimestamp(int(iat)) < user.sessions_valid_from:
return None
return user
def register_login_failure(db, user) -> None:
"""Increment a user's failure counter and lock them out past the threshold."""
user.failed_login_count = (user.failed_login_count or 0) + 1
if user.failed_login_count >= MAX_LOGIN_FAILURES:
user.locked_until = datetime.utcnow() + timedelta(minutes=LOCK_MINUTES)
db.commit()
def authenticate(db, email, password):
"""Return (user, "ok") on success, (None, "locked") if locked out, else
(None, "bad"). Never reveals whether the email exists: an unknown email runs
the same argon2 verify (against a dummy hash) as a wrong password, so neither
the response text nor its timing distinguishes the two."""
user = db.query(OperatorUser).filter_by(email=_norm_email(email)).first()
if user and user.locked_until and user.locked_until > datetime.utcnow():
return None, "locked"
password_ok = verify_password(password, user.password_hash if user else _DUMMY_PASSWORD_HASH)
if not user or not user.active or not password_ok:
if user:
register_login_failure(db, user)
return None, "bad"
user.failed_login_count = 0
user.locked_until = None
user.last_login_at = datetime.utcnow()
db.commit()
return user, "ok"
def create_operator(db, email, name, role, password=None, must_change=None):
"""Create an operator. With no password, generate a temp one and force a change
(must_change defaults True). With a password, must_change defaults False.
Returns (user, raw_password_to_show_once). Raises ValueError on dup/bad role."""
email = _norm_email(email)
if role not in _ROLE_RANK:
raise ValueError(f"unknown role {role!r}")
if db.query(OperatorUser).filter_by(email=email).first():
raise ValueError(f"operator {email} already exists")
if password is None:
password = generate_password()
if must_change is None:
must_change = True
elif must_change is None:
must_change = False
user = OperatorUser(id=str(uuid.uuid4()), email=email, display_name=name,
password_hash=hash_password(password), role=role,
active=True, must_change_password=must_change)
db.add(user)
db.commit()
return user, password
def reset_operator_password(db, user) -> str:
"""Generate a fresh temp password, force a change, log the user out everywhere.
Returns the raw password to show once."""
raw = generate_password()
user.password_hash = hash_password(raw)
user.must_change_password = True
user.failed_login_count = 0
user.locked_until = None
user.sessions_valid_from = datetime.utcnow().replace(microsecond=0)
db.commit()
return raw
def change_own_password(db, user, new_password) -> int:
"""Set a user's own new password, clear the forced-change flag, and bump
sessions_valid_from to the returned iat — the caller mints the replacement
cookie with that exact iat so it stays valid while older cookies die."""
new_iat = int(time.time())
user.password_hash = hash_password(new_password)
user.must_change_password = False
user.sessions_valid_from = datetime.utcfromtimestamp(new_iat)
db.commit()
return new_iat
def set_operator_active(db, user, active: bool):
user.active = bool(active)
db.commit()
return user
def set_operator_role(db, user, role: str):
if role not in _ROLE_RANK:
raise ValueError(f"unknown role {role!r}")
user.role = role
db.commit()
return user
# Routes reachable with no login. A new route added next year is gated by default.
_EXEMPT_EXACT = {
"/login", "/logout", "/health",
"/manifest.json", "/sw.js", "/favicon.ico", "/offline-db.js",
"/portal", # portal home (its own auth)
# machine endpoints — LAN-only, automated, no human (watchers/heartbeats):
"/emitters/report", "/api/series3/heartbeat", "/api/series4/heartbeat",
}
_EXEMPT_PREFIX = ("/static/", "/portal/")
def _is_exempt(path: str) -> bool:
return path in _EXEMPT_EXACT or path.startswith(_EXEMPT_PREFIX)
async def operator_gate(request: Request, call_next):
"""Deny-by-default gate. Flag off → pass through (app as today). Flag on →
exempt paths pass; otherwise require a valid operator session, stash it on
request.state.operator, and force a password change when pending."""
if not OPERATOR_AUTH_ENABLED:
return await call_next(request)
# CORS preflight carries no auth and must reach CORSMiddleware, not the gate.
if request.method == "OPTIONS":
return await call_next(request)
path = request.url.path
if _is_exempt(path):
return await call_next(request)
db = SessionLocal()
try:
user = current_operator(request, db)
if user is not None:
db.expunge(user) # detach a fully-loaded row so we can close now
finally:
db.close()
if user is None:
if path.startswith("/api/"):
return JSONResponse({"detail": "Not authenticated"}, status_code=401)
return RedirectResponse(f"/login?next={quote(path)}", status_code=303)
if user.must_change_password and path not in ("/change-password", "/logout"):
if path.startswith("/api/"):
return JSONResponse({"detail": "Password change required"}, status_code=403)
return RedirectResponse("/change-password", status_code=303)
request.state.operator = user
return await call_next(request)
def require_role(minimum: str):
"""Dependency factory: require a logged-in operator ranked >= `minimum`.
Respects the flag (off → pass through). When on, the middleware has already
set request.state.operator before this runs."""
def _dep(request: Request):
if not OPERATOR_AUTH_ENABLED:
return None
user = getattr(request.state, "operator", None)
if user is None:
raise HTTPException(status_code=401, detail="Not authenticated")
if not role_at_least(user.role, minimum):
raise HTTPException(status_code=403, detail="Insufficient permissions")
return user
return _dep
+111
View File
@@ -0,0 +1,111 @@
"""Operator login / logout / change-password. These routes intentionally work
regardless of OPERATOR_AUTH_ENABLED (you log in while the flag is still off during
rollout). /login and /logout are on the gate's exempt list; /change-password
requires a session (the gate sets request.state.operator)."""
from fastapi import APIRouter, Request, Depends, Form
from fastapi.responses import RedirectResponse
from sqlalchemy.orm import Session
from backend.database import get_db
from backend.models import OperatorUser
from backend.templates_config import templates
from backend.operator_auth import (
authenticate, current_operator, change_own_password, make_operator_cookie,
COOKIE_NAME, COOKIE_MAX_AGE,
)
from backend.auth_cookies import COOKIE_SECURE
from backend.auth_passwords import verify_password
router = APIRouter(tags=["operator-auth"])
def _safe_next(next_url: str) -> str:
"""Only allow same-site relative redirects (an open-redirect guard). Rejects
`//host` and `/\\host` — browsers treat a backslash as `/` in the authority
position, so both escape to an external site."""
if next_url and next_url.startswith("/") and not next_url.startswith(("//", "/\\")):
return next_url
return "/"
@router.get("/login")
async def login_page(request: Request, next: str = "", error: str = ""):
return templates.TemplateResponse("login.html",
{"request": request, "next": next, "error": error})
@router.post("/login")
async def login_submit(request: Request, next: str = "",
email: str = Form(...), password: str = Form(...),
db: Session = Depends(get_db)):
user, status = authenticate(db, email, password)
if status == "locked":
return templates.TemplateResponse(
"login.html",
{"request": request, "next": next,
"error": "Too many attempts — try again in 15 minutes."},
status_code=200)
if user is None:
return templates.TemplateResponse(
"login.html",
{"request": request, "next": next, "error": "Invalid email or password."},
status_code=200)
dest = "/change-password" if user.must_change_password else _safe_next(next)
resp = RedirectResponse(url=dest, status_code=303)
resp.set_cookie(COOKIE_NAME, make_operator_cookie(user.id),
max_age=COOKIE_MAX_AGE, httponly=True, samesite="lax", secure=COOKIE_SECURE)
return resp
@router.get("/logout")
async def logout(request: Request):
resp = RedirectResponse(url="/login", status_code=303)
resp.delete_cookie(COOKIE_NAME)
return resp
@router.get("/change-password")
async def change_password_page(request: Request, db: Session = Depends(get_db)):
user = getattr(request.state, "operator", None) or current_operator(request, db)
if user is None:
return RedirectResponse(url="/login", status_code=303)
return templates.TemplateResponse(
"change_password.html",
{"request": request, "must_change": user.must_change_password, "error": ""})
@router.post("/change-password")
async def change_password_submit(request: Request,
current_password: str = Form(...),
new_password: str = Form(...),
confirm_password: str = Form(...),
db: Session = Depends(get_db)):
_user_ref = getattr(request.state, "operator", None) or current_operator(request, db)
if _user_ref is None:
return RedirectResponse(url="/login", status_code=303)
# Re-fetch a session-bound copy so mutations via `db` will be committed.
# request.state.operator may be expunged (detached) from the gate's own
# SessionLocal; operating on a detached object against a different session
# would silently drop the UPDATE.
user = db.query(OperatorUser).filter_by(id=_user_ref.id).first()
if user is None:
return RedirectResponse(url="/login", status_code=303)
def _err(msg):
return templates.TemplateResponse(
"change_password.html",
{"request": request, "must_change": user.must_change_password, "error": msg},
status_code=200)
if not verify_password(current_password, user.password_hash):
return _err("Current password is incorrect.")
if len(new_password) < 8:
return _err("New password must be at least 8 characters.")
if new_password != confirm_password:
return _err("New passwords do not match.")
new_iat = change_own_password(db, user, new_password)
resp = RedirectResponse(url="/", status_code=303)
resp.set_cookie(COOKIE_NAME, make_operator_cookie(user.id, iat=new_iat),
max_age=COOKIE_MAX_AGE, httponly=True, samesite="lax", secure=COOKIE_SECURE)
return resp
+115
View File
@@ -0,0 +1,115 @@
"""Operator account management — superadmin only. Temp passwords are returned in
the JSON response once (shown to the superadmin to hand off); only hashes persist."""
from fastapi import APIRouter, Request, Depends, HTTPException
from fastapi.responses import JSONResponse
from pydantic import BaseModel
from sqlalchemy.orm import Session
from backend.database import get_db
from backend.templates_config import templates
from backend.models import OperatorUser
from backend.operator_auth import (
require_role, create_operator, reset_operator_password,
set_operator_active, set_operator_role,
)
import backend.operator_auth as operator_auth
from backend.utils.timezone import format_local_datetime
def _require_auth_enabled():
"""The operator-management surface does not exist while operator auth is
disabled — otherwise these net-new endpoints would be world-open with the
flag off (the default), letting anyone pre-seed a superadmin. Read the flag
as a live module attribute so the test monkeypatch and a runtime flip both
take effect."""
if not operator_auth.OPERATOR_AUTH_ENABLED:
raise HTTPException(status_code=404, detail="Not found")
router = APIRouter(tags=["operator-users"], dependencies=[Depends(_require_auth_enabled)])
_superadmin = require_role("superadmin")
class NewUser(BaseModel):
email: str
name: str
role: str = "admin"
class RoleChange(BaseModel):
role: str
def _serialize(u: OperatorUser) -> dict:
from datetime import datetime
return {
"id": u.id, "email": u.email, "display_name": u.display_name, "role": u.role,
"active": bool(u.active), "must_change_password": bool(u.must_change_password),
"locked": bool(u.locked_until and u.locked_until > datetime.utcnow()),
"last_login_at": format_local_datetime(u.last_login_at, "%Y-%m-%d %H:%M") if u.last_login_at else None,
}
@router.get("/admin/users")
async def users_page(request: Request, _=Depends(_superadmin)):
return templates.TemplateResponse("admin/users.html", {"request": request})
@router.get("/api/admin/users")
async def list_users(_=Depends(_superadmin), db: Session = Depends(get_db)):
users = db.query(OperatorUser).order_by(OperatorUser.display_name).all()
return {"users": [_serialize(u) for u in users]}
@router.post("/api/admin/users")
async def add_user(body: NewUser, _=Depends(_superadmin), db: Session = Depends(get_db)):
if body.role not in ("admin", "superadmin"):
return JSONResponse(status_code=400, content={"detail": "role must be admin or superadmin"})
try:
user, raw = create_operator(db, body.email, body.name, body.role)
except ValueError as e:
return JSONResponse(status_code=400, content={"detail": str(e)})
return {"user": _serialize(user), "password": raw}
@router.post("/api/admin/users/{user_id}/reset-password")
async def reset_user_password(user_id: str, _=Depends(_superadmin), db: Session = Depends(get_db)):
user = db.query(OperatorUser).filter_by(id=user_id).first()
if not user:
raise HTTPException(status_code=404, detail="User not found")
raw = reset_operator_password(db, user)
return {"password": raw}
@router.post("/api/admin/users/{user_id}/disable")
async def disable_user(user_id: str, acting=Depends(_superadmin), db: Session = Depends(get_db)):
if acting and acting.id == user_id:
return JSONResponse(status_code=400, content={"detail": "Cannot disable your own account"})
user = db.query(OperatorUser).filter_by(id=user_id).first()
if not user:
raise HTTPException(status_code=404, detail="User not found")
set_operator_active(db, user, False)
return {"active": False}
@router.post("/api/admin/users/{user_id}/enable")
async def enable_user(user_id: str, _=Depends(_superadmin), db: Session = Depends(get_db)):
user = db.query(OperatorUser).filter_by(id=user_id).first()
if not user:
raise HTTPException(status_code=404, detail="User not found")
set_operator_active(db, user, True)
return {"active": True}
@router.post("/api/admin/users/{user_id}/role")
async def change_user_role(user_id: str, body: RoleChange,
acting=Depends(_superadmin), db: Session = Depends(get_db)):
if acting and acting.id == user_id:
return JSONResponse(status_code=400, content={"detail": "Cannot change your own role"})
if body.role not in ("admin", "superadmin"):
return JSONResponse(status_code=400, content={"detail": "role must be admin or superadmin"})
user = db.query(OperatorUser).filter_by(id=user_id).first()
if not user:
raise HTTPException(status_code=404, detail="User not found")
set_operator_role(db, user, body.role)
return {"role": user.role}