diff --git a/backend/main.py b/backend/main.py index 6b82727..afa2a63 100644 --- a/backend/main.py +++ b/backend/main.py @@ -95,6 +95,9 @@ async def add_environment_to_context(request: Request, call_next): from backend.operator_auth import operator_gate app.middleware("http")(operator_gate) +from backend.routers import operator_auth_routes +app.include_router(operator_auth_routes.router) + # Override TemplateResponse to include environment and version in context original_template_response = templates.TemplateResponse def custom_template_response(name, context=None, *args, **kwargs): diff --git a/backend/routers/operator_auth_routes.py b/backend/routers/operator_auth_routes.py new file mode 100644 index 0000000..eca901c --- /dev/null +++ b/backend/routers/operator_auth_routes.py @@ -0,0 +1,111 @@ +"""Operator login / logout / change-password. These routes intentionally work +regardless of OPERATOR_AUTH_ENABLED (you log in while the flag is still off during +rollout). /login and /logout are on the gate's exempt list; /change-password +requires a session (the gate sets request.state.operator).""" +from fastapi import APIRouter, Request, Depends, Form +from fastapi.responses import RedirectResponse +from sqlalchemy.orm import Session + +from backend.database import get_db +from backend.models import OperatorUser +from backend.templates_config import templates +from backend.operator_auth import ( + authenticate, current_operator, change_own_password, make_operator_cookie, + COOKIE_NAME, COOKIE_MAX_AGE, +) +from backend.auth_cookies import COOKIE_SECURE +from backend.auth_passwords import verify_password + +router = APIRouter(tags=["operator-auth"]) + + +def _safe_next(next_url: str) -> str: + """Only allow same-site relative redirects (an open-redirect guard). Rejects + `//host` and `/\\host` — browsers treat a backslash as `/` in the authority + position, so both escape to an external site.""" + if next_url and next_url.startswith("/") and not next_url.startswith(("//", "/\\")): + return next_url + return "/" + + +@router.get("/login") +async def login_page(request: Request, next: str = "", error: str = ""): + return templates.TemplateResponse("login.html", + {"request": request, "next": next, "error": error}) + + +@router.post("/login") +async def login_submit(request: Request, next: str = "", + email: str = Form(...), password: str = Form(...), + db: Session = Depends(get_db)): + user, status = authenticate(db, email, password) + if status == "locked": + return templates.TemplateResponse( + "login.html", + {"request": request, "next": next, + "error": "Too many attempts — try again in 15 minutes."}, + status_code=200) + if user is None: + return templates.TemplateResponse( + "login.html", + {"request": request, "next": next, "error": "Invalid email or password."}, + status_code=200) + dest = "/change-password" if user.must_change_password else _safe_next(next) + resp = RedirectResponse(url=dest, status_code=303) + resp.set_cookie(COOKIE_NAME, make_operator_cookie(user.id), + max_age=COOKIE_MAX_AGE, httponly=True, samesite="lax", secure=COOKIE_SECURE) + return resp + + +@router.get("/logout") +async def logout(request: Request): + resp = RedirectResponse(url="/login", status_code=303) + resp.delete_cookie(COOKIE_NAME) + return resp + + +@router.get("/change-password") +async def change_password_page(request: Request, db: Session = Depends(get_db)): + user = getattr(request.state, "operator", None) or current_operator(request, db) + if user is None: + return RedirectResponse(url="/login", status_code=303) + return templates.TemplateResponse( + "change_password.html", + {"request": request, "must_change": user.must_change_password, "error": ""}) + + +@router.post("/change-password") +async def change_password_submit(request: Request, + current_password: str = Form(...), + new_password: str = Form(...), + confirm_password: str = Form(...), + db: Session = Depends(get_db)): + _user_ref = getattr(request.state, "operator", None) or current_operator(request, db) + if _user_ref is None: + return RedirectResponse(url="/login", status_code=303) + # Re-fetch a session-bound copy so mutations via `db` will be committed. + # request.state.operator may be expunged (detached) from the gate's own + # SessionLocal; operating on a detached object against a different session + # would silently drop the UPDATE. + user = db.query(OperatorUser).filter_by(id=_user_ref.id).first() + if user is None: + return RedirectResponse(url="/login", status_code=303) + + def _err(msg): + return templates.TemplateResponse( + "change_password.html", + {"request": request, "must_change": user.must_change_password, "error": msg}, + status_code=200) + + if not verify_password(current_password, user.password_hash): + return _err("Current password is incorrect.") + if len(new_password) < 8: + return _err("New password must be at least 8 characters.") + if new_password != confirm_password: + return _err("New passwords do not match.") + + new_iat = change_own_password(db, user, new_password) + resp = RedirectResponse(url="/", status_code=303) + resp.set_cookie(COOKIE_NAME, make_operator_cookie(user.id, iat=new_iat), + max_age=COOKIE_MAX_AGE, httponly=True, samesite="lax", secure=COOKIE_SECURE) + return resp diff --git a/templates/change_password.html b/templates/change_password.html new file mode 100644 index 0000000..f9d7a5b --- /dev/null +++ b/templates/change_password.html @@ -0,0 +1,39 @@ + + + + + + Change password · Terra-View + + + +
+

Change your password

+ {% if must_change %} +

Please set a new password to continue.

