feat(auth): operator session cookie + current_operator DB re-validation

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
This commit is contained in:
2026-06-17 19:11:41 +00:00
parent 4abfcbc293
commit a6e1cb4f87
2 changed files with 91 additions and 0 deletions
+28
View File
@@ -12,6 +12,7 @@ from datetime import datetime, timedelta
from backend.models import OperatorUser
from backend.auth_passwords import hash_password, verify_password, generate_password
from backend.auth_cookies import sign, read, COOKIE_SECURE
# Feature flag — OFF by default. When off, the gate and require_role both pass
# everything through and the app behaves exactly as it does today.
@@ -33,3 +34,30 @@ def role_at_least(role: str, minimum: str) -> bool:
def _norm_email(email: str) -> str:
return (email or "").strip().lower()
def make_operator_cookie(uid: str, iat: int = None) -> str:
"""Sign a tv_session value for a user id. iat defaults to now; pass an explicit
iat when you bump sessions_valid_from to that same instant (change-password)."""
return sign({"uid": uid, "iat": int(iat if iat is not None else time.time())})
def current_operator(request, db):
"""Resolve the OperatorUser for a request's tv_session cookie, or None.
Re-validated against the DB every call: a disabled / locked / password-changed
user drops on the next request. Used by the gate middleware (with its own
session) — does not raise."""
data = read(request.cookies.get(COOKIE_NAME), COOKIE_MAX_AGE)
if not data:
return None
uid, iat = data.get("uid"), data.get("iat")
if not uid or not isinstance(iat, (int, float)):
return None
user = db.query(OperatorUser).filter_by(id=uid).first()
if not user or not user.active:
return None
if user.locked_until and user.locked_until > datetime.utcnow():
return None
if user.sessions_valid_from and datetime.utcfromtimestamp(int(iat)) < user.sessions_valid_from:
return None
return user
+63
View File
@@ -0,0 +1,63 @@
# tests/test_operator_session.py
import time
import uuid
from datetime import datetime, timedelta
from types import SimpleNamespace
from backend.models import OperatorUser
from backend.operator_auth import (
make_operator_cookie, current_operator, COOKIE_NAME,
)
def _make_user(db, **kw):
u = OperatorUser(id=str(uuid.uuid4()), email=kw.pop("email", "u@x.com"),
display_name="U", password_hash="h", role=kw.pop("role", "admin"), **kw)
db.add(u)
db.commit()
return u
def _req(cookie_value):
# current_operator only reads request.cookies — a stub is enough.
return SimpleNamespace(cookies={COOKIE_NAME: cookie_value} if cookie_value else {})
def test_valid_cookie_resolves_user(db_session):
u = _make_user(db_session)
cookie = make_operator_cookie(u.id)
assert current_operator(_req(cookie), db_session).id == u.id
def test_no_or_garbage_cookie_is_none(db_session):
assert current_operator(_req(None), db_session) is None
assert current_operator(_req("garbage"), db_session) is None
def test_inactive_user_is_none(db_session):
u = _make_user(db_session, active=False)
assert current_operator(_req(make_operator_cookie(u.id)), db_session) is None
def test_locked_user_is_none(db_session):
u = _make_user(db_session, locked_until=datetime.utcnow() + timedelta(minutes=5))
assert current_operator(_req(make_operator_cookie(u.id)), db_session) is None
def test_cookie_older_than_sessions_valid_from_is_none(db_session):
u = _make_user(db_session)
old_iat = int(time.time()) - 1000
cookie = make_operator_cookie(u.id, iat=old_iat)
u.sessions_valid_from = datetime.utcnow()
db_session.commit()
assert current_operator(_req(cookie), db_session) is None
def test_cookie_minted_with_matching_iat_after_bump_still_valid(db_session):
# Guards the change-password race: bump sessions_valid_from to the new cookie's
# exact iat → that fresh cookie must remain valid.
u = _make_user(db_session)
new_iat = int(time.time())
u.sessions_valid_from = datetime.utcfromtimestamp(new_iat)
db_session.commit()
assert current_operator(_req(make_operator_cookie(u.id, iat=new_iat)), db_session).id == u.id