feat(auth): deny-by-default gate middleware + require_role

Adds operator_gate Starlette HTTP middleware that gates every route
except an explicit allow-list. Flag defaults OFF so all existing
behaviour and tests are unchanged. wire_operator_auth helper in
conftest lets tests monkeypatch the module-global SessionLocal and
flag, keeping the gate's own DB session pointed at the test engine.

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
This commit is contained in:
2026-06-17 19:22:15 +00:00
parent e8fe4845aa
commit 2879abb355
4 changed files with 156 additions and 0 deletions
+12
View File
@@ -62,3 +62,15 @@ def make_project(db_session, name=None, **kwargs):
db_session.add(p)
db_session.commit()
return p
def wire_operator_auth(monkeypatch, db_session, enabled=True):
"""Point the gate middleware's SessionLocal at the test engine and flip the
flag. The middleware opens its OWN session (it can't use the get_db override),
so it must read the same engine the test writes to."""
import backend.operator_auth as oa
from sqlalchemy.orm import sessionmaker
maker = sessionmaker(bind=db_session.get_bind(), autocommit=False, autoflush=False)
monkeypatch.setattr(oa, "SessionLocal", maker, raising=False)
monkeypatch.setattr(oa, "OPERATOR_AUTH_ENABLED", enabled, raising=False)
return oa
+69
View File
@@ -0,0 +1,69 @@
# tests/test_operator_gate.py
import uuid
from tests.conftest import wire_operator_auth
from backend.models import OperatorUser
from backend.operator_auth import make_operator_cookie, COOKIE_NAME
from backend.auth_passwords import hash_password
def _make_user(db, role="admin", **kw):
u = OperatorUser(id=str(uuid.uuid4()), email=kw.pop("email", "u@x.com"),
display_name="U", password_hash=hash_password("pw"), role=role, **kw)
db.add(u)
db.commit()
return u
def test_flag_off_passes_everything(client, db_session, monkeypatch):
wire_operator_auth(monkeypatch, db_session, enabled=False)
assert client.get("/", follow_redirects=False).status_code == 200
def test_gated_html_redirects_to_login_when_unauth(client, db_session, monkeypatch):
wire_operator_auth(monkeypatch, db_session, enabled=True)
r = client.get("/", follow_redirects=False)
assert r.status_code == 303
assert r.headers["location"].startswith("/login?next=")
def test_gated_api_returns_401_json_when_unauth(client, db_session, monkeypatch):
wire_operator_auth(monkeypatch, db_session, enabled=True)
r = client.get("/api/status-snapshot", follow_redirects=False)
assert r.status_code == 401
def test_valid_session_passes(client, db_session, monkeypatch):
u = _make_user(db_session)
wire_operator_auth(monkeypatch, db_session, enabled=True)
client.cookies.set(COOKIE_NAME, make_operator_cookie(u.id))
assert client.get("/", follow_redirects=False).status_code == 200
def test_must_change_password_user_routed_to_change_password(client, db_session, monkeypatch):
u = _make_user(db_session, must_change_password=True)
wire_operator_auth(monkeypatch, db_session, enabled=True)
client.cookies.set(COOKIE_NAME, make_operator_cookie(u.id))
r = client.get("/", follow_redirects=False)
assert r.status_code == 303
assert r.headers["location"] == "/change-password"
def test_exempt_paths_pass_without_cookie(client, db_session, monkeypatch):
wire_operator_auth(monkeypatch, db_session, enabled=True)
assert client.get("/health", follow_redirects=False).status_code == 200
def test_portal_paths_are_exempt(client, db_session, monkeypatch):
wire_operator_auth(monkeypatch, db_session, enabled=True)
# /portal/p/<bad> hits the portal's own gate (403/404), never the operator login.
r = client.get("/portal/p/nope", follow_redirects=False)
assert r.status_code in (403, 404)
def test_must_change_user_on_api_gets_403_json_not_redirect(client, db_session, monkeypatch):
u = _make_user(db_session, must_change_password=True)
wire_operator_auth(monkeypatch, db_session, enabled=True)
client.cookies.set(COOKIE_NAME, make_operator_cookie(u.id))
r = client.get("/api/status-snapshot", follow_redirects=False)
assert r.status_code == 403
assert r.json()["detail"] == "Password change required"