+ {% endif %} + {% if error %} +
{{ error }}
+ {% endif %} +
+
+ + +
+
+ + +
+
+ + +
+ +
+
+ + diff --git a/templates/login.html b/templates/login.html new file mode 100644 index 0000000..3d0e9b9 --- /dev/null +++ b/templates/login.html @@ -0,0 +1,32 @@ + + + + + + Sign in · Terra-View + + + +
+

Terra-View

+ {% if error %} +
{{ error }}
+ {% endif %} +
+
+ + +
+
+ + +
+ +
+

Forgot your password? Contact your administrator.

+
+ + diff --git a/tests/test_operator_login.py b/tests/test_operator_login.py new file mode 100644 index 0000000..293314c --- /dev/null +++ b/tests/test_operator_login.py @@ -0,0 +1,98 @@ +# tests/test_operator_login.py +import uuid +from tests.conftest import wire_operator_auth +from backend.operator_auth import ( + create_operator, make_operator_cookie, COOKIE_NAME, MAX_LOGIN_FAILURES, +) + + +def test_login_page_renders(client, db_session, monkeypatch): + wire_operator_auth(monkeypatch, db_session, enabled=True) + r = client.get("/login") + assert r.status_code == 200 + assert "password" in r.text.lower() + + +def test_login_success_sets_cookie_and_redirects(client, db_session, monkeypatch): + create_operator(db_session, "ok@x.com", "Ok", "admin", password="rightpw-9") + wire_operator_auth(monkeypatch, db_session, enabled=True) + r = client.post("/login", data={"email": "ok@x.com", "password": "rightpw-9"}, + follow_redirects=False) + assert r.status_code == 303 + assert r.headers["location"] == "/" + assert f"{COOKIE_NAME}=" in r.headers.get("set-cookie", "") + + +def test_login_honors_next(client, db_session, monkeypatch): + create_operator(db_session, "ok@x.com", "Ok", "admin", password="rightpw-9") + wire_operator_auth(monkeypatch, db_session, enabled=True) + r = client.post("/login?next=/settings", data={"email": "ok@x.com", "password": "rightpw-9"}, + follow_redirects=False) + assert r.headers["location"] == "/settings" + + +def test_login_wrong_password_no_cookie_generic_error(client, db_session, monkeypatch): + create_operator(db_session, "ok@x.com", "Ok", "admin", password="rightpw-9") + wire_operator_auth(monkeypatch, db_session, enabled=True) + r = client.post("/login", data={"email": "ok@x.com", "password": "nope"}, + follow_redirects=False) + assert r.status_code == 200 + assert f"{COOKIE_NAME}=" not in r.headers.get("set-cookie", "") + assert "invalid" in r.text.lower() + + +def test_login_must_change_redirects_to_change_password(client, db_session, monkeypatch): + create_operator(db_session, "new@x.com", "New", "admin") # generated temp → must_change + from backend.models import OperatorUser + user = db_session.query(OperatorUser).filter_by(email="new@x.com").first() + from backend.auth_passwords import hash_password + user.password_hash = hash_password("temp-pw-1") + db_session.commit() + wire_operator_auth(monkeypatch, db_session, enabled=True) + r = client.post("/login", data={"email": "new@x.com", "password": "temp-pw-1"}, + follow_redirects=False) + assert r.status_code == 303 + assert r.headers["location"] == "/change-password" + + +def test_login_lockout_message_after_five(client, db_session, monkeypatch): + create_operator(db_session, "lk@x.com", "Lk", "admin", password="rightpw-9") + wire_operator_auth(monkeypatch, db_session, enabled=True) + for _ in range(MAX_LOGIN_FAILURES): + client.post("/login", data={"email": "lk@x.com", "password": "nope"}, follow_redirects=False) + r = client.post("/login", data={"email": "lk@x.com", "password": "rightpw-9"}, follow_redirects=False) + assert r.status_code == 200 + assert "too many" in r.text.lower() + + +def test_logout_clears_cookie(client, db_session, monkeypatch): + wire_operator_auth(monkeypatch, db_session, enabled=True) + r = client.get("/logout", follow_redirects=False) + assert r.status_code == 303 + assert r.headers["location"] == "/login" + set_cookie = r.headers.get("set-cookie", "").lower() + assert COOKIE_NAME.lower() in set_cookie + assert 'max-age=0' in set_cookie or 'expires=thu, 01 jan 1970' in set_cookie + + +def test_change_password_self_service(client, db_session, monkeypatch): + user, _ = create_operator(db_session, "c@x.com", "C", "admin", password="orig-pw-1") + wire_operator_auth(monkeypatch, db_session, enabled=True) + client.cookies.set(COOKIE_NAME, make_operator_cookie(user.id)) + r = client.post("/change-password", + data={"current_password": "orig-pw-1", "new_password": "brand-new-2", + "confirm_password": "brand-new-2"}, follow_redirects=False) + assert r.status_code == 303 + from backend.auth_passwords import verify_password + db_session.refresh(user) + assert verify_password("brand-new-2", user.password_hash) + assert user.must_change_password is False + + +def test_safe_next_blocks_open_redirect(): + from backend.routers.operator_auth_routes import _safe_next + assert _safe_next("//evil.com") == "/" + assert _safe_next("/\\evil.com") == "/" # backslash authority bypass + assert _safe_next("https://evil.com") == "/" + assert _safe_next("") == "/" + assert _safe_next("/settings") == "/settings"