feat(auth): authenticate + lockout + operator data helpers
Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
This commit is contained in:
@@ -26,6 +26,11 @@ LOCK_MINUTES = 15
|
||||
# Role ladder — a rank map so checks read naturally and 'operator' slots in later.
|
||||
_ROLE_RANK = {"operator": 10, "admin": 20, "superadmin": 30}
|
||||
|
||||
# A throwaway hash used only to equalize verify time on the unknown-email path,
|
||||
# so a missing account can't be distinguished from a wrong password by timing
|
||||
# (no user-enumeration). The value never authenticates anything.
|
||||
_DUMMY_PASSWORD_HASH = hash_password("operator-auth-timing-equalizer")
|
||||
|
||||
|
||||
def role_at_least(role: str, minimum: str) -> bool:
|
||||
"""True iff `role` ranks at or above `minimum`. Unknown roles rank as 0."""
|
||||
@@ -61,3 +66,93 @@ def current_operator(request, db):
|
||||
if user.sessions_valid_from and datetime.utcfromtimestamp(int(iat)) < user.sessions_valid_from:
|
||||
return None
|
||||
return user
|
||||
|
||||
|
||||
def register_login_failure(db, user) -> None:
|
||||
"""Increment a user's failure counter and lock them out past the threshold."""
|
||||
user.failed_login_count = (user.failed_login_count or 0) + 1
|
||||
if user.failed_login_count >= MAX_LOGIN_FAILURES:
|
||||
user.locked_until = datetime.utcnow() + timedelta(minutes=LOCK_MINUTES)
|
||||
db.commit()
|
||||
|
||||
|
||||
def authenticate(db, email, password):
|
||||
"""Return (user, "ok") on success, (None, "locked") if locked out, else
|
||||
(None, "bad"). Never reveals whether the email exists: an unknown email runs
|
||||
the same argon2 verify (against a dummy hash) as a wrong password, so neither
|
||||
the response text nor its timing distinguishes the two."""
|
||||
user = db.query(OperatorUser).filter_by(email=_norm_email(email)).first()
|
||||
if user and user.locked_until and user.locked_until > datetime.utcnow():
|
||||
return None, "locked"
|
||||
password_ok = verify_password(password, user.password_hash if user else _DUMMY_PASSWORD_HASH)
|
||||
if not user or not user.active or not password_ok:
|
||||
if user:
|
||||
register_login_failure(db, user)
|
||||
return None, "bad"
|
||||
user.failed_login_count = 0
|
||||
user.locked_until = None
|
||||
user.last_login_at = datetime.utcnow()
|
||||
db.commit()
|
||||
return user, "ok"
|
||||
|
||||
|
||||
def create_operator(db, email, name, role, password=None, must_change=None):
|
||||
"""Create an operator. With no password, generate a temp one and force a change
|
||||
(must_change defaults True). With a password, must_change defaults False.
|
||||
Returns (user, raw_password_to_show_once). Raises ValueError on dup/bad role."""
|
||||
email = _norm_email(email)
|
||||
if role not in _ROLE_RANK:
|
||||
raise ValueError(f"unknown role {role!r}")
|
||||
if db.query(OperatorUser).filter_by(email=email).first():
|
||||
raise ValueError(f"operator {email} already exists")
|
||||
if password is None:
|
||||
password = generate_password()
|
||||
if must_change is None:
|
||||
must_change = True
|
||||
elif must_change is None:
|
||||
must_change = False
|
||||
user = OperatorUser(id=str(uuid.uuid4()), email=email, display_name=name,
|
||||
password_hash=hash_password(password), role=role,
|
||||
active=True, must_change_password=must_change)
|
||||
db.add(user)
|
||||
db.commit()
|
||||
return user, password
|
||||
|
||||
|
||||
def reset_operator_password(db, user) -> str:
|
||||
"""Generate a fresh temp password, force a change, log the user out everywhere.
|
||||
Returns the raw password to show once."""
|
||||
raw = generate_password()
|
||||
user.password_hash = hash_password(raw)
|
||||
user.must_change_password = True
|
||||
user.failed_login_count = 0
|
||||
user.locked_until = None
|
||||
user.sessions_valid_from = datetime.utcnow().replace(microsecond=0)
|
||||
db.commit()
|
||||
return raw
|
||||
|
||||
|
||||
def change_own_password(db, user, new_password) -> int:
|
||||
"""Set a user's own new password, clear the forced-change flag, and bump
|
||||
sessions_valid_from to the returned iat — the caller mints the replacement
|
||||
cookie with that exact iat so it stays valid while older cookies die."""
|
||||
new_iat = int(time.time())
|
||||
user.password_hash = hash_password(new_password)
|
||||
user.must_change_password = False
|
||||
user.sessions_valid_from = datetime.utcfromtimestamp(new_iat)
|
||||
db.commit()
|
||||
return new_iat
|
||||
|
||||
|
||||
def set_operator_active(db, user, active: bool):
|
||||
user.active = bool(active)
|
||||
db.commit()
|
||||
return user
|
||||
|
||||
|
||||
def set_operator_role(db, user, role: str):
|
||||
if role not in _ROLE_RANK:
|
||||
raise ValueError(f"unknown role {role!r}")
|
||||
user.role = role
|
||||
db.commit()
|
||||
return user
|
||||
|
||||
@@ -0,0 +1,89 @@
|
||||
# tests/test_operator_authenticate.py
|
||||
import time
|
||||
from datetime import datetime
|
||||
import pytest
|
||||
|
||||
from backend.operator_auth import (
|
||||
authenticate, create_operator, reset_operator_password,
|
||||
set_operator_active, set_operator_role, change_own_password, MAX_LOGIN_FAILURES,
|
||||
)
|
||||
from backend.auth_passwords import verify_password
|
||||
from backend.models import OperatorUser
|
||||
|
||||
|
||||
def test_create_operator_generates_temp_and_forces_change(db_session):
|
||||
user, raw = create_operator(db_session, "Dad@X.com", "Dad", "admin")
|
||||
assert user.email == "dad@x.com" # lowercased
|
||||
assert user.must_change_password is True
|
||||
assert verify_password(raw, user.password_hash)
|
||||
|
||||
|
||||
def test_create_operator_with_explicit_password_no_forced_change(db_session):
|
||||
user, raw = create_operator(db_session, "brian@x.com", "Brian", "superadmin", password="chosen-pw-123")
|
||||
assert raw == "chosen-pw-123"
|
||||
assert user.must_change_password is False
|
||||
|
||||
|
||||
def test_create_operator_rejects_duplicate_and_bad_role(db_session):
|
||||
create_operator(db_session, "a@x.com", "A", "admin")
|
||||
with pytest.raises(ValueError):
|
||||
create_operator(db_session, "A@x.com", "A2", "admin") # dup (case-insensitive)
|
||||
with pytest.raises(ValueError):
|
||||
create_operator(db_session, "b@x.com", "B", "wizard") # bad role
|
||||
|
||||
|
||||
def test_authenticate_success(db_session):
|
||||
user, raw = create_operator(db_session, "ok@x.com", "Ok", "admin", password="rightpw-9")
|
||||
got, status = authenticate(db_session, "OK@x.com", "rightpw-9")
|
||||
assert status == "ok" and got.id == user.id
|
||||
assert got.last_login_at is not None
|
||||
assert got.failed_login_count == 0
|
||||
|
||||
|
||||
def test_authenticate_wrong_password_counts(db_session):
|
||||
create_operator(db_session, "wp@x.com", "Wp", "admin", password="rightpw-9")
|
||||
got, status = authenticate(db_session, "wp@x.com", "nope")
|
||||
assert got is None and status == "bad"
|
||||
assert db_session.query(OperatorUser).filter_by(email="wp@x.com").first().failed_login_count == 1
|
||||
|
||||
|
||||
def test_lockout_after_five_then_correct_password_refused(db_session):
|
||||
create_operator(db_session, "lk@x.com", "Lk", "admin", password="rightpw-9")
|
||||
for _ in range(MAX_LOGIN_FAILURES):
|
||||
authenticate(db_session, "lk@x.com", "nope")
|
||||
got, status = authenticate(db_session, "lk@x.com", "rightpw-9") # correct, but locked
|
||||
assert got is None and status == "locked"
|
||||
|
||||
|
||||
def test_authenticate_unknown_email_is_bad_not_error(db_session):
|
||||
got, status = authenticate(db_session, "ghost@x.com", "whatever")
|
||||
assert got is None and status == "bad"
|
||||
|
||||
|
||||
def test_reset_password_sets_new_hash_forces_change_and_bumps_sessions(db_session):
|
||||
user, _ = create_operator(db_session, "r@x.com", "R", "admin", password="orig-pw-1")
|
||||
before = user.sessions_valid_from
|
||||
raw = reset_operator_password(db_session, user)
|
||||
assert verify_password(raw, user.password_hash)
|
||||
assert user.must_change_password is True
|
||||
assert user.sessions_valid_from >= before
|
||||
|
||||
|
||||
def test_change_own_password_clears_flag_and_bumps(db_session):
|
||||
user, _ = create_operator(db_session, "c@x.com", "C", "admin", password="orig-pw-1")
|
||||
user.must_change_password = True
|
||||
db_session.commit()
|
||||
new_iat = change_own_password(db_session, user, "brand-new-pw-2")
|
||||
assert verify_password("brand-new-pw-2", user.password_hash)
|
||||
assert user.must_change_password is False
|
||||
assert user.sessions_valid_from == datetime.utcfromtimestamp(new_iat)
|
||||
|
||||
|
||||
def test_set_active_and_role(db_session):
|
||||
user, _ = create_operator(db_session, "s@x.com", "S", "admin", password="orig-pw-1")
|
||||
set_operator_active(db_session, user, False)
|
||||
assert user.active is False
|
||||
set_operator_role(db_session, user, "superadmin")
|
||||
assert user.role == "superadmin"
|
||||
with pytest.raises(ValueError):
|
||||
set_operator_role(db_session, user, "wizard")
|
||||
Reference in New Issue
Block a user