feat(auth): login/logout/change-password routes + pages

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
This commit is contained in:
2026-06-17 19:31:31 +00:00
parent 2879abb355
commit 41ab900c33
5 changed files with 283 additions and 0 deletions
+3
View File
@@ -95,6 +95,9 @@ async def add_environment_to_context(request: Request, call_next):
from backend.operator_auth import operator_gate
app.middleware("http")(operator_gate)
from backend.routers import operator_auth_routes
app.include_router(operator_auth_routes.router)
# Override TemplateResponse to include environment and version in context
original_template_response = templates.TemplateResponse
def custom_template_response(name, context=None, *args, **kwargs):
+111
View File
@@ -0,0 +1,111 @@
"""Operator login / logout / change-password. These routes intentionally work
regardless of OPERATOR_AUTH_ENABLED (you log in while the flag is still off during
rollout). /login and /logout are on the gate's exempt list; /change-password
requires a session (the gate sets request.state.operator)."""
from fastapi import APIRouter, Request, Depends, Form
from fastapi.responses import RedirectResponse
from sqlalchemy.orm import Session
from backend.database import get_db
from backend.models import OperatorUser
from backend.templates_config import templates
from backend.operator_auth import (
authenticate, current_operator, change_own_password, make_operator_cookie,
COOKIE_NAME, COOKIE_MAX_AGE,
)
from backend.auth_cookies import COOKIE_SECURE
from backend.auth_passwords import verify_password
router = APIRouter(tags=["operator-auth"])
def _safe_next(next_url: str) -> str:
"""Only allow same-site relative redirects (an open-redirect guard). Rejects
`//host` and `/\\host` — browsers treat a backslash as `/` in the authority
position, so both escape to an external site."""
if next_url and next_url.startswith("/") and not next_url.startswith(("//", "/\\")):
return next_url
return "/"
@router.get("/login")
async def login_page(request: Request, next: str = "", error: str = ""):
return templates.TemplateResponse("login.html",
{"request": request, "next": next, "error": error})
@router.post("/login")
async def login_submit(request: Request, next: str = "",
email: str = Form(...), password: str = Form(...),
db: Session = Depends(get_db)):
user, status = authenticate(db, email, password)
if status == "locked":
return templates.TemplateResponse(
"login.html",
{"request": request, "next": next,
"error": "Too many attempts — try again in 15 minutes."},
status_code=200)
if user is None:
return templates.TemplateResponse(
"login.html",
{"request": request, "next": next, "error": "Invalid email or password."},
status_code=200)
dest = "/change-password" if user.must_change_password else _safe_next(next)
resp = RedirectResponse(url=dest, status_code=303)
resp.set_cookie(COOKIE_NAME, make_operator_cookie(user.id),
max_age=COOKIE_MAX_AGE, httponly=True, samesite="lax", secure=COOKIE_SECURE)
return resp
@router.get("/logout")
async def logout(request: Request):
resp = RedirectResponse(url="/login", status_code=303)
resp.delete_cookie(COOKIE_NAME)
return resp
@router.get("/change-password")
async def change_password_page(request: Request, db: Session = Depends(get_db)):
user = getattr(request.state, "operator", None) or current_operator(request, db)
if user is None:
return RedirectResponse(url="/login", status_code=303)
return templates.TemplateResponse(
"change_password.html",
{"request": request, "must_change": user.must_change_password, "error": ""})
@router.post("/change-password")
async def change_password_submit(request: Request,
current_password: str = Form(...),
new_password: str = Form(...),
confirm_password: str = Form(...),
db: Session = Depends(get_db)):
_user_ref = getattr(request.state, "operator", None) or current_operator(request, db)
if _user_ref is None:
return RedirectResponse(url="/login", status_code=303)
# Re-fetch a session-bound copy so mutations via `db` will be committed.
# request.state.operator may be expunged (detached) from the gate's own
# SessionLocal; operating on a detached object against a different session
# would silently drop the UPDATE.
user = db.query(OperatorUser).filter_by(id=_user_ref.id).first()
if user is None:
return RedirectResponse(url="/login", status_code=303)
def _err(msg):
return templates.TemplateResponse(
"change_password.html",
{"request": request, "must_change": user.must_change_password, "error": msg},
status_code=200)
if not verify_password(current_password, user.password_hash):
return _err("Current password is incorrect.")
if len(new_password) < 8:
return _err("New password must be at least 8 characters.")
if new_password != confirm_password:
return _err("New passwords do not match.")
new_iat = change_own_password(db, user, new_password)
resp = RedirectResponse(url="/", status_code=303)
resp.set_cookie(COOKIE_NAME, make_operator_cookie(user.id, iat=new_iat),
max_age=COOKIE_MAX_AGE, httponly=True, samesite="lax", secure=COOKIE_SECURE)
return resp
+39
View File
@@ -0,0 +1,39 @@
<!DOCTYPE html>
<html lang="en">
<head>
<meta charset="utf-8">
<meta name="viewport" content="width=device-width, initial-scale=1">
<title>Change password · Terra-View</title>
<script src="https://cdn.tailwindcss.com"></script>
</head>
<body class="bg-slate-900 text-slate-100 min-h-screen flex items-center justify-center">
<div class="w-full max-w-sm p-8 bg-slate-800 rounded-xl shadow-lg">
<h1 class="text-xl font-semibold mb-2 text-center">Change your password</h1>
{% if must_change %}
<p class="mb-4 text-sm text-amber-300 text-center">Please set a new password to continue.</p>
{% endif %}
{% if error %}
<div class="mb-4 px-3 py-2 rounded bg-red-900/60 text-red-200 text-sm">{{ error }}</div>
{% endif %}
<form method="post" action="/change-password" class="space-y-4">
<div>
<label class="block text-sm mb-1" for="current_password">Current password</label>
<input id="current_password" name="current_password" type="password" required
class="w-full px-3 py-2 rounded bg-slate-700 border border-slate-600 focus:outline-none focus:ring-2 focus:ring-orange-500">
</div>
<div>
<label class="block text-sm mb-1" for="new_password">New password</label>
<input id="new_password" name="new_password" type="password" minlength="8" required
class="w-full px-3 py-2 rounded bg-slate-700 border border-slate-600 focus:outline-none focus:ring-2 focus:ring-orange-500">
</div>
<div>
<label class="block text-sm mb-1" for="confirm_password">Confirm new password</label>
<input id="confirm_password" name="confirm_password" type="password" minlength="8" required
class="w-full px-3 py-2 rounded bg-slate-700 border border-slate-600 focus:outline-none focus:ring-2 focus:ring-orange-500">
</div>
<button type="submit"
class="w-full py-2 rounded bg-orange-500 hover:bg-orange-600 font-medium">Update password</button>
</form>
</div>
</body>
</html>
+32
View File
@@ -0,0 +1,32 @@
<!DOCTYPE html>
<html lang="en">
<head>
<meta charset="utf-8">
<meta name="viewport" content="width=device-width, initial-scale=1">
<title>Sign in · Terra-View</title>
<script src="https://cdn.tailwindcss.com"></script>
</head>
<body class="bg-slate-900 text-slate-100 min-h-screen flex items-center justify-center">
<div class="w-full max-w-sm p-8 bg-slate-800 rounded-xl shadow-lg">
<h1 class="text-xl font-semibold mb-6 text-center">Terra-View</h1>
{% if error %}
<div class="mb-4 px-3 py-2 rounded bg-red-900/60 text-red-200 text-sm">{{ error }}</div>
{% endif %}
<form method="post" action="/login{% if next %}?next={{ next }}{% endif %}" class="space-y-4">
<div>
<label class="block text-sm mb-1" for="email">Email</label>
<input id="email" name="email" type="email" autofocus required
class="w-full px-3 py-2 rounded bg-slate-700 border border-slate-600 focus:outline-none focus:ring-2 focus:ring-orange-500">
</div>
<div>
<label class="block text-sm mb-1" for="password">Password</label>
<input id="password" name="password" type="password" required
class="w-full px-3 py-2 rounded bg-slate-700 border border-slate-600 focus:outline-none focus:ring-2 focus:ring-orange-500">
</div>
<button type="submit"
class="w-full py-2 rounded bg-orange-500 hover:bg-orange-600 font-medium">Sign in</button>
</form>
<p class="mt-4 text-xs text-slate-400 text-center">Forgot your password? Contact your administrator.</p>
</div>
</body>
</html>
+98
View File
@@ -0,0 +1,98 @@
# tests/test_operator_login.py
import uuid
from tests.conftest import wire_operator_auth
from backend.operator_auth import (
create_operator, make_operator_cookie, COOKIE_NAME, MAX_LOGIN_FAILURES,
)
def test_login_page_renders(client, db_session, monkeypatch):
wire_operator_auth(monkeypatch, db_session, enabled=True)
r = client.get("/login")
assert r.status_code == 200
assert "password" in r.text.lower()
def test_login_success_sets_cookie_and_redirects(client, db_session, monkeypatch):
create_operator(db_session, "ok@x.com", "Ok", "admin", password="rightpw-9")
wire_operator_auth(monkeypatch, db_session, enabled=True)
r = client.post("/login", data={"email": "ok@x.com", "password": "rightpw-9"},
follow_redirects=False)
assert r.status_code == 303
assert r.headers["location"] == "/"
assert f"{COOKIE_NAME}=" in r.headers.get("set-cookie", "")
def test_login_honors_next(client, db_session, monkeypatch):
create_operator(db_session, "ok@x.com", "Ok", "admin", password="rightpw-9")
wire_operator_auth(monkeypatch, db_session, enabled=True)
r = client.post("/login?next=/settings", data={"email": "ok@x.com", "password": "rightpw-9"},
follow_redirects=False)
assert r.headers["location"] == "/settings"
def test_login_wrong_password_no_cookie_generic_error(client, db_session, monkeypatch):
create_operator(db_session, "ok@x.com", "Ok", "admin", password="rightpw-9")
wire_operator_auth(monkeypatch, db_session, enabled=True)
r = client.post("/login", data={"email": "ok@x.com", "password": "nope"},
follow_redirects=False)
assert r.status_code == 200
assert f"{COOKIE_NAME}=" not in r.headers.get("set-cookie", "")
assert "invalid" in r.text.lower()
def test_login_must_change_redirects_to_change_password(client, db_session, monkeypatch):
create_operator(db_session, "new@x.com", "New", "admin") # generated temp → must_change
from backend.models import OperatorUser
user = db_session.query(OperatorUser).filter_by(email="new@x.com").first()
from backend.auth_passwords import hash_password
user.password_hash = hash_password("temp-pw-1")
db_session.commit()
wire_operator_auth(monkeypatch, db_session, enabled=True)
r = client.post("/login", data={"email": "new@x.com", "password": "temp-pw-1"},
follow_redirects=False)
assert r.status_code == 303
assert r.headers["location"] == "/change-password"
def test_login_lockout_message_after_five(client, db_session, monkeypatch):
create_operator(db_session, "lk@x.com", "Lk", "admin", password="rightpw-9")
wire_operator_auth(monkeypatch, db_session, enabled=True)
for _ in range(MAX_LOGIN_FAILURES):
client.post("/login", data={"email": "lk@x.com", "password": "nope"}, follow_redirects=False)
r = client.post("/login", data={"email": "lk@x.com", "password": "rightpw-9"}, follow_redirects=False)
assert r.status_code == 200
assert "too many" in r.text.lower()
def test_logout_clears_cookie(client, db_session, monkeypatch):
wire_operator_auth(monkeypatch, db_session, enabled=True)
r = client.get("/logout", follow_redirects=False)
assert r.status_code == 303
assert r.headers["location"] == "/login"
set_cookie = r.headers.get("set-cookie", "").lower()
assert COOKIE_NAME.lower() in set_cookie
assert 'max-age=0' in set_cookie or 'expires=thu, 01 jan 1970' in set_cookie
def test_change_password_self_service(client, db_session, monkeypatch):
user, _ = create_operator(db_session, "c@x.com", "C", "admin", password="orig-pw-1")
wire_operator_auth(monkeypatch, db_session, enabled=True)
client.cookies.set(COOKIE_NAME, make_operator_cookie(user.id))
r = client.post("/change-password",
data={"current_password": "orig-pw-1", "new_password": "brand-new-2",
"confirm_password": "brand-new-2"}, follow_redirects=False)
assert r.status_code == 303
from backend.auth_passwords import verify_password
db_session.refresh(user)
assert verify_password("brand-new-2", user.password_hash)
assert user.must_change_password is False
def test_safe_next_blocks_open_redirect():
from backend.routers.operator_auth_routes import _safe_next
assert _safe_next("//evil.com") == "/"
assert _safe_next("/\\evil.com") == "/" # backslash authority bypass
assert _safe_next("https://evil.com") == "/"
assert _safe_next("") == "/"
assert _safe_next("/settings") == "/settings"