From e8fe4845aaf56d82c7bc4bc784acaae355fdee1d Mon Sep 17 00:00:00 2001 From: serversdown Date: Wed, 17 Jun 2026 19:16:09 +0000 Subject: [PATCH] feat(auth): authenticate + lockout + operator data helpers Co-Authored-By: Claude Sonnet 4.6 --- backend/operator_auth.py | 95 +++++++++++++++++++++++++++++ tests/test_operator_authenticate.py | 89 +++++++++++++++++++++++++++ 2 files changed, 184 insertions(+) create mode 100644 tests/test_operator_authenticate.py diff --git a/backend/operator_auth.py b/backend/operator_auth.py index 1ed0820..4ec1288 100644 --- a/backend/operator_auth.py +++ b/backend/operator_auth.py @@ -26,6 +26,11 @@ LOCK_MINUTES = 15 # Role ladder — a rank map so checks read naturally and 'operator' slots in later. _ROLE_RANK = {"operator": 10, "admin": 20, "superadmin": 30} +# A throwaway hash used only to equalize verify time on the unknown-email path, +# so a missing account can't be distinguished from a wrong password by timing +# (no user-enumeration). The value never authenticates anything. +_DUMMY_PASSWORD_HASH = hash_password("operator-auth-timing-equalizer") + def role_at_least(role: str, minimum: str) -> bool: """True iff `role` ranks at or above `minimum`. Unknown roles rank as 0.""" @@ -61,3 +66,93 @@ def current_operator(request, db): if user.sessions_valid_from and datetime.utcfromtimestamp(int(iat)) < user.sessions_valid_from: return None return user + + +def register_login_failure(db, user) -> None: + """Increment a user's failure counter and lock them out past the threshold.""" + user.failed_login_count = (user.failed_login_count or 0) + 1 + if user.failed_login_count >= MAX_LOGIN_FAILURES: + user.locked_until = datetime.utcnow() + timedelta(minutes=LOCK_MINUTES) + db.commit() + + +def authenticate(db, email, password): + """Return (user, "ok") on success, (None, "locked") if locked out, else + (None, "bad"). Never reveals whether the email exists: an unknown email runs + the same argon2 verify (against a dummy hash) as a wrong password, so neither + the response text nor its timing distinguishes the two.""" + user = db.query(OperatorUser).filter_by(email=_norm_email(email)).first() + if user and user.locked_until and user.locked_until > datetime.utcnow(): + return None, "locked" + password_ok = verify_password(password, user.password_hash if user else _DUMMY_PASSWORD_HASH) + if not user or not user.active or not password_ok: + if user: + register_login_failure(db, user) + return None, "bad" + user.failed_login_count = 0 + user.locked_until = None + user.last_login_at = datetime.utcnow() + db.commit() + return user, "ok" + + +def create_operator(db, email, name, role, password=None, must_change=None): + """Create an operator. With no password, generate a temp one and force a change + (must_change defaults True). With a password, must_change defaults False. + Returns (user, raw_password_to_show_once). Raises ValueError on dup/bad role.""" + email = _norm_email(email) + if role not in _ROLE_RANK: + raise ValueError(f"unknown role {role!r}") + if db.query(OperatorUser).filter_by(email=email).first(): + raise ValueError(f"operator {email} already exists") + if password is None: + password = generate_password() + if must_change is None: + must_change = True + elif must_change is None: + must_change = False + user = OperatorUser(id=str(uuid.uuid4()), email=email, display_name=name, + password_hash=hash_password(password), role=role, + active=True, must_change_password=must_change) + db.add(user) + db.commit() + return user, password + + +def reset_operator_password(db, user) -> str: + """Generate a fresh temp password, force a change, log the user out everywhere. + Returns the raw password to show once.""" + raw = generate_password() + user.password_hash = hash_password(raw) + user.must_change_password = True + user.failed_login_count = 0 + user.locked_until = None + user.sessions_valid_from = datetime.utcnow().replace(microsecond=0) + db.commit() + return raw + + +def change_own_password(db, user, new_password) -> int: + """Set a user's own new password, clear the forced-change flag, and bump + sessions_valid_from to the returned iat — the caller mints the replacement + cookie with that exact iat so it stays valid while older cookies die.""" + new_iat = int(time.time()) + user.password_hash = hash_password(new_password) + user.must_change_password = False + user.sessions_valid_from = datetime.utcfromtimestamp(new_iat) + db.commit() + return new_iat + + +def set_operator_active(db, user, active: bool): + user.active = bool(active) + db.commit() + return user + + +def set_operator_role(db, user, role: str): + if role not in _ROLE_RANK: + raise ValueError(f"unknown role {role!r}") + user.role = role + db.commit() + return user diff --git a/tests/test_operator_authenticate.py b/tests/test_operator_authenticate.py new file mode 100644 index 0000000..5b9737f --- /dev/null +++ b/tests/test_operator_authenticate.py @@ -0,0 +1,89 @@ +# tests/test_operator_authenticate.py +import time +from datetime import datetime +import pytest + +from backend.operator_auth import ( + authenticate, create_operator, reset_operator_password, + set_operator_active, set_operator_role, change_own_password, MAX_LOGIN_FAILURES, +) +from backend.auth_passwords import verify_password +from backend.models import OperatorUser + + +def test_create_operator_generates_temp_and_forces_change(db_session): + user, raw = create_operator(db_session, "Dad@X.com", "Dad", "admin") + assert user.email == "dad@x.com" # lowercased + assert user.must_change_password is True + assert verify_password(raw, user.password_hash) + + +def test_create_operator_with_explicit_password_no_forced_change(db_session): + user, raw = create_operator(db_session, "brian@x.com", "Brian", "superadmin", password="chosen-pw-123") + assert raw == "chosen-pw-123" + assert user.must_change_password is False + + +def test_create_operator_rejects_duplicate_and_bad_role(db_session): + create_operator(db_session, "a@x.com", "A", "admin") + with pytest.raises(ValueError): + create_operator(db_session, "A@x.com", "A2", "admin") # dup (case-insensitive) + with pytest.raises(ValueError): + create_operator(db_session, "b@x.com", "B", "wizard") # bad role + + +def test_authenticate_success(db_session): + user, raw = create_operator(db_session, "ok@x.com", "Ok", "admin", password="rightpw-9") + got, status = authenticate(db_session, "OK@x.com", "rightpw-9") + assert status == "ok" and got.id == user.id + assert got.last_login_at is not None + assert got.failed_login_count == 0 + + +def test_authenticate_wrong_password_counts(db_session): + create_operator(db_session, "wp@x.com", "Wp", "admin", password="rightpw-9") + got, status = authenticate(db_session, "wp@x.com", "nope") + assert got is None and status == "bad" + assert db_session.query(OperatorUser).filter_by(email="wp@x.com").first().failed_login_count == 1 + + +def test_lockout_after_five_then_correct_password_refused(db_session): + create_operator(db_session, "lk@x.com", "Lk", "admin", password="rightpw-9") + for _ in range(MAX_LOGIN_FAILURES): + authenticate(db_session, "lk@x.com", "nope") + got, status = authenticate(db_session, "lk@x.com", "rightpw-9") # correct, but locked + assert got is None and status == "locked" + + +def test_authenticate_unknown_email_is_bad_not_error(db_session): + got, status = authenticate(db_session, "ghost@x.com", "whatever") + assert got is None and status == "bad" + + +def test_reset_password_sets_new_hash_forces_change_and_bumps_sessions(db_session): + user, _ = create_operator(db_session, "r@x.com", "R", "admin", password="orig-pw-1") + before = user.sessions_valid_from + raw = reset_operator_password(db_session, user) + assert verify_password(raw, user.password_hash) + assert user.must_change_password is True + assert user.sessions_valid_from >= before + + +def test_change_own_password_clears_flag_and_bumps(db_session): + user, _ = create_operator(db_session, "c@x.com", "C", "admin", password="orig-pw-1") + user.must_change_password = True + db_session.commit() + new_iat = change_own_password(db_session, user, "brand-new-pw-2") + assert verify_password("brand-new-pw-2", user.password_hash) + assert user.must_change_password is False + assert user.sessions_valid_from == datetime.utcfromtimestamp(new_iat) + + +def test_set_active_and_role(db_session): + user, _ = create_operator(db_session, "s@x.com", "S", "admin", password="orig-pw-1") + set_operator_active(db_session, user, False) + assert user.active is False + set_operator_role(db_session, user, "superadmin") + assert user.role == "superadmin" + with pytest.raises(ValueError): + set_operator_role(db_session, user, "wizard")