feat(auth): OperatorUser model + role ladder
Add OperatorUser SQLAlchemy model (operator_users table, auto-created by create_all) with email uniqueness, default active/must_change_password/ failed_login_count, and sessions_valid_from truncated to whole seconds. Add backend/operator_auth.py with feature flag, cookie constants, _ROLE_RANK map, role_at_least(), and _norm_email() helpers. Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
This commit is contained in:
@@ -3,6 +3,13 @@ from datetime import datetime
|
||||
from backend.database import Base
|
||||
|
||||
|
||||
def _utcnow_seconds():
|
||||
"""utcnow truncated to whole seconds — used as the default for
|
||||
sessions_valid_from so a freshly-issued cookie (whose iat is a whole-second
|
||||
epoch) never falls a few microseconds before it and self-invalidates."""
|
||||
return datetime.utcnow().replace(microsecond=0)
|
||||
|
||||
|
||||
class Emitter(Base):
|
||||
__tablename__ = "emitters"
|
||||
|
||||
@@ -772,3 +779,27 @@ class ClientAccessToken(Base):
|
||||
created_at = Column(DateTime, default=datetime.utcnow)
|
||||
last_used_at = Column(DateTime, nullable=True)
|
||||
revoked_at = Column(DateTime, nullable=True) # set = link no longer works
|
||||
|
||||
|
||||
# ============================================================================
|
||||
# OPERATOR AUTH — internal operator logins (see backend/operator_auth.py)
|
||||
# ============================================================================
|
||||
|
||||
class OperatorUser(Base):
|
||||
"""An internal operator login. Roles: 'superadmin' (Brian, + account mgmt) and
|
||||
'admin' (parents, full app). 'operator' is reserved (deferred). Brand-new table
|
||||
→ create_all builds it, no migration. Never store or log raw passwords."""
|
||||
__tablename__ = "operator_users"
|
||||
|
||||
id = Column(String, primary_key=True, index=True) # UUID
|
||||
email = Column(String, nullable=False, unique=True, index=True) # login handle, lowercased
|
||||
display_name = Column(String, nullable=False) # "Brian", "Dad"
|
||||
password_hash = Column(String, nullable=False) # argon2id
|
||||
role = Column(String, nullable=False, default="admin") # superadmin | admin
|
||||
active = Column(Boolean, default=True) # False = login disabled
|
||||
must_change_password = Column(Boolean, default=False) # forces a change next login
|
||||
sessions_valid_from = Column(DateTime, default=_utcnow_seconds) # bump = log out everywhere
|
||||
failed_login_count = Column(Integer, default=0) # lockout counter
|
||||
locked_until = Column(DateTime, nullable=True) # set after too many bad tries
|
||||
created_at = Column(DateTime, default=datetime.utcnow)
|
||||
last_login_at = Column(DateTime, nullable=True)
|
||||
|
||||
@@ -0,0 +1,35 @@
|
||||
# backend/operator_auth.py
|
||||
"""Operator authentication: the deny-by-default gate, session cookie, login +
|
||||
lockout, and the small data helpers shared by the routes and the CLI. Reuses the
|
||||
argon2 hasher (auth_passwords) and the HMAC signer (auth_cookies).
|
||||
|
||||
The flag and SessionLocal are read as module globals at call time so tests can
|
||||
monkeypatch them."""
|
||||
import os
|
||||
import time
|
||||
import uuid
|
||||
from datetime import datetime, timedelta
|
||||
|
||||
from backend.models import OperatorUser
|
||||
from backend.auth_passwords import hash_password, verify_password, generate_password
|
||||
|
||||
# Feature flag — OFF by default. When off, the gate and require_role both pass
|
||||
# everything through and the app behaves exactly as it does today.
|
||||
OPERATOR_AUTH_ENABLED = os.getenv("OPERATOR_AUTH_ENABLED", "false").lower() in ("1", "true", "yes")
|
||||
|
||||
COOKIE_NAME = "tv_session"
|
||||
COOKIE_MAX_AGE = 60 * 60 * 24 * 30 # 30 days ("remember this device")
|
||||
MAX_LOGIN_FAILURES = 5
|
||||
LOCK_MINUTES = 15
|
||||
|
||||
# Role ladder — a rank map so checks read naturally and 'operator' slots in later.
|
||||
_ROLE_RANK = {"operator": 10, "admin": 20, "superadmin": 30}
|
||||
|
||||
|
||||
def role_at_least(role: str, minimum: str) -> bool:
|
||||
"""True iff `role` ranks at or above `minimum`. Unknown roles rank as 0."""
|
||||
return _ROLE_RANK.get(role, 0) >= _ROLE_RANK[minimum]
|
||||
|
||||
|
||||
def _norm_email(email: str) -> str:
|
||||
return (email or "").strip().lower()
|
||||
@@ -0,0 +1,36 @@
|
||||
# tests/test_operator_model.py
|
||||
import uuid
|
||||
from backend.models import OperatorUser
|
||||
from backend.operator_auth import role_at_least, _ROLE_RANK
|
||||
|
||||
|
||||
def test_operator_user_defaults(db_session):
|
||||
u = OperatorUser(id=str(uuid.uuid4()), email="a@x.com", display_name="A",
|
||||
password_hash="h", role="admin")
|
||||
db_session.add(u)
|
||||
db_session.commit()
|
||||
got = db_session.query(OperatorUser).filter_by(email="a@x.com").first()
|
||||
assert got.active is True
|
||||
assert got.must_change_password is False
|
||||
assert got.failed_login_count == 0
|
||||
assert got.locked_until is None
|
||||
assert got.sessions_valid_from is not None
|
||||
assert got.sessions_valid_from.microsecond == 0 # truncated to whole seconds
|
||||
|
||||
|
||||
def test_email_is_unique(db_session):
|
||||
for i in range(2):
|
||||
db_session.add(OperatorUser(id=str(uuid.uuid4()), email="dup@x.com",
|
||||
display_name="d", password_hash="h", role="admin"))
|
||||
import pytest
|
||||
with pytest.raises(Exception):
|
||||
db_session.commit()
|
||||
|
||||
|
||||
def test_role_ladder():
|
||||
assert _ROLE_RANK == {"operator": 10, "admin": 20, "superadmin": 30}
|
||||
assert role_at_least("superadmin", "admin") is True
|
||||
assert role_at_least("admin", "admin") is True
|
||||
assert role_at_least("admin", "superadmin") is False
|
||||
assert role_at_least("operator", "admin") is False
|
||||
assert role_at_least("nonsense", "admin") is False
|
||||
Reference in New Issue
Block a user