diff --git a/backend/operator_auth.py b/backend/operator_auth.py index 5927865..1ed0820 100644 --- a/backend/operator_auth.py +++ b/backend/operator_auth.py @@ -12,6 +12,7 @@ from datetime import datetime, timedelta from backend.models import OperatorUser from backend.auth_passwords import hash_password, verify_password, generate_password +from backend.auth_cookies import sign, read, COOKIE_SECURE # Feature flag — OFF by default. When off, the gate and require_role both pass # everything through and the app behaves exactly as it does today. @@ -33,3 +34,30 @@ def role_at_least(role: str, minimum: str) -> bool: def _norm_email(email: str) -> str: return (email or "").strip().lower() + + +def make_operator_cookie(uid: str, iat: int = None) -> str: + """Sign a tv_session value for a user id. iat defaults to now; pass an explicit + iat when you bump sessions_valid_from to that same instant (change-password).""" + return sign({"uid": uid, "iat": int(iat if iat is not None else time.time())}) + + +def current_operator(request, db): + """Resolve the OperatorUser for a request's tv_session cookie, or None. + Re-validated against the DB every call: a disabled / locked / password-changed + user drops on the next request. Used by the gate middleware (with its own + session) — does not raise.""" + data = read(request.cookies.get(COOKIE_NAME), COOKIE_MAX_AGE) + if not data: + return None + uid, iat = data.get("uid"), data.get("iat") + if not uid or not isinstance(iat, (int, float)): + return None + user = db.query(OperatorUser).filter_by(id=uid).first() + if not user or not user.active: + return None + if user.locked_until and user.locked_until > datetime.utcnow(): + return None + if user.sessions_valid_from and datetime.utcfromtimestamp(int(iat)) < user.sessions_valid_from: + return None + return user diff --git a/tests/test_operator_session.py b/tests/test_operator_session.py new file mode 100644 index 0000000..6f30531 --- /dev/null +++ b/tests/test_operator_session.py @@ -0,0 +1,63 @@ +# tests/test_operator_session.py +import time +import uuid +from datetime import datetime, timedelta +from types import SimpleNamespace + +from backend.models import OperatorUser +from backend.operator_auth import ( + make_operator_cookie, current_operator, COOKIE_NAME, +) + + +def _make_user(db, **kw): + u = OperatorUser(id=str(uuid.uuid4()), email=kw.pop("email", "u@x.com"), + display_name="U", password_hash="h", role=kw.pop("role", "admin"), **kw) + db.add(u) + db.commit() + return u + + +def _req(cookie_value): + # current_operator only reads request.cookies — a stub is enough. + return SimpleNamespace(cookies={COOKIE_NAME: cookie_value} if cookie_value else {}) + + +def test_valid_cookie_resolves_user(db_session): + u = _make_user(db_session) + cookie = make_operator_cookie(u.id) + assert current_operator(_req(cookie), db_session).id == u.id + + +def test_no_or_garbage_cookie_is_none(db_session): + assert current_operator(_req(None), db_session) is None + assert current_operator(_req("garbage"), db_session) is None + + +def test_inactive_user_is_none(db_session): + u = _make_user(db_session, active=False) + assert current_operator(_req(make_operator_cookie(u.id)), db_session) is None + + +def test_locked_user_is_none(db_session): + u = _make_user(db_session, locked_until=datetime.utcnow() + timedelta(minutes=5)) + assert current_operator(_req(make_operator_cookie(u.id)), db_session) is None + + +def test_cookie_older_than_sessions_valid_from_is_none(db_session): + u = _make_user(db_session) + old_iat = int(time.time()) - 1000 + cookie = make_operator_cookie(u.id, iat=old_iat) + u.sessions_valid_from = datetime.utcnow() + db_session.commit() + assert current_operator(_req(cookie), db_session) is None + + +def test_cookie_minted_with_matching_iat_after_bump_still_valid(db_session): + # Guards the change-password race: bump sessions_valid_from to the new cookie's + # exact iat → that fresh cookie must remain valid. + u = _make_user(db_session) + new_iat = int(time.time()) + u.sessions_valid_from = datetime.utcfromtimestamp(new_iat) + db_session.commit() + assert current_operator(_req(make_operator_cookie(u.id, iat=new_iat)), db_session).id == u.id