# Operator Authentication Implementation Plan

> **For agentic workers:** REQUIRED SUB-SKILL: Use superpowers:subagent-driven-development (recommended) or superpowers:executing-plans to implement this plan task-by-task. Steps use checkbox (`- [ ]`) syntax for tracking.

**Goal:** Add login and roles to the internal Terra-View app (which today has zero auth), gated behind an `OPERATOR_AUTH_ENABLED` feature flag, with a dead-simple superadmin-driven password-reset story.

**Architecture:** One deny-by-default Starlette HTTP middleware gates every route except an explicit allow-list (login/logout/health/static/portal + 3 machine endpoints). It resolves an `OperatorUser` from a signed `tv_session` cookie (re-validated against the DB on every request) and stashes it on `request.state.operator`; a `require_role()` dependency reads it for the superadmin-only user-management routes. The design reuses the portal's argon2 hasher and HMAC-signed-cookie pattern. The flag governs both the middleware and `require_role`, so shipping with it off behaves exactly like today.

**Tech Stack:** FastAPI + Starlette middleware, SQLAlchemy/SQLite, argon2-cffi (already a dep), stdlib `hmac`/`hashlib`, Jinja2, pytest (existing harness).

**Spec:** `docs/superpowers/specs/2026-06-17-operator-auth-design.md`

---

## File Structure

| File | Responsibility |
|---|---|
| `backend/auth_cookies.py` *(new)* | generic `sign(payload)` / `read(raw, max_age)` signer + `SECRET_KEY` / `COOKIE_SECURE` |
| `backend/models.py` *(modify)* | add `OperatorUser` table + `_utcnow_seconds` helper |
| `backend/operator_auth.py` *(new)* | flag, role ranks, cookie helpers, `current_operator`, `authenticate`, lockout, data helpers (create/reset/role/active/change), the gate middleware, `require_role` |
| `backend/routers/operator_auth_routes.py` *(new)* | `/login`, `/logout`, `/change-password` |
| `backend/routers/operator_users.py` *(new)* | `/admin/users` page + CRUD JSON (superadmin) |
| `backend/operator_admin.py` *(new)* | seed / break-glass CLI |
| `backend/main.py` *(modify)* | register the gate middleware + the two routers |
| `templates/login.html`, `templates/change_password.html` *(new)* | standalone (no nav) auth pages |
| `templates/admin/users.html` *(new)* | user-management page (extends base.html) |
| `tests/conftest.py` *(modify)* | add `wire_operator_auth()` helper |
| `tests/test_operator_*.py` *(new)* | per-task test files |

**Conventions discovered (follow these exactly):**

- UUID PKs: `id = Column(String, primary_key=True, index=True)` set via `str(uuid.uuid4())`.
- argon2 helpers live in `backend/auth_passwords.py`: `hash_password(raw)`, `verify_password(raw, hashed)` (never raises), `generate_password(n_bytes=12)`.
- Tests run in the dev container: `docker exec terra-view-terra-view-1 python -m pytest tests/ -v`. Source is bind-mounted, so no rebuild between edits.
- The gate middleware reads the DB through a module-level `SessionLocal` (same pattern the portal WS handler uses) so tests can monkeypatch it. `db_session.get_bind()` returns the test engine; bind a `sessionmaker` to it.
- The flag and `SessionLocal` are read as **module globals at call time**, so `monkeypatch.setattr(backend.operator_auth, "...", ...)` works without re-importing.

---

## Task 1: Generic signed-cookie module (`auth_cookies.py`)

Lift the portal's HMAC signer into a shared, payload-agnostic module that operator auth uses now (the portal keeps its own helpers untouched).

**Files:**
- Create: `backend/auth_cookies.py`
- Test: `tests/test_operator_cookies.py`

- [ ] **Step 1: Write the failing test**

```python
# tests/test_operator_cookies.py
import base64
import json
import time

from backend.auth_cookies import sign, read


def test_sign_then_read_round_trips():
    # iat must be recent: read() rejects any payload older than max_age.
    iat = int(time.time())
    raw = sign({"uid": "abc", "iat": iat})
    data = read(raw, max_age=3600)
    assert data == {"uid": "abc", "iat": iat}


def test_tampered_signature_is_rejected():
    raw = sign({"uid": "abc", "iat": int(time.time())})
    body, _sig = raw.rsplit(".", 1)
    assert read(body + ".deadbeef", max_age=3600) is None


def test_tampered_body_is_rejected():
    raw = sign({"uid": "abc", "iat": int(time.time())})
    body, sig = raw.rsplit(".", 1)
    forged = base64.urlsafe_b64encode(
        json.dumps({"uid": "evil", "iat": int(time.time())}).encode()
    ).decode()
    assert read(forged + "." + sig, max_age=3600) is None


def test_expired_by_iat_is_rejected():
    raw = sign({"uid": "abc", "iat": int(time.time()) - 10_000})
    assert read(raw, max_age=3600) is None


def test_garbage_input_is_none_not_raise():
    assert read("not-a-cookie", max_age=3600) is None
    assert read("", max_age=3600) is None
    assert read(None, max_age=3600) is None
```

- [ ] **Step 2: Run tests to verify they fail**

Run: `docker exec terra-view-terra-view-1 python -m pytest tests/test_operator_cookies.py -v`
Expected: FAIL with `ModuleNotFoundError: No module named 'backend.auth_cookies'`

- [ ] **Step 3: Write the implementation**

```python
# backend/auth_cookies.py
"""Generic HMAC-signed cookie payloads, shared by operator auth (and, optionally
later, the portal).
A signed value is f"{b64url(json)}.{hmac_sha256(b64)}"; read() verifies the signature
in constant time and enforces a server-side iat expiry. The signing secret is the same
SECRET_KEY the portal already reads, so a single env var protects both cookies.
Never store or log raw secrets."""
import os
import hmac
import json
import time
import base64
import hashlib
import logging

logger = logging.getLogger(__name__)

# Same env var the portal cookie uses: one secret protects both. The insecure
# default only exists so dev/test boots without config; set a real SECRET_KEY in prod.
SECRET_KEY = os.getenv("SECRET_KEY", "dev-insecure-change-me")

# Set COOKIE_SECURE=true once served over HTTPS; leave false on plain HTTP or the
# browser won't send the cookie.
COOKIE_SECURE = os.getenv("COOKIE_SECURE", "false").lower() in ("1", "true", "yes")


def _sign(body: str) -> str:
    return hmac.new(SECRET_KEY.encode(), body.encode(), hashlib.sha256).hexdigest()


def sign(payload: dict) -> str:
    """Serialize + sign a payload dict into a cookie-safe string."""
    body = base64.urlsafe_b64encode(json.dumps(payload).encode()).decode()
    return f"{body}.{_sign(body)}"


def read(raw, max_age: int):
    """Verify a signed value and return its payload dict, or None if missing,
    tampered, or older than max_age seconds (by its own `iat`)."""
    if not raw or not isinstance(raw, str):
        return None
    try:
        body, sig = raw.rsplit(".", 1)
    except (ValueError, AttributeError):
        return None
    # Compare as bytes: compare_digest raises TypeError on non-ASCII str input,
    # and a garbage cookie must yield None rather than raise.
    if not hmac.compare_digest(sig.encode(), _sign(body).encode()):
        return None
    try:
        data = json.loads(base64.urlsafe_b64decode(body.encode()))
    except Exception:
        return None
    if not isinstance(data, dict):
        return None
    iat = data.get("iat")
    if not isinstance(iat, (int, float)) or (time.time() - iat) > max_age:
        return None
    return data
```

- [ ] **Step 4: Run tests to verify they pass**

Run: `docker exec terra-view-terra-view-1 python -m pytest tests/test_operator_cookies.py -v`
Expected: PASS (5 passed)

- [ ] **Step 5: Commit**

```bash
git add backend/auth_cookies.py tests/test_operator_cookies.py
git commit -m "feat(auth): generic HMAC signed-cookie module for operator auth"
```

---

## Task 2: `OperatorUser` model + role-rank helpers

The new table (auto-created by `Base.metadata.create_all`, no migration) and the role ladder.

**Files:**
- Modify: `backend/models.py` (add `_utcnow_seconds` near the top imports, and `OperatorUser` at end of file)
- Create: `backend/operator_auth.py` (start it with constants + `role_at_least`)
- Test: `tests/test_operator_model.py`

- [ ] **Step 1: Write the failing test**

```python
# tests/test_operator_model.py
import uuid

import pytest

from backend.models import OperatorUser
from backend.operator_auth import role_at_least, _ROLE_RANK


def test_operator_user_defaults(db_session):
    u = OperatorUser(id=str(uuid.uuid4()), email="a@x.com", display_name="A",
                     password_hash="h", role="admin")
    db_session.add(u)
    db_session.commit()
    got = db_session.query(OperatorUser).filter_by(email="a@x.com").first()
    assert got.active is True
    assert got.must_change_password is False
    assert got.failed_login_count == 0
    assert got.locked_until is None
    assert got.sessions_valid_from is not None
    assert got.sessions_valid_from.microsecond == 0  # truncated to whole seconds


def test_email_is_unique(db_session):
    for _ in range(2):
        db_session.add(OperatorUser(id=str(uuid.uuid4()), email="dup@x.com",
                                    display_name="d", password_hash="h", role="admin"))
    with pytest.raises(Exception):
        db_session.commit()


def test_role_ladder():
    assert _ROLE_RANK == {"operator": 10, "admin": 20, "superadmin": 30}
    assert role_at_least("superadmin", "admin") is True
    assert role_at_least("admin", "admin") is True
    assert role_at_least("admin", "superadmin") is False
    assert role_at_least("operator", "admin") is False
    assert role_at_least("nonsense", "admin") is False
```

- [ ] **Step 2: Run tests to verify they fail**

Run: `docker exec terra-view-terra-view-1 python -m pytest tests/test_operator_model.py -v`
Expected: FAIL with
`ImportError: cannot import name 'OperatorUser'`

- [ ] **Step 3a: Add the `_utcnow_seconds` helper to `backend/models.py`**

Insert directly after the existing `from datetime import datetime` line (line 2) and before `from backend.database import Base`:

```python
def _utcnow_seconds():
    """utcnow truncated to whole seconds. Used as the default for sessions_valid_from
    so a freshly-issued cookie (whose iat is a whole-second epoch) never falls a few
    microseconds before it and self-invalidates."""
    return datetime.utcnow().replace(microsecond=0)
```

- [ ] **Step 3b: Append the `OperatorUser` model at the end of `backend/models.py`**

```python
class OperatorUser(Base):
    """An internal operator login. Roles: 'superadmin' (Brian, + account mgmt) and
    'admin' (parents, full app). 'operator' is reserved (deferred). Brand-new table →
    create_all builds it, no migration. Never store or log raw passwords."""
    __tablename__ = "operator_users"

    id = Column(String, primary_key=True, index=True)                  # UUID
    email = Column(String, nullable=False, unique=True, index=True)    # login handle, lowercased
    display_name = Column(String, nullable=False)                      # "Brian", "Dad"
    password_hash = Column(String, nullable=False)                     # argon2id
    role = Column(String, nullable=False, default="admin")             # superadmin | admin
    active = Column(Boolean, default=True)                             # False = login disabled
    must_change_password = Column(Boolean, default=False)              # forces a change next login
    sessions_valid_from = Column(DateTime, default=_utcnow_seconds)    # bump = log out everywhere
    failed_login_count = Column(Integer, default=0)                    # lockout counter
    locked_until = Column(DateTime, nullable=True)                     # set after too many bad tries
    created_at = Column(DateTime, default=datetime.utcnow)
    last_login_at = Column(DateTime, nullable=True)
```

- [ ] **Step 3c: Create `backend/operator_auth.py` with constants + `role_at_least`**

```python
# backend/operator_auth.py
"""Operator authentication: the deny-by-default gate, session cookie, login +
lockout, and the small
data helpers shared by the routes and the CLI. Reuses the argon2 hasher
(auth_passwords) and the HMAC signer (auth_cookies). The flag and SessionLocal are
read as module globals at call time so tests can monkeypatch them."""
import os
import time
import uuid
from datetime import datetime, timedelta

from backend.models import OperatorUser
from backend.auth_passwords import hash_password, verify_password, generate_password

# Feature flag: OFF by default. When off, the gate and require_role both pass
# everything through and the app behaves exactly as it does today.
OPERATOR_AUTH_ENABLED = os.getenv("OPERATOR_AUTH_ENABLED", "false").lower() in ("1", "true", "yes")

COOKIE_NAME = "tv_session"
COOKIE_MAX_AGE = 60 * 60 * 24 * 30  # 30 days ("remember this device")
MAX_LOGIN_FAILURES = 5
LOCK_MINUTES = 15

# Role ladder: a rank map so checks read naturally and 'operator' slots in later.
_ROLE_RANK = {"operator": 10, "admin": 20, "superadmin": 30}


def role_at_least(role: str, minimum: str) -> bool:
    """True iff `role` ranks at or above `minimum`. Unknown roles rank as 0."""
    return _ROLE_RANK.get(role, 0) >= _ROLE_RANK[minimum]


def _norm_email(email: str) -> str:
    return (email or "").strip().lower()
```

- [ ] **Step 4: Run tests to verify they pass**

Run: `docker exec terra-view-terra-view-1 python -m pytest tests/test_operator_model.py -v`
Expected: PASS (3 passed)

- [ ] **Step 5: Commit**

```bash
git add backend/models.py backend/operator_auth.py tests/test_operator_model.py
git commit -m "feat(auth): OperatorUser model + role ladder"
```

---

## Task 3: Session cookie helpers + `current_operator`

Mint/validate the `tv_session` cookie and resolve the operator from a request, re-validating against the DB every call.

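The re-validation in this task hinges on one comparison: a cookie's whole-second `iat` against the user's `sessions_valid_from`. The sketch below isolates that comparison so the truncation rule is easy to see. The helper names `issued_at` and `cookie_survives_bump` are illustrative only and are not part of the plan; the sketch also uses `datetime.fromtimestamp(..., tz=timezone.utc)` in place of the plan's `datetime.utcfromtimestamp`, which should give the same naive-UTC value.

```python
from datetime import datetime, timezone


def issued_at(iat):
    """Naive-UTC datetime for a whole-second epoch `iat` (hypothetical helper)."""
    return datetime.fromtimestamp(int(iat), tz=timezone.utc).replace(tzinfo=None)


def cookie_survives_bump(iat, sessions_valid_from):
    """A cookie stays valid only if it was issued at or after the bump instant."""
    return issued_at(iat) >= sessions_valid_from


# A bump at a whole second, and the epoch value for that same instant.
bump = datetime(2026, 6, 17, 12, 0, 0)
bump_iat = int(bump.replace(tzinfo=timezone.utc).timestamp())

same_second = cookie_survives_bump(bump_iat, bump)          # minted with the bump
one_second_older = cookie_survives_bump(bump_iat - 1, bump)  # minted before it
# If the bump kept its microseconds, a cookie minted in that same second would
# compare as older and invalidate itself; hence the whole-second truncation.
untruncated = cookie_survives_bump(bump_iat, bump.replace(microsecond=250_000))
```

Under these assumptions `same_second` is true while `one_second_older` and `untruncated` are false, which is the behavior the `sessions_valid_from` tests in this task rely on.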
**Files:**
- Modify: `backend/operator_auth.py` (append helpers)
- Test: `tests/test_operator_session.py`

- [ ] **Step 1: Write the failing test**

```python
# tests/test_operator_session.py
import time
import uuid
from datetime import datetime, timedelta
from types import SimpleNamespace

from backend.models import OperatorUser
from backend.operator_auth import (
    make_operator_cookie, current_operator, COOKIE_NAME,
)


def _make_user(db, **kw):
    u = OperatorUser(id=str(uuid.uuid4()), email=kw.pop("email", "u@x.com"),
                     display_name="U", password_hash="h",
                     role=kw.pop("role", "admin"), **kw)
    db.add(u)
    db.commit()
    return u


def _req(cookie_value):
    # current_operator only reads request.cookies; a stub is enough.
    return SimpleNamespace(cookies={COOKIE_NAME: cookie_value} if cookie_value else {})


def test_valid_cookie_resolves_user(db_session):
    u = _make_user(db_session)
    cookie = make_operator_cookie(u.id)
    assert current_operator(_req(cookie), db_session).id == u.id


def test_no_or_garbage_cookie_is_none(db_session):
    assert current_operator(_req(None), db_session) is None
    assert current_operator(_req("garbage"), db_session) is None


def test_inactive_user_is_none(db_session):
    u = _make_user(db_session, active=False)
    assert current_operator(_req(make_operator_cookie(u.id)), db_session) is None


def test_locked_user_is_none(db_session):
    u = _make_user(db_session, locked_until=datetime.utcnow() + timedelta(minutes=5))
    assert current_operator(_req(make_operator_cookie(u.id)), db_session) is None


def test_cookie_older_than_sessions_valid_from_is_none(db_session):
    u = _make_user(db_session)
    old_iat = int(time.time()) - 1000
    cookie = make_operator_cookie(u.id, iat=old_iat)
    u.sessions_valid_from = datetime.utcnow()
    db_session.commit()
    assert current_operator(_req(cookie), db_session) is None


def test_cookie_minted_with_matching_iat_after_bump_still_valid(db_session):
    # Guards the change-password race: bump sessions_valid_from to the new cookie's
    # exact iat → that fresh cookie must remain valid.
    u = _make_user(db_session)
    new_iat = int(time.time())
    u.sessions_valid_from = datetime.utcfromtimestamp(new_iat)
    db_session.commit()
    assert current_operator(_req(make_operator_cookie(u.id, iat=new_iat)), db_session).id == u.id
```

- [ ] **Step 2: Run tests to verify they fail**

Run: `docker exec terra-view-terra-view-1 python -m pytest tests/test_operator_session.py -v`
Expected: FAIL with `ImportError: cannot import name 'make_operator_cookie'`

- [ ] **Step 3: Append the helpers to `backend/operator_auth.py`**

```python
from backend.auth_cookies import sign, read, COOKIE_SECURE  # add to the import block at top


def make_operator_cookie(uid: str, iat: int = None) -> str:
    """Sign a tv_session value for a user id. iat defaults to now; pass an explicit
    iat when you bump sessions_valid_from to that same instant (change-password)."""
    return sign({"uid": uid, "iat": int(iat if iat is not None else time.time())})


def current_operator(request, db):
    """Resolve the OperatorUser for a request's tv_session cookie, or None.
    Re-validated against the DB every call: a disabled / locked / password-changed
    user drops on the next request. Used by the gate middleware (with its own
    session); does not raise."""
    data = read(request.cookies.get(COOKIE_NAME), COOKIE_MAX_AGE)
    if not data:
        return None
    uid, iat = data.get("uid"), data.get("iat")
    if not uid or not isinstance(iat, (int, float)):
        return None
    user = db.query(OperatorUser).filter_by(id=uid).first()
    if not user or not user.active:
        return None
    if user.locked_until and user.locked_until > datetime.utcnow():
        return None
    if user.sessions_valid_from and datetime.utcfromtimestamp(int(iat)) < user.sessions_valid_from:
        return None
    return user
```

NOTE: move the `from backend.auth_cookies import sign, read, COOKIE_SECURE` line up into the import block at the top of the file (shown inline here only to mark the dependency). `COOKIE_SECURE` is imported now because later tasks (login route) set the cookie with it.

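To make concrete what `COOKIE_SECURE` changes once the login route sets the cookie, here is a stdlib-only sketch that builds the `Set-Cookie` header value. It is an illustration under assumptions, not the route's code: the helper name `build_session_cookie_header` is hypothetical, the `HttpOnly` and `SameSite=Lax` attributes are choices made for the example rather than taken from the spec, and the real route would most likely go through the framework's own cookie API instead of `http.cookies`.

```python
from http.cookies import SimpleCookie

THIRTY_DAYS = 60 * 60 * 24 * 30  # mirrors COOKIE_MAX_AGE in the plan


def build_session_cookie_header(value, secure, max_age=THIRTY_DAYS):
    """Return the Set-Cookie header value for a tv_session cookie (hypothetical helper)."""
    cookie = SimpleCookie()
    cookie["tv_session"] = value
    morsel = cookie["tv_session"]
    morsel["path"] = "/"
    morsel["max-age"] = max_age
    morsel["httponly"] = True      # assumption: keep the cookie away from page scripts
    morsel["samesite"] = "Lax"     # assumption: not specified in the plan
    if secure:
        morsel["secure"] = True    # only sent over HTTPS, so leave off on plain HTTP
    return morsel.OutputString()


plain_http = build_session_cookie_header("abc.def", secure=False)
https_only = build_session_cookie_header("abc.def", secure=True)
```

The only difference between `plain_http` and `https_only` is the `Secure` attribute, which is why the flag must stay false until the app is actually served over HTTPS.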
- [ ] **Step 4: Run tests to verify they pass**

Run: `docker exec terra-view-terra-view-1 python -m pytest tests/test_operator_session.py -v`
Expected: PASS (6 passed)

- [ ] **Step 5: Commit**

```bash
git add backend/operator_auth.py tests/test_operator_session.py
git commit -m "feat(auth): operator session cookie + current_operator DB re-validation"
```

---

## Task 4: `authenticate` + lockout + data helpers

Login verification with brute-force lockout, plus the create/reset/role/active/change helpers shared by the routes and the CLI.

**Files:**
- Modify: `backend/operator_auth.py` (append)
- Test: `tests/test_operator_authenticate.py`

- [ ] **Step 1: Write the failing test**

```python
# tests/test_operator_authenticate.py
from datetime import datetime

import pytest

from backend.operator_auth import (
    authenticate, create_operator, reset_operator_password,
    set_operator_active, set_operator_role, change_own_password,
    MAX_LOGIN_FAILURES,
)
from backend.auth_passwords import verify_password
from backend.models import OperatorUser


def test_create_operator_generates_temp_and_forces_change(db_session):
    user, raw = create_operator(db_session, "Dad@X.com", "Dad", "admin")
    assert user.email == "dad@x.com"  # lowercased
    assert user.must_change_password is True
    assert verify_password(raw, user.password_hash)


def test_create_operator_with_explicit_password_no_forced_change(db_session):
    user, raw = create_operator(db_session, "brian@x.com", "Brian", "superadmin",
                                password="chosen-pw-123")
    assert raw == "chosen-pw-123"
    assert user.must_change_password is False


def test_create_operator_rejects_duplicate_and_bad_role(db_session):
    create_operator(db_session, "a@x.com", "A", "admin")
    with pytest.raises(ValueError):
        create_operator(db_session, "A@x.com", "A2", "admin")  # dup (case-insensitive)
    with pytest.raises(ValueError):
        create_operator(db_session, "b@x.com", "B", "wizard")  # bad role


def test_authenticate_success(db_session):
    user, raw = create_operator(db_session, "ok@x.com", "Ok", "admin", password="rightpw-9")
    got, status = authenticate(db_session, "OK@x.com", "rightpw-9")
    assert status == "ok" and got.id == user.id
    assert got.last_login_at is not None
    assert got.failed_login_count == 0


def test_authenticate_wrong_password_counts(db_session):
    create_operator(db_session, "wp@x.com", "Wp", "admin", password="rightpw-9")
    got, status = authenticate(db_session, "wp@x.com", "nope")
    assert got is None and status == "bad"
    assert db_session.query(OperatorUser).filter_by(email="wp@x.com").first().failed_login_count == 1


def test_lockout_after_five_then_correct_password_refused(db_session):
    create_operator(db_session, "lk@x.com", "Lk", "admin", password="rightpw-9")
    for _ in range(MAX_LOGIN_FAILURES):
        authenticate(db_session, "lk@x.com", "nope")
    got, status = authenticate(db_session, "lk@x.com", "rightpw-9")  # correct, but locked
    assert got is None and status == "locked"


def test_authenticate_unknown_email_is_bad_not_error(db_session):
    got, status = authenticate(db_session, "ghost@x.com", "whatever")
    assert got is None and status == "bad"


def test_reset_password_sets_new_hash_forces_change_and_bumps_sessions(db_session):
    user, _ = create_operator(db_session, "r@x.com", "R", "admin", password="orig-pw-1")
    before = user.sessions_valid_from
    raw = reset_operator_password(db_session, user)
    assert verify_password(raw, user.password_hash)
    assert user.must_change_password is True
    assert user.sessions_valid_from >= before


def test_change_own_password_clears_flag_and_bumps(db_session):
    user, _ = create_operator(db_session, "c@x.com", "C", "admin", password="orig-pw-1")
    user.must_change_password = True
    db_session.commit()
    new_iat = change_own_password(db_session, user, "brand-new-pw-2")
    assert verify_password("brand-new-pw-2", user.password_hash)
    assert user.must_change_password is False
    assert user.sessions_valid_from == datetime.utcfromtimestamp(new_iat)


def test_set_active_and_role(db_session):
    user, _ = create_operator(db_session, "s@x.com", "S", "admin", password="orig-pw-1")
    set_operator_active(db_session, user, False)
    assert user.active is False
    set_operator_role(db_session, user, "superadmin")
    assert user.role == "superadmin"
    with pytest.raises(ValueError):
        set_operator_role(db_session, user, "wizard")
```

- [ ] **Step 2: Run tests to verify they fail**

Run: `docker exec terra-view-terra-view-1 python -m pytest tests/test_operator_authenticate.py -v`
Expected: FAIL with `ImportError: cannot import name 'authenticate'`

- [ ] **Step 3: Append the helpers to `backend/operator_auth.py`**

```python
def register_login_failure(db, user) -> None:
    """Increment a user's failure counter and lock them out past the threshold."""
    user.failed_login_count = (user.failed_login_count or 0) + 1
    if user.failed_login_count >= MAX_LOGIN_FAILURES:
        user.locked_until = datetime.utcnow() + timedelta(minutes=LOCK_MINUTES)
    db.commit()


def authenticate(db, email, password):
    """Return (user, "ok") on success, (None, "locked") if locked out, else
    (None, "bad"). An unknown email, an inactive account, and a wrong password all
    yield the same generic "bad"."""
    user = db.query(OperatorUser).filter_by(email=_norm_email(email)).first()
    if user and user.locked_until and user.locked_until > datetime.utcnow():
        return None, "locked"
    if not user or not user.active or not verify_password(password, user.password_hash):
        if user:
            register_login_failure(db, user)
        return None, "bad"
    user.failed_login_count = 0
    user.locked_until = None
    user.last_login_at = datetime.utcnow()
    db.commit()
    return user, "ok"


def create_operator(db, email, name, role, password=None, must_change=None):
    """Create an operator. With no password, generate a temp one and force a change
    (must_change defaults True). With a password, must_change defaults False.
    Returns (user, raw_password_to_show_once).
    Raises ValueError on dup/bad role."""
    email = _norm_email(email)
    if role not in _ROLE_RANK:
        raise ValueError(f"unknown role {role!r}")
    if db.query(OperatorUser).filter_by(email=email).first():
        raise ValueError(f"operator {email} already exists")
    if password is None:
        password = generate_password()
        if must_change is None:
            must_change = True
    elif must_change is None:
        must_change = False
    user = OperatorUser(id=str(uuid.uuid4()), email=email, display_name=name,
                        password_hash=hash_password(password), role=role,
                        active=True, must_change_password=must_change)
    db.add(user)
    db.commit()
    return user, password


def reset_operator_password(db, user) -> str:
    """Generate a fresh temp password, force a change, log the user out everywhere.
    Returns the raw password to show once."""
    raw = generate_password()
    user.password_hash = hash_password(raw)
    user.must_change_password = True
    user.failed_login_count = 0
    user.locked_until = None
    user.sessions_valid_from = datetime.utcnow().replace(microsecond=0)
    db.commit()
    return raw


def change_own_password(db, user, new_password) -> int:
    """Set a user's own new password, clear the forced-change flag, and bump
    sessions_valid_from to the returned iat; the caller mints the replacement
    cookie with that exact iat so it stays valid while older cookies die."""
    new_iat = int(time.time())
    user.password_hash = hash_password(new_password)
    user.must_change_password = False
    user.sessions_valid_from = datetime.utcfromtimestamp(new_iat)
    db.commit()
    return new_iat


def set_operator_active(db, user, active: bool):
    user.active = bool(active)
    db.commit()
    return user


def set_operator_role(db, user, role: str):
    if role not in _ROLE_RANK:
        raise ValueError(f"unknown role {role!r}")
    user.role = role
    db.commit()
    return user
```

- [ ] **Step 4: Run tests to verify they pass**

Run: `docker exec terra-view-terra-view-1 python -m pytest tests/test_operator_authenticate.py -v`
Expected: PASS (10 passed)

- [ ] **Step 5: Commit**

```bash
git add backend/operator_auth.py tests/test_operator_authenticate.py
git commit -m "feat(auth): authenticate + lockout + operator data helpers"
```

---

## Task 5: Gate middleware + `require_role` + wire into the app

The deny-by-default Starlette middleware and the role dependency. Registering the middleware with the flag defaulting OFF keeps every existing test green.

**Files:**
- Modify: `backend/operator_auth.py` (append `operator_gate` + `require_role`)
- Modify: `backend/main.py` (register the middleware)
- Modify: `tests/conftest.py` (add `wire_operator_auth` helper)
- Test: `tests/test_operator_gate.py`

- [ ] **Step 1: Add the `wire_operator_auth` helper to `tests/conftest.py`**

Append at the end of `tests/conftest.py`:

```python
def wire_operator_auth(monkeypatch, db_session, enabled=True):
    """Point the gate middleware's SessionLocal at the test engine and flip the flag.
    The middleware opens its OWN session (it can't use the get_db override), so it
    must read the same engine the test writes to."""
    import backend.operator_auth as oa
    from sqlalchemy.orm import sessionmaker
    maker = sessionmaker(bind=db_session.get_bind(), autocommit=False, autoflush=False)
    monkeypatch.setattr(oa, "SessionLocal", maker, raising=False)
    monkeypatch.setattr(oa, "OPERATOR_AUTH_ENABLED", enabled, raising=False)
    return oa
```

- [ ] **Step 2: Write the failing test**

```python
# tests/test_operator_gate.py
import uuid

from tests.conftest import wire_operator_auth
from backend.models import OperatorUser
from backend.operator_auth import make_operator_cookie, COOKIE_NAME
from backend.auth_passwords import hash_password


def _make_user(db, role="admin", **kw):
    u = OperatorUser(id=str(uuid.uuid4()), email=kw.pop("email", "u@x.com"),
                     display_name="U", password_hash=hash_password("pw"),
                     role=role, **kw)
    db.add(u)
    db.commit()
    return u


def test_flag_off_passes_everything(client, db_session, monkeypatch):
    wire_operator_auth(monkeypatch, db_session, enabled=False)
    assert client.get("/", follow_redirects=False).status_code == 200

def test_gated_html_redirects_to_login_when_unauth(client, db_session, monkeypatch):
    wire_operator_auth(monkeypatch, db_session, enabled=True)
    r = client.get("/", follow_redirects=False)
    assert r.status_code == 303
    assert r.headers["location"].startswith("/login?next=")


def test_gated_api_returns_401_json_when_unauth(client, db_session, monkeypatch):
    wire_operator_auth(monkeypatch, db_session, enabled=True)
    r = client.get("/api/status-snapshot", follow_redirects=False)
    assert r.status_code == 401


def test_valid_session_passes(client, db_session, monkeypatch):
    u = _make_user(db_session)
    wire_operator_auth(monkeypatch, db_session, enabled=True)
    client.cookies.set(COOKIE_NAME, make_operator_cookie(u.id))
    assert client.get("/", follow_redirects=False).status_code == 200


def test_must_change_password_user_routed_to_change_password(client, db_session, monkeypatch):
    u = _make_user(db_session, must_change_password=True)
    wire_operator_auth(monkeypatch, db_session, enabled=True)
    client.cookies.set(COOKIE_NAME, make_operator_cookie(u.id))
    r = client.get("/", follow_redirects=False)
    assert r.status_code == 303
    assert r.headers["location"] == "/change-password"


def test_exempt_paths_pass_without_cookie(client, db_session, monkeypatch):
    wire_operator_auth(monkeypatch, db_session, enabled=True)
    assert client.get("/health", follow_redirects=False).status_code == 200
    # The /login route itself only lands in Task 6, so here we only prove the gate
    # does not intercept it (no 303 to /login, no 401).
    assert client.get("/login", follow_redirects=False).status_code not in (303, 401)


def test_portal_paths_are_exempt(client, db_session, monkeypatch):
    wire_operator_auth(monkeypatch, db_session, enabled=True)
    # /portal/p/ hits the portal's own gate (403/404), never the operator login.
    r = client.get("/portal/p/nope", follow_redirects=False)
    assert r.status_code in (403, 404)
```

- [ ] **Step 3: Append `operator_gate` + `require_role` to `backend/operator_auth.py`**

Add these imports to the top of the file:

```python
from urllib.parse import quote
from fastapi import Request, HTTPException
from fastapi.responses import JSONResponse, RedirectResponse
from backend.database import SessionLocal
```

Then append:

```python
# Routes reachable with no login. A new route added next year is gated by default.
_EXEMPT_EXACT = {
    "/login", "/logout", "/health",
    "/manifest.json", "/sw.js", "/favicon.ico", "/offline-db.js",
    "/portal",  # portal home (its own auth)
    # machine endpoints: LAN-only, automated, no human (watchers/heartbeats)
    "/emitters/report", "/api/series3/heartbeat", "/api/series4/heartbeat",
}
_EXEMPT_PREFIX = ("/static/", "/portal/")


def _is_exempt(path: str) -> bool:
    return path in _EXEMPT_EXACT or path.startswith(_EXEMPT_PREFIX)


async def operator_gate(request: Request, call_next):
    """Deny-by-default gate. Flag off → pass through (app as today).
    Flag on → exempt paths pass; otherwise require a valid operator session, stash
    it on request.state.operator, and force a password change when pending."""
    if not OPERATOR_AUTH_ENABLED:
        return await call_next(request)
    path = request.url.path
    if _is_exempt(path):
        return await call_next(request)
    db = SessionLocal()
    try:
        user = current_operator(request, db)
        if user is not None:
            db.expunge(user)  # detach a fully-loaded row so we can close now
    finally:
        db.close()
    if user is None:
        if path.startswith("/api/"):
            return JSONResponse({"detail": "Not authenticated"}, status_code=401)
        return RedirectResponse(f"/login?next={quote(path)}", status_code=303)
    if user.must_change_password and path not in ("/change-password", "/logout"):
        return RedirectResponse("/change-password", status_code=303)
    request.state.operator = user
    return await call_next(request)


def require_role(minimum: str):
    """Dependency factory: require a logged-in operator ranked >= `minimum`.
    Respects the flag (off → pass through). When on, the middleware has already set
    request.state.operator before this runs."""
    def _dep(request: Request):
        if not OPERATOR_AUTH_ENABLED:
            return None
        user = getattr(request.state, "operator", None)
        if user is None:
            raise HTTPException(status_code=401, detail="Not authenticated")
        if not role_at_least(user.role, minimum):
            raise HTTPException(status_code=403, detail="Insufficient permissions")
        return user
    return _dep
```

- [ ] **Step 4: Register the middleware in `backend/main.py`**

After the existing `add_environment_to_context` middleware block (ends line 90), add:

```python
# Operator auth: deny-by-default gate over the whole internal app. Governed by
# OPERATOR_AUTH_ENABLED (default off → behaves exactly as today). See
# docs/superpowers/specs/2026-06-17-operator-auth-design.md.
from backend.operator_auth import operator_gate
app.middleware("http")(operator_gate)
```

- [ ] **Step 5: Run tests to verify the gate works AND nothing regressed**

Run: `docker exec terra-view-terra-view-1 python -m pytest tests/test_operator_gate.py tests/test_portal_gate.py -v`
Expected: PASS (gate tests pass; the 6 existing portal-gate tests still pass because the flag defaults off)

- [ ] **Step 6: Commit**

```bash
git add backend/operator_auth.py backend/main.py tests/conftest.py tests/test_operator_gate.py
git commit -m "feat(auth): deny-by-default gate middleware + require_role"
```

---

## Task 6: Login / logout / change-password routes + templates

The auth pages. These routes work regardless of the flag, so you can log in while the flag is still off during rollout. `/login` and `/logout` are on the gate's exempt list; `/change-password` is not, because it requires a session.

**Files:**
- Create: `backend/routers/operator_auth_routes.py`
- Create: `templates/login.html`, `templates/change_password.html`
- Modify: `backend/main.py` (include the router)
- Test: `tests/test_operator_login.py`

- [ ] **Step 1: Write the failing test**

```python
# tests/test_operator_login.py
from tests.conftest import wire_operator_auth
from backend.models import OperatorUser
from backend.auth_passwords import hash_password, verify_password
from backend.operator_auth import (
    create_operator, make_operator_cookie, COOKIE_NAME, MAX_LOGIN_FAILURES,
)


def test_login_page_renders(client, db_session, monkeypatch):
    wire_operator_auth(monkeypatch, db_session, enabled=True)
    r = client.get("/login")
    assert r.status_code == 200
    assert "password" in r.text.lower()


def test_login_success_sets_cookie_and_redirects(client, db_session, monkeypatch):
    create_operator(db_session, "ok@x.com", "Ok", "admin", password="rightpw-9")
    wire_operator_auth(monkeypatch, db_session, enabled=True)
    r = client.post("/login", data={"email": "ok@x.com", "password": "rightpw-9"},
                    follow_redirects=False)
    assert r.status_code == 303
    assert r.headers["location"] == "/"
    assert f"{COOKIE_NAME}=" in r.headers.get("set-cookie", "")


def test_login_honors_next(client, db_session, monkeypatch):
    create_operator(db_session, "ok@x.com", "Ok", "admin", password="rightpw-9")
    wire_operator_auth(monkeypatch, db_session, enabled=True)
    r = client.post("/login?next=/settings",
                    data={"email": "ok@x.com", "password": "rightpw-9"},
                    follow_redirects=False)
    assert r.headers["location"] == "/settings"


def test_login_wrong_password_no_cookie_generic_error(client, db_session, monkeypatch):
    create_operator(db_session, "ok@x.com", "Ok", "admin", password="rightpw-9")
    wire_operator_auth(monkeypatch, db_session, enabled=True)
    r = client.post("/login", data={"email": "ok@x.com", "password": "nope"},
                    follow_redirects=False)
    assert r.status_code == 200
    assert f"{COOKIE_NAME}=" not in r.headers.get("set-cookie", "")
    assert "invalid" in r.text.lower()


def test_login_must_change_redirects_to_change_password(client, db_session, monkeypatch):
    create_operator(db_session, "new@x.com", "New", "admin")  # generated temp → must_change
    # The generated temp password is not kept, so overwrite it with a known one.
    user = db_session.query(OperatorUser).filter_by(email="new@x.com").first()
    user.password_hash = hash_password("temp-pw-1")
    db_session.commit()
    wire_operator_auth(monkeypatch, db_session, enabled=True)
    r = client.post("/login", data={"email": "new@x.com", "password": "temp-pw-1"},
                    follow_redirects=False)
    assert r.status_code == 303
    assert r.headers["location"] == "/change-password"


def test_login_lockout_message_after_five(client, db_session, monkeypatch):
    create_operator(db_session, "lk@x.com", "Lk", "admin", password="rightpw-9")
    wire_operator_auth(monkeypatch, db_session, enabled=True)
    for _ in range(MAX_LOGIN_FAILURES):
        client.post("/login", data={"email": "lk@x.com", "password": "nope"},
                    follow_redirects=False)
    r = client.post("/login", data={"email": "lk@x.com", "password": "rightpw-9"},
                    follow_redirects=False)
    assert r.status_code == 200
    assert "too many" in r.text.lower()


def test_logout_clears_cookie(client, db_session, monkeypatch):
    wire_operator_auth(monkeypatch, db_session, enabled=True)
    r = client.get("/logout", follow_redirects=False)
    assert r.status_code == 303
    assert r.headers["location"] == "/login"
    # cookie cleared (deletion appears as an expired/empty set-cookie)
    assert COOKIE_NAME in r.headers.get("set-cookie", "")


def test_change_password_self_service(client, db_session, monkeypatch):
    user, _ = create_operator(db_session, "c@x.com", "C", "admin", password="orig-pw-1")
    wire_operator_auth(monkeypatch, db_session, enabled=True)
    client.cookies.set(COOKIE_NAME, make_operator_cookie(user.id))
    r = client.post("/change-password",
                    data={"current_password": "orig-pw-1",
                          "new_password": "brand-new-2",
                          "confirm_password": "brand-new-2"},
                    follow_redirects=False)
    assert r.status_code == 303
    db_session.refresh(user)
    assert verify_password("brand-new-2", user.password_hash)
    assert user.must_change_password is False
```

- [ ] **Step 2: Run tests to verify they fail**

Run: `docker exec terra-view-terra-view-1 python -m pytest tests/test_operator_login.py -v`
Expected: FAIL, because `/login` returns 404 / 303 (route doesn't exist yet)

- [ ] **Step 3a: Create `templates/login.html`**

```html
Sign in · Terra-View

Terra-View

{% if error %}
{{ error }}
{% endif %}

Forgot your password? Contact your administrator.

``` - [ ] **Step 3b: Create `templates/change_password.html`** ```html Change password · Terra-View

Change your password

{% if must_change %}

Please set a new password to continue.

{% endif %} {% if error %}
{{ error }}
{% endif %}
```

- [ ] **Step 3c: Create `backend/routers/operator_auth_routes.py`**

```python
"""Operator login / logout / change-password.

These routes intentionally work regardless of OPERATOR_AUTH_ENABLED (you log in
while the flag is still off during rollout). /login and /logout are on the gate's
exempt list; /change-password requires a session (the gate sets
request.state.operator)."""
from fastapi import APIRouter, Request, Depends, Form
from fastapi.responses import RedirectResponse
from sqlalchemy.orm import Session

from backend.database import get_db
from backend.templates_config import templates
from backend.operator_auth import (
    authenticate, current_operator, change_own_password,
    make_operator_cookie, COOKIE_NAME, COOKIE_MAX_AGE,
)
from backend.auth_cookies import COOKIE_SECURE
from backend.auth_passwords import verify_password

router = APIRouter(tags=["operator-auth"])


def _safe_next(next_url: str) -> str:
    """Only allow same-site relative redirects (an open-redirect guard)."""
    if next_url and next_url.startswith("/") and not next_url.startswith("//"):
        return next_url
    return "/"


@router.get("/login")
async def login_page(request: Request, next: str = "", error: str = ""):
    return templates.TemplateResponse("login.html",
                                      {"request": request, "next": next, "error": error})


@router.post("/login")
async def login_submit(request: Request, next: str = "",
                       email: str = Form(...), password: str = Form(...),
                       db: Session = Depends(get_db)):
    user, status = authenticate(db, email, password)
    if status == "locked":
        return templates.TemplateResponse(
            "login.html",
            {"request": request, "next": next,
             "error": "Too many attempts — try again in 15 minutes."},
            status_code=200)
    if user is None:
        return templates.TemplateResponse(
            "login.html",
            {"request": request, "next": next, "error": "Invalid email or password."},
            status_code=200)
    dest = "/change-password" if user.must_change_password else _safe_next(next)
    resp = RedirectResponse(url=dest, status_code=303)
    resp.set_cookie(COOKIE_NAME, make_operator_cookie(user.id), max_age=COOKIE_MAX_AGE,
                    httponly=True, samesite="lax", secure=COOKIE_SECURE)
    return resp


@router.get("/logout")
async def logout(request: Request):
    resp = RedirectResponse(url="/login", status_code=303)
    resp.delete_cookie(COOKIE_NAME)
    return resp


@router.get("/change-password")
async def change_password_page(request: Request, db: Session = Depends(get_db)):
    user = getattr(request.state, "operator", None) or current_operator(request, db)
    if user is None:
        return RedirectResponse(url="/login", status_code=303)
    return templates.TemplateResponse(
        "change_password.html",
        {"request": request, "must_change": user.must_change_password, "error": ""})


@router.post("/change-password")
async def change_password_submit(request: Request,
                                 current_password: str = Form(...),
                                 new_password: str = Form(...),
                                 confirm_password: str = Form(...),
                                 db: Session = Depends(get_db)):
    user = getattr(request.state, "operator", None) or current_operator(request, db)
    if user is None:
        return RedirectResponse(url="/login", status_code=303)

    def _err(msg):
        return templates.TemplateResponse(
            "change_password.html",
            {"request": request, "must_change": user.must_change_password, "error": msg},
            status_code=200)

    if not verify_password(current_password, user.password_hash):
        return _err("Current password is incorrect.")
    if len(new_password) < 8:
        return _err("New password must be at least 8 characters.")
    if new_password != confirm_password:
        return _err("New passwords do not match.")

    new_iat = change_own_password(db, user, new_password)
    resp = RedirectResponse(url="/", status_code=303)
    resp.set_cookie(COOKIE_NAME, make_operator_cookie(user.id, iat=new_iat), max_age=COOKIE_MAX_AGE,
                    httponly=True, samesite="lax", secure=COOKIE_SECURE)
    return resp
```

- [ ] **Step 3d: Register the router in `backend/main.py`**

After the middleware registration added in Task 5, add:

```python
from backend.routers import operator_auth_routes
app.include_router(operator_auth_routes.router) ``` - [ ] **Step 4: Run tests to verify they pass** Run: `docker exec terra-view-terra-view-1 python -m pytest tests/test_operator_login.py -v` Expected: PASS (8 passed) - [ ] **Step 5: Commit** ```bash git add backend/routers/operator_auth_routes.py templates/login.html templates/change_password.html backend/main.py tests/test_operator_login.py git commit -m "feat(auth): login/logout/change-password routes + pages" ``` --- ## Task 7: User-management routes + template (superadmin-only) `/admin/users` page and JSON CRUD, all behind `require_role("superadmin")`. Temp passwords are returned once. **Files:** - Create: `backend/routers/operator_users.py` - Create: `templates/admin/users.html` - Modify: `backend/main.py` (include the router) - Test: `tests/test_operator_users.py` - [ ] **Step 1: Write the failing test** ```python # tests/test_operator_users.py import uuid from tests.conftest import wire_operator_auth from backend.operator_auth import create_operator, make_operator_cookie, COOKIE_NAME from backend.models import OperatorUser def _login_as(client, user): client.cookies.set(COOKIE_NAME, make_operator_cookie(user.id)) def test_admin_cannot_reach_user_management(client, db_session, monkeypatch): admin, _ = create_operator(db_session, "admin@x.com", "Admin", "admin", password="pw-123456") wire_operator_auth(monkeypatch, db_session, enabled=True) _login_as(client, admin) assert client.get("/admin/users", follow_redirects=False).status_code == 403 def test_superadmin_sees_user_management(client, db_session, monkeypatch): su, _ = create_operator(db_session, "su@x.com", "Su", "superadmin", password="pw-123456") wire_operator_auth(monkeypatch, db_session, enabled=True) _login_as(client, su) assert client.get("/admin/users", follow_redirects=False).status_code == 200 def test_superadmin_lists_users_json(client, db_session, monkeypatch): su, _ = create_operator(db_session, "su@x.com", "Su", "superadmin", 
password="pw-123456") wire_operator_auth(monkeypatch, db_session, enabled=True) _login_as(client, su) r = client.get("/api/admin/users") assert r.status_code == 200 emails = [u["email"] for u in r.json()["users"]] assert "su@x.com" in emails assert all("password_hash" not in u for u in r.json()["users"]) # never leak hashes def test_create_user_returns_temp_once(client, db_session, monkeypatch): su, _ = create_operator(db_session, "su@x.com", "Su", "superadmin", password="pw-123456") wire_operator_auth(monkeypatch, db_session, enabled=True) _login_as(client, su) r = client.post("/api/admin/users", json={"email": "dad@x.com", "name": "Dad", "role": "admin"}) assert r.status_code == 200 assert len(r.json()["password"]) >= 12 made = db_session.query(OperatorUser).filter_by(email="dad@x.com").first() assert made.must_change_password is True def test_reset_password_returns_temp_once(client, db_session, monkeypatch): su, _ = create_operator(db_session, "su@x.com", "Su", "superadmin", password="pw-123456") target, _ = create_operator(db_session, "t@x.com", "T", "admin", password="pw-123456") wire_operator_auth(monkeypatch, db_session, enabled=True) _login_as(client, su) r = client.post(f"/api/admin/users/{target.id}/reset-password") assert r.status_code == 200 and len(r.json()["password"]) >= 12 db_session.refresh(target) assert target.must_change_password is True def test_disable_and_enable(client, db_session, monkeypatch): su, _ = create_operator(db_session, "su@x.com", "Su", "superadmin", password="pw-123456") target, _ = create_operator(db_session, "t@x.com", "T", "admin", password="pw-123456") wire_operator_auth(monkeypatch, db_session, enabled=True) _login_as(client, su) assert client.post(f"/api/admin/users/{target.id}/disable").status_code == 200 db_session.refresh(target); assert target.active is False assert client.post(f"/api/admin/users/{target.id}/enable").status_code == 200 db_session.refresh(target); assert target.active is True def test_change_role(client, 
db_session, monkeypatch): su, _ = create_operator(db_session, "su@x.com", "Su", "superadmin", password="pw-123456") target, _ = create_operator(db_session, "t@x.com", "T", "admin", password="pw-123456") wire_operator_auth(monkeypatch, db_session, enabled=True) _login_as(client, su) r = client.post(f"/api/admin/users/{target.id}/role", json={"role": "superadmin"}) assert r.status_code == 200 db_session.refresh(target); assert target.role == "superadmin" ``` - [ ] **Step 2: Run tests to verify they fail** Run: `docker exec terra-view-terra-view-1 python -m pytest tests/test_operator_users.py -v` Expected: FAIL — routes don't exist (404) - [ ] **Step 3a: Create `templates/admin/users.html`** ```html {% extends "base.html" %} {% block title %}Operator Accounts{% endblock %} {% block content %}

Operator Accounts

NameEmailRoleStatusLast login
{% endblock %} ``` - [ ] **Step 3b: Create `backend/routers/operator_users.py`** ```python """Operator account management — superadmin only. Temp passwords are returned in the JSON response once (shown to the superadmin to hand off); only hashes persist.""" from fastapi import APIRouter, Request, Depends, HTTPException from fastapi.responses import JSONResponse from pydantic import BaseModel from sqlalchemy.orm import Session from backend.database import get_db from backend.templates_config import templates from backend.models import OperatorUser from backend.operator_auth import ( require_role, create_operator, reset_operator_password, set_operator_active, set_operator_role, ) from backend.utils.timezone import format_local_datetime router = APIRouter(tags=["operator-users"]) _superadmin = require_role("superadmin") class NewUser(BaseModel): email: str name: str role: str = "admin" class RoleChange(BaseModel): role: str def _serialize(u: OperatorUser) -> dict: from datetime import datetime return { "id": u.id, "email": u.email, "display_name": u.display_name, "role": u.role, "active": bool(u.active), "must_change_password": bool(u.must_change_password), "locked": bool(u.locked_until and u.locked_until > datetime.utcnow()), "last_login_at": format_local_datetime(u.last_login_at, "%Y-%m-%d %H:%M") if u.last_login_at else None, } @router.get("/admin/users") async def users_page(request: Request, _=Depends(_superadmin)): return templates.TemplateResponse("admin/users.html", {"request": request}) @router.get("/api/admin/users") async def list_users(_=Depends(_superadmin), db: Session = Depends(get_db)): users = db.query(OperatorUser).order_by(OperatorUser.display_name).all() return {"users": [_serialize(u) for u in users]} @router.post("/api/admin/users") async def add_user(body: NewUser, _=Depends(_superadmin), db: Session = Depends(get_db)): try: user, raw = create_operator(db, body.email, body.name, body.role) except ValueError as e: return 
JSONResponse(status_code=400, content={"detail": str(e)}) return {"user": _serialize(user), "password": raw} @router.post("/api/admin/users/{user_id}/reset-password") async def reset_user_password(user_id: str, _=Depends(_superadmin), db: Session = Depends(get_db)): user = db.query(OperatorUser).filter_by(id=user_id).first() if not user: raise HTTPException(status_code=404, detail="User not found") raw = reset_operator_password(db, user) return {"password": raw} @router.post("/api/admin/users/{user_id}/disable") async def disable_user(user_id: str, _=Depends(_superadmin), db: Session = Depends(get_db)): user = db.query(OperatorUser).filter_by(id=user_id).first() if not user: raise HTTPException(status_code=404, detail="User not found") set_operator_active(db, user, False) return {"active": False} @router.post("/api/admin/users/{user_id}/enable") async def enable_user(user_id: str, _=Depends(_superadmin), db: Session = Depends(get_db)): user = db.query(OperatorUser).filter_by(id=user_id).first() if not user: raise HTTPException(status_code=404, detail="User not found") set_operator_active(db, user, True) return {"active": True} @router.post("/api/admin/users/{user_id}/role") async def change_user_role(user_id: str, body: RoleChange, _=Depends(_superadmin), db: Session = Depends(get_db)): user = db.query(OperatorUser).filter_by(id=user_id).first() if not user: raise HTTPException(status_code=404, detail="User not found") try: set_operator_role(db, user, body.role) except ValueError as e: return JSONResponse(status_code=400, content={"detail": str(e)}) return {"role": user.role} ``` - [ ] **Step 3c: Register the router in `backend/main.py`** After the `operator_auth_routes` include added in Task 6, add: ```python from backend.routers import operator_users app.include_router(operator_users.router) ``` - [ ] **Step 4: Run tests to verify they pass** Run: `docker exec terra-view-terra-view-1 python -m pytest tests/test_operator_users.py -v` Expected: PASS (7 passed) - [ 
] **Step 5: Commit** ```bash git add backend/routers/operator_users.py templates/admin/users.html backend/main.py tests/test_operator_users.py git commit -m "feat(auth): superadmin user-management page + CRUD" ``` --- ## Task 8: Seed / break-glass CLI (`operator_admin.py`) The bootstrap (first superadmin, before any UI is reachable) and the break-glass (locked out / forgot everything). Thin wrappers over the Task-4 data helpers. **Files:** - Create: `backend/operator_admin.py` - Test: `tests/test_operator_admin_cli.py` - [ ] **Step 1: Write the failing test** ```python # tests/test_operator_admin_cli.py from sqlalchemy.orm import sessionmaker from backend.models import OperatorUser from backend.auth_passwords import verify_password import backend.operator_admin as cli def _maker(db_session): return sessionmaker(bind=db_session.get_bind(), autocommit=False, autoflush=False) def test_seed_superadmin(db_session, monkeypatch): monkeypatch.setattr(cli, "SessionLocal", _maker(db_session), raising=False) cli.cmd_create_superadmin(email="brian@x.com", name="Brian", password="chosen-pw-1") u = db_session.query(OperatorUser).filter_by(email="brian@x.com").first() assert u.role == "superadmin" assert u.must_change_password is False assert verify_password("chosen-pw-1", u.password_hash) def test_create_user_generates_temp(db_session, monkeypatch, capsys): monkeypatch.setattr(cli, "SessionLocal", _maker(db_session), raising=False) cli.cmd_create_user(email="dad@x.com", name="Dad", role="admin") u = db_session.query(OperatorUser).filter_by(email="dad@x.com").first() assert u.role == "admin" and u.must_change_password is True assert "dad@x.com" in capsys.readouterr().out # prints the temp once def test_reset_password_cli(db_session, monkeypatch): monkeypatch.setattr(cli, "SessionLocal", _maker(db_session), raising=False) cli.cmd_create_user(email="r@x.com", name="R", role="admin") before = db_session.query(OperatorUser).filter_by(email="r@x.com").first().password_hash 
cli.cmd_reset_password(email="r@x.com") after = db_session.query(OperatorUser).filter_by(email="r@x.com").first().password_hash assert before != after def test_disable_enable_cli(db_session, monkeypatch): monkeypatch.setattr(cli, "SessionLocal", _maker(db_session), raising=False) cli.cmd_create_user(email="d@x.com", name="D", role="admin") cli.cmd_set_active(email="d@x.com", active=False) assert db_session.query(OperatorUser).filter_by(email="d@x.com").first().active is False cli.cmd_set_active(email="d@x.com", active=True) assert db_session.query(OperatorUser).filter_by(email="d@x.com").first().active is True ``` - [ ] **Step 2: Run tests to verify they fail** Run: `docker exec terra-view-terra-view-1 python -m pytest tests/test_operator_admin_cli.py -v` Expected: FAIL — `ModuleNotFoundError: No module named 'backend.operator_admin'` - [ ] **Step 3: Create `backend/operator_admin.py`** ```python #!/usr/bin/env python3 """Operator-account admin CLI — the bootstrap and the break-glass. Run inside the terra-view container against the live DB. Temp/raw passwords are printed ONCE; only hashes persist. 
# first superadmin (before any UI is reachable) — prompts for a password, or --generate python3 backend/operator_admin.py create-superadmin --email you@x.com --name "Brian" # a parent's account — generates a temp password, must-change on first login python3 backend/operator_admin.py create-user --email dad@x.com --name "Dad" --role admin python3 backend/operator_admin.py reset-password --email dad@x.com python3 backend/operator_admin.py list python3 backend/operator_admin.py disable --email dad@x.com python3 backend/operator_admin.py enable --email dad@x.com """ import os import sys import getpass import argparse from datetime import datetime sys.path.insert(0, os.path.dirname(os.path.dirname(os.path.abspath(__file__)))) from backend.database import SessionLocal from backend.models import OperatorUser from backend.operator_auth import ( create_operator, reset_operator_password, set_operator_active, _norm_email, ) def _get(db, email): u = db.query(OperatorUser).filter_by(email=_norm_email(email)).first() if not u: sys.exit(f"No operator with email '{email}'.") return u def cmd_create_superadmin(email, name, password=None, generate=False): db = SessionLocal() try: if password is None and not generate: password = getpass.getpass("Password for new superadmin: ") if not password or len(password) < 8: sys.exit("Password must be at least 8 characters.") user, raw = create_operator(db, email, name, "superadmin", password=None if generate else password) if generate: print(f"✓ Superadmin {user.email} created. Temp password (shown once): {raw}") else: print(f"✓ Superadmin {user.email} created.") except ValueError as e: sys.exit(str(e)) finally: db.close() def cmd_create_user(email, name, role="admin"): db = SessionLocal() try: user, raw = create_operator(db, email, name, role) print(f"✓ {role} {user.email} created. 
Temp password (shown once): {raw}") print(" They'll be required to change it on first login.") except ValueError as e: sys.exit(str(e)) finally: db.close() def cmd_reset_password(email): db = SessionLocal() try: user = _get(db, email) raw = reset_operator_password(db, user) print(f"✓ Reset {user.email}. Temp password (shown once): {raw}") finally: db.close() def cmd_set_active(email, active): db = SessionLocal() try: user = _get(db, email) set_operator_active(db, user, active) print(f"✓ {user.email} {'enabled' if active else 'disabled'}.") finally: db.close() def cmd_list(): db = SessionLocal() try: users = db.query(OperatorUser).order_by(OperatorUser.display_name).all() if not users: print("No operators yet. Run create-superadmin first.") return for u in users: locked = " [LOCKED]" if (u.locked_until and u.locked_until > datetime.utcnow()) else "" state = "active" if u.active else "DISABLED" last = u.last_login_at.strftime("%Y-%m-%d %H:%M") if u.last_login_at else "never" print(f" {u.display_name:<12} {u.email:<28} {u.role:<11} {state}{locked} last: {last}") finally: db.close() def main(): ap = argparse.ArgumentParser(description="Operator-account admin") sub = ap.add_subparsers(dest="cmd", required=True) p = sub.add_parser("create-superadmin") p.add_argument("--email", required=True); p.add_argument("--name", required=True) p.add_argument("--generate", action="store_true", help="generate a temp password instead of prompting") p.set_defaults(fn=lambda a: cmd_create_superadmin(a.email, a.name, generate=a.generate)) p = sub.add_parser("create-user") p.add_argument("--email", required=True); p.add_argument("--name", required=True) p.add_argument("--role", default="admin", choices=["admin", "superadmin"]) p.set_defaults(fn=lambda a: cmd_create_user(a.email, a.name, a.role)) p = sub.add_parser("reset-password") p.add_argument("--email", required=True) p.set_defaults(fn=lambda a: cmd_reset_password(a.email)) p = sub.add_parser("disable"); p.add_argument("--email", 
    required=True)
    p.set_defaults(fn=lambda a: cmd_set_active(a.email, False))
    p = sub.add_parser("enable"); p.add_argument("--email", required=True)
    p.set_defaults(fn=lambda a: cmd_set_active(a.email, True))
    p = sub.add_parser("list"); p.set_defaults(fn=lambda a: cmd_list())

    args = ap.parse_args()
    args.fn(args)


if __name__ == "__main__":
    main()
```

- [ ] **Step 4: Run tests to verify they pass**

Run: `docker exec terra-view-terra-view-1 python -m pytest tests/test_operator_admin_cli.py -v`
Expected: PASS (4 passed)

- [ ] **Step 5: Commit**

```bash
git add backend/operator_admin.py tests/test_operator_admin_cli.py
git commit -m "feat(auth): operator admin/break-glass CLI"
```

---

## Task 9: Machine-endpoint regression guard + full-suite green

A dedicated regression test that the gate, when ON, never blocks the watcher heartbeats — and a full-suite run to confirm nothing else broke.

**Files:**
- Test: `tests/test_operator_machine_endpoints.py`

- [ ] **Step 1: Write the regression test**

```python
# tests/test_operator_machine_endpoints.py
from tests.conftest import wire_operator_auth


def test_machine_endpoints_not_blocked_by_gate(client, db_session, monkeypatch):
    """With the gate ON and no cookie, the LAN-only watcher endpoints must reach
    their handlers (the gate must never silently break heartbeats).
    A handler may return 422 for an empty body — that still proves the gate let it
    through."""
    wire_operator_auth(monkeypatch, db_session, enabled=True)
    r = client.post("/api/series3/heartbeat", json={}, follow_redirects=False)
    assert r.status_code != 401  # gate would 401 an unauth /api/* route
    assert r.status_code != 303
    r = client.post("/api/series4/heartbeat", json={}, follow_redirects=False)
    assert r.status_code not in (401, 303)
    r = client.post("/emitters/report", json={}, follow_redirects=False)
    assert r.status_code != 303  # gate would 303 an unauth HTML route


def test_static_assets_exempt(client, db_session, monkeypatch):
    wire_operator_auth(monkeypatch, db_session, enabled=True)
    # /sw.js and /manifest.json are PWA assets clients fetch pre-login.
    assert client.get("/sw.js", follow_redirects=False).status_code in (200, 404)
    assert client.get("/manifest.json", follow_redirects=False).status_code in (200, 404)
```

- [ ] **Step 2: Run the test to verify it passes (the exempt list from Task 5 already covers it)**

Run: `docker exec terra-view-terra-view-1 python -m pytest tests/test_operator_machine_endpoints.py -v`
Expected: PASS (2 passed). If any assertion fails, the exempt list in `backend/operator_auth.py` (`_EXEMPT_EXACT`) is missing that path — add it and re-run.

- [ ] **Step 3: Run the FULL test suite to confirm no regressions**

Run: `docker exec terra-view-terra-view-1 python -m pytest tests/ -v`
Expected: PASS — all operator-auth tests plus the pre-existing portal/auth tests. The gate defaults OFF (env unset), so every pre-existing test is unaffected.

- [ ] **Step 4: Commit**

```bash
git add tests/test_operator_machine_endpoints.py
git commit -m "test(auth): regression guard — gate never blocks machine endpoints"
```

---

## Task 10: Wire the flag into compose + CHANGELOG + rollout doc

Make the flag operable in deployment and document the no-self-lockout rollout.
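For orientation, here is a minimal sketch of how a default-off flag like `OPERATOR_AUTH_ENABLED` can be read from the environment so that the compose default of `false` (or an unset variable) keeps the gate off. The helper name `env_flag` and the set of accepted truthy strings are assumptions for illustration; the real parsing lives in `backend/operator_auth.py` and may differ.

```python
import os

# Assumption: these spellings count as "on"; anything else is treated as off.
_TRUTHY = {"1", "true", "yes", "on"}


def env_flag(name: str, default: bool = False, environ=None) -> bool:
    """Read a boolean feature flag from the environment (hypothetical helper).

    An unset or empty variable falls back to `default`, so shipping without
    the variable behaves the same as shipping with it set to "false".
    """
    env = os.environ if environ is None else environ
    raw = env.get(name)
    if raw is None or raw.strip() == "":
        return default
    return raw.strip().lower() in _TRUTHY


# Usage: evaluated once at import time, mirroring a module-level constant.
OPERATOR_AUTH_ENABLED = env_flag("OPERATOR_AUTH_ENABLED")
```

Passing `environ` explicitly keeps the helper easy to test without touching the real process environment.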
**Files:**
- Modify: `docker-compose.yml` (pass `OPERATOR_AUTH_ENABLED` through, default off)
- Modify: `CHANGELOG.md` (Unreleased → Added)

- [ ] **Step 1: Add the flag pass-through to `docker-compose.yml`**

In the `web-app` service `environment:` block, after the `COOKIE_SECURE` line (line 20), add:

```yaml
      # Operator login gate. Leave false to ship dark; seed a superadmin via
      # backend/operator_admin.py, confirm you can log in, THEN set true to enforce.
      # Instant escape hatch: set back to false. See docs/superpowers/specs/2026-06-17-operator-auth-design.md
      - OPERATOR_AUTH_ENABLED=${OPERATOR_AUTH_ENABLED:-false}
```

- [ ] **Step 2: Add the CHANGELOG entry**

In `CHANGELOG.md`, under `## [Unreleased]` → `### Added`, add this bullet (keep it grouped with the other Added items):

```markdown
- **Operator authentication (login + roles), shipped dark behind `OPERATOR_AUTH_ENABLED`.** The internal app gains a deny-by-default login gate (one Starlette middleware over every route except an allow-list: `/login` `/logout` `/health` `/static/*` `/portal/*` + the three machine heartbeat endpoints). Two roles — `superadmin` (account management) and `admin` (full app); `operator` reserved. Sessions are a 30-day HMAC-signed `tv_session` cookie re-validated against the DB each request (instant revoke via `active` / `sessions_valid_from`). Password reset is superadmin-driven: reset-anyone (temp shown once + forced change), self-service `/change-password`, and a `backend/operator_admin.py` seed/break-glass CLI. Brute-force lockout (5 tries / 15 min). New `operator_users` table auto-creates — no migration. Reuses the portal's argon2 hasher + a new shared `backend/auth_cookies.py` signer. Rollout: ship with the flag off (app unchanged), seed a superadmin, confirm login, then flip on — the flag is an instant escape hatch. Spec: `docs/superpowers/specs/2026-06-17-operator-auth-design.md`.
```

- [ ] **Step 3: Run the full suite once more (sanity — no code changed, but confirm compose/env didn't break import)**

Run: `docker exec terra-view-terra-view-1 python -m pytest tests/ -q`
Expected: PASS (all green)

- [ ] **Step 4: Commit**

```bash
git add docker-compose.yml CHANGELOG.md
git commit -m "chore(auth): wire OPERATOR_AUTH_ENABLED into compose + changelog"
```

---

## Manual verification (after all tasks — done by the human, not a step)

These confirm the rollout sequence on a real container (the spec's "no self-lockout" path):

1. **Flag off (default) — app unchanged:** `docker compose up -d` with no `OPERATOR_AUTH_ENABLED` set → the app behaves exactly as today (no login).
2. **Seed:** `docker exec terra-view-terra-view-1 python3 backend/operator_admin.py create-superadmin --email you@x.com --name "Brian"` (prompts for a password).
3. **Log in while still off:** visit `/login`, sign in → you get a `tv_session` cookie (login works regardless of the flag).
4. **Enforce:** set `OPERATOR_AUTH_ENABLED=true`, `docker compose up -d web-app` → the gate enforces; your cookie lets you in; anything wrong → set it back to `false` (instant escape hatch).
5. **Add parents:** from `/admin/users`, add `admin` accounts (temp passwords shown once; they change on first login).

---

## Self-Review (run by the plan author after writing — recorded here)

**Spec coverage:** OperatorUser table (T2) ✓ · auth_cookies shared signer (T1) ✓ · `tv_session` cookie + 30-day + DB re-validation + sessions_valid_from (T3) ✓ · deny-by-default middleware + exempt list incl.
3 machine endpoints + must_change redirect + HTML-303/api-401 split (T5) ✓ · role ladder + require_role superadmin-only (T2/T5/T7) ✓ · authenticate + lockout 5/15min (T4/T6) ✓ · password reset all three paths + forgot=contact-admin (T4/T6/T7) ✓ · seed/break-glass CLI (T8) ✓ · user-mgmt UI (T7) ✓ · login/logout/change-password + templates (T6) ✓ · flag governs middleware AND require_role (T5) ✓ · rollout sequence (T10 + manual) ✓ · no migration / new table auto-creates (T2) ✓ · CHANGELOG (T10) ✓.

**Type/name consistency:** `make_operator_cookie(uid, iat=None)`, `current_operator(request, db)`, `authenticate(db, email, password) -> (user, status)`, `create_operator(db, email, name, role, password=None, must_change=None) -> (user, raw)`, `reset_operator_password(db, user) -> raw`, `change_own_password(db, user, new_password) -> new_iat`, `role_at_least(role, minimum)`, `require_role(minimum)`, `COOKIE_NAME="tv_session"` — all referenced consistently across T3–T8. The `sessions_valid_from`-vs-`iat` truncation race is handled at every write point (`_utcnow_seconds` default, `utcfromtimestamp(new_iat)` on change, `.replace(microsecond=0)` on reset) and covered by `test_cookie_minted_with_matching_iat_after_bump_still_valid`.

**Placeholder scan:** none — every code step is complete.
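
The truncation race named under Type/name consistency can be illustrated with a small standalone sketch. It assumes the cookie carries `iat` as whole epoch seconds and that a session counts as valid when `iat >= sessions_valid_from`; the function `cookie_still_valid` is a hypothetical stand-in for that comparison, not the plan's actual `current_operator` logic.

```python
from datetime import datetime, timezone


def cookie_still_valid(iat: int, sessions_valid_from: datetime) -> bool:
    """Hypothetical check: a cookie survives if it was issued at or after the cutoff."""
    return iat >= sessions_valid_from.timestamp()


# A password change happens half a second into a wall-clock second.
moment = datetime(2026, 6, 17, 12, 0, 0, 500_000, tzinfo=timezone.utc)

# The fresh cookie's iat is truncated to whole seconds.
iat = int(moment.timestamp())

# Storing the cutoff with microseconds puts it *after* the truncated iat,
# so the cookie minted in the same instant would be rejected.
rejected_by_race = not cookie_still_valid(iat, moment)

# Truncating the stored cutoff to whole seconds (as the write points above do)
# makes the two values comparable again.
accepted_after_fix = cookie_still_valid(iat, moment.replace(microsecond=0))

print(rejected_by_race, accepted_after_fix)
```

Truncating at the write side keeps the read-side comparison a plain `>=`, which is why every write point listed above has to apply the same rounding.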