From 37e6ca55c1294fa14d9a1026d69898bcbee869a8 Mon Sep 17 00:00:00 2001 From: serversdown Date: Wed, 17 Jun 2026 18:57:26 +0000 Subject: [PATCH] docs: operator-auth implementation plan (10 TDD tasks) --- .../plans/2026-06-17-operator-auth.md | 1801 +++++++++++++++++ 1 file changed, 1801 insertions(+) create mode 100644 docs/superpowers/plans/2026-06-17-operator-auth.md diff --git a/docs/superpowers/plans/2026-06-17-operator-auth.md b/docs/superpowers/plans/2026-06-17-operator-auth.md new file mode 100644 index 0000000..4282327 --- /dev/null +++ b/docs/superpowers/plans/2026-06-17-operator-auth.md @@ -0,0 +1,1801 @@ +# Operator Authentication Implementation Plan + +> **For agentic workers:** REQUIRED SUB-SKILL: Use superpowers:subagent-driven-development (recommended) or superpowers:executing-plans to implement this plan task-by-task. Steps use checkbox (`- [ ]`) syntax for tracking. + +**Goal:** Add a login + roles to the internal Terra-View app (which today has zero auth), gated behind a `OPERATOR_AUTH_ENABLED` feature flag, with a dead-simple superadmin-driven password-reset story. + +**Architecture:** One deny-by-default Starlette HTTP middleware gates every route except an explicit allow-list (login/logout/health/static/portal + 3 machine endpoints). It resolves an `OperatorUser` from a signed `tv_session` cookie (re-validated against the DB every request) and stashes it on `request.state.operator`; a `require_role()` dependency reads it for the superadmin-only user-management routes. Reuses the portal's argon2 hasher and HMAC-signed-cookie pattern. The flag governs both the middleware and `require_role`, so shipping with it off behaves exactly like today. + +**Tech Stack:** FastAPI + Starlette middleware, SQLAlchemy/SQLite, argon2-cffi (already a dep), stdlib `hmac`/`hashlib`, Jinja2, pytest (existing harness). + +**Spec:** `docs/superpowers/specs/2026-06-17-operator-auth-design.md` + +--- + +## File Structure + +| File | Responsibility | +|---|---| +| `backend/auth_cookies.py` *(new)* | generic `sign(payload)` / `read(raw, max_age)` signer + `SECRET_KEY` / `COOKIE_SECURE` | +| `backend/models.py` *(modify)* | add `OperatorUser` table + `_utcnow_seconds` helper | +| `backend/operator_auth.py` *(new)* | flag, role ranks, cookie helpers, `current_operator`, `authenticate`, lockout, data helpers (create/reset/role/active/change), the gate middleware, `require_role` | +| `backend/routers/operator_auth_routes.py` *(new)* | `/login`, `/logout`, `/change-password` | +| `backend/routers/operator_users.py` *(new)* | `/admin/users` page + CRUD JSON (superadmin) | +| `backend/operator_admin.py` *(new)* | seed / break-glass CLI | +| `backend/main.py` *(modify)* | register the gate middleware + the two routers | +| `templates/login.html`, `templates/change_password.html` *(new)* | standalone (no nav) auth pages | +| `templates/admin/users.html` *(new)* | user-management page (extends base.html) | +| `tests/conftest.py` *(modify)* | add `wire_operator_auth()` helper | +| `tests/test_operator_*.py` *(new)* | per-task test files | + +**Conventions discovered (follow these exactly):** +- UUID PKs: `id = Column(String, primary_key=True, index=True)` set via `str(uuid.uuid4())`. +- argon2 helpers live in `backend/auth_passwords.py`: `hash_password(raw)`, `verify_password(raw, hashed)` (never raises), `generate_password(n_bytes=12)`. +- Tests run in the dev container: `docker exec terra-view-terra-view-1 python -m pytest tests/ -v`. Source is bind-mounted, so no rebuild between edits. +- The gate middleware reads the DB through a module-level `SessionLocal` (same pattern the portal WS handler uses) so tests can monkeypatch it. `db_session.get_bind()` returns the test engine — bind a `sessionmaker` to it. +- The flag and `SessionLocal` are read as **module globals at call time**, so `monkeypatch.setattr(backend.operator_auth, "...", ...)` works without re-importing. + +--- + +## Task 1: Generic signed-cookie module (`auth_cookies.py`) + +Lift the portal's HMAC signer into a shared, payload-agnostic module the operator auth uses now (the portal keeps its own helpers untouched). + +**Files:** +- Create: `backend/auth_cookies.py` +- Test: `tests/test_operator_cookies.py` + +- [ ] **Step 1: Write the failing test** + +```python +# tests/test_operator_cookies.py +import time +from backend.auth_cookies import sign, read + + +def test_sign_then_read_round_trips(): + raw = sign({"uid": "abc", "iat": 1000}) + data = read(raw, max_age=3600) + assert data == {"uid": "abc", "iat": 1000} + + +def test_tampered_signature_is_rejected(): + raw = sign({"uid": "abc", "iat": int(time.time())}) + body, _sig = raw.rsplit(".", 1) + assert read(body + ".deadbeef", max_age=3600) is None + + +def test_tampered_body_is_rejected(): + raw = sign({"uid": "abc", "iat": int(time.time())}) + body, sig = raw.rsplit(".", 1) + import base64, json + forged = base64.urlsafe_b64encode(json.dumps({"uid": "evil", "iat": int(time.time())}).encode()).decode() + assert read(forged + "." + sig, max_age=3600) is None + + +def test_expired_by_iat_is_rejected(): + raw = sign({"uid": "abc", "iat": int(time.time()) - 10_000}) + assert read(raw, max_age=3600) is None + + +def test_garbage_input_is_none_not_raise(): + assert read("not-a-cookie", max_age=3600) is None + assert read("", max_age=3600) is None + assert read(None, max_age=3600) is None +``` + +- [ ] **Step 2: Run tests to verify they fail** + +Run: `docker exec terra-view-terra-view-1 python -m pytest tests/test_operator_cookies.py -v` +Expected: FAIL — `ModuleNotFoundError: No module named 'backend.auth_cookies'` + +- [ ] **Step 3: Write the implementation** + +```python +# backend/auth_cookies.py +"""Generic HMAC-signed cookie payloads, shared by operator auth (and, optionally +later, the portal). A signed value is f"{b64url(json)}.{hmac_sha256(b64)}"; read() +verifies the signature in constant time and enforces a server-side iat expiry. + +The signing secret is the same SECRET_KEY the portal already reads, so a single +env var protects both cookies. Never store or log raw secrets.""" +import os +import hmac +import json +import time +import base64 +import hashlib +import logging + +logger = logging.getLogger(__name__) + +# Same env var the portal cookie uses — one secret protects both. The insecure +# default only exists so dev/test boots without config; set a real SECRET_KEY in prod. +SECRET_KEY = os.getenv("SECRET_KEY", "dev-insecure-change-me") +# Set COOKIE_SECURE=true once served over HTTPS; leave false on plain HTTP or the +# browser won't send the cookie. +COOKIE_SECURE = os.getenv("COOKIE_SECURE", "false").lower() in ("1", "true", "yes") + + +def _sign(body: str) -> str: + return hmac.new(SECRET_KEY.encode(), body.encode(), hashlib.sha256).hexdigest() + + +def sign(payload: dict) -> str: + """Serialize + sign a payload dict into a cookie-safe string.""" + body = base64.urlsafe_b64encode(json.dumps(payload).encode()).decode() + return f"{body}.{_sign(body)}" + + +def read(raw, max_age: int): + """Verify a signed value and return its payload dict, or None if missing, + tampered, or older than max_age seconds (by its own `iat`).""" + if not raw or not isinstance(raw, str): + return None + try: + body, sig = raw.rsplit(".", 1) + except (ValueError, AttributeError): + return None + if not hmac.compare_digest(sig, _sign(body)): + return None + try: + data = json.loads(base64.urlsafe_b64decode(body.encode())) + except Exception: + return None + if not isinstance(data, dict): + return None + iat = data.get("iat") + if not isinstance(iat, (int, float)) or (time.time() - iat) > max_age: + return None + return data +``` + +- [ ] **Step 4: Run tests to verify they pass** + +Run: `docker exec terra-view-terra-view-1 python -m pytest tests/test_operator_cookies.py -v` +Expected: PASS (5 passed) + +- [ ] **Step 5: Commit** + +```bash +git add backend/auth_cookies.py tests/test_operator_cookies.py +git commit -m "feat(auth): generic HMAC signed-cookie module for operator auth" +``` + +--- + +## Task 2: `OperatorUser` model + role-rank helpers + +The new table (auto-created by `Base.metadata.create_all`, no migration) and the role ladder. + +**Files:** +- Modify: `backend/models.py` (add `_utcnow_seconds` near the top imports, and `OperatorUser` at end of file) +- Create: `backend/operator_auth.py` (start it with constants + `role_at_least`) +- Test: `tests/test_operator_model.py` + +- [ ] **Step 1: Write the failing test** + +```python +# tests/test_operator_model.py +import uuid +from backend.models import OperatorUser +from backend.operator_auth import role_at_least, _ROLE_RANK + + +def test_operator_user_defaults(db_session): + u = OperatorUser(id=str(uuid.uuid4()), email="a@x.com", display_name="A", + password_hash="h", role="admin") + db_session.add(u) + db_session.commit() + got = db_session.query(OperatorUser).filter_by(email="a@x.com").first() + assert got.active is True + assert got.must_change_password is False + assert got.failed_login_count == 0 + assert got.locked_until is None + assert got.sessions_valid_from is not None + assert got.sessions_valid_from.microsecond == 0 # truncated to whole seconds + + +def test_email_is_unique(db_session): + for i in range(2): + db_session.add(OperatorUser(id=str(uuid.uuid4()), email="dup@x.com", + display_name="d", password_hash="h", role="admin")) + import pytest + with pytest.raises(Exception): + db_session.commit() + + +def test_role_ladder(): + assert _ROLE_RANK == {"operator": 10, "admin": 20, "superadmin": 30} + assert role_at_least("superadmin", "admin") is True + assert role_at_least("admin", "admin") is True + assert role_at_least("admin", "superadmin") is False + assert role_at_least("operator", "admin") is False + assert role_at_least("nonsense", "admin") is False +``` + +- [ ] **Step 2: Run tests to verify they fail** + +Run: `docker exec terra-view-terra-view-1 python -m pytest tests/test_operator_model.py -v` +Expected: FAIL — `ImportError: cannot import name 'OperatorUser'` + +- [ ] **Step 3a: Add the `_utcnow_seconds` helper to `backend/models.py`** + +Insert directly after the existing `from datetime import datetime` line (line 2) and before `from backend.database import Base`: + +```python +def _utcnow_seconds(): + """utcnow truncated to whole seconds — used as the default for + sessions_valid_from so a freshly-issued cookie (whose iat is a whole-second + epoch) never falls a few microseconds before it and self-invalidates.""" + return datetime.utcnow().replace(microsecond=0) +``` + +- [ ] **Step 3b: Append the `OperatorUser` model at the end of `backend/models.py`** + +```python +class OperatorUser(Base): + """An internal operator login. Roles: 'superadmin' (Brian, + account mgmt) and + 'admin' (parents, full app). 'operator' is reserved (deferred). Brand-new table + → create_all builds it, no migration. Never store or log raw passwords.""" + __tablename__ = "operator_users" + + id = Column(String, primary_key=True, index=True) # UUID + email = Column(String, nullable=False, unique=True, index=True) # login handle, lowercased + display_name = Column(String, nullable=False) # "Brian", "Dad" + password_hash = Column(String, nullable=False) # argon2id + role = Column(String, nullable=False, default="admin") # superadmin | admin + active = Column(Boolean, default=True) # False = login disabled + must_change_password = Column(Boolean, default=False) # forces a change next login + sessions_valid_from = Column(DateTime, default=_utcnow_seconds) # bump = log out everywhere + failed_login_count = Column(Integer, default=0) # lockout counter + locked_until = Column(DateTime, nullable=True) # set after too many bad tries + created_at = Column(DateTime, default=datetime.utcnow) + last_login_at = Column(DateTime, nullable=True) +``` + +- [ ] **Step 3c: Create `backend/operator_auth.py` with constants + `role_at_least`** + +```python +# backend/operator_auth.py +"""Operator authentication: the deny-by-default gate, session cookie, login + +lockout, and the small data helpers shared by the routes and the CLI. Reuses the +argon2 hasher (auth_passwords) and the HMAC signer (auth_cookies). + +The flag and SessionLocal are read as module globals at call time so tests can +monkeypatch them.""" +import os +import time +import uuid +from datetime import datetime, timedelta + +from backend.models import OperatorUser +from backend.auth_passwords import hash_password, verify_password, generate_password + +# Feature flag — OFF by default. When off, the gate and require_role both pass +# everything through and the app behaves exactly as it does today. +OPERATOR_AUTH_ENABLED = os.getenv("OPERATOR_AUTH_ENABLED", "false").lower() in ("1", "true", "yes") + +COOKIE_NAME = "tv_session" +COOKIE_MAX_AGE = 60 * 60 * 24 * 30 # 30 days ("remember this device") +MAX_LOGIN_FAILURES = 5 +LOCK_MINUTES = 15 + +# Role ladder — a rank map so checks read naturally and 'operator' slots in later. +_ROLE_RANK = {"operator": 10, "admin": 20, "superadmin": 30} + + +def role_at_least(role: str, minimum: str) -> bool: + """True iff `role` ranks at or above `minimum`. Unknown roles rank as 0.""" + return _ROLE_RANK.get(role, 0) >= _ROLE_RANK[minimum] + + +def _norm_email(email: str) -> str: + return (email or "").strip().lower() +``` + +- [ ] **Step 4: Run tests to verify they pass** + +Run: `docker exec terra-view-terra-view-1 python -m pytest tests/test_operator_model.py -v` +Expected: PASS (3 passed) + +- [ ] **Step 5: Commit** + +```bash +git add backend/models.py backend/operator_auth.py tests/test_operator_model.py +git commit -m "feat(auth): OperatorUser model + role ladder" +``` + +--- + +## Task 3: Session cookie helpers + `current_operator` + +Mint/validate the `tv_session` cookie and resolve the operator from a request, re-validating against the DB every call. + +**Files:** +- Modify: `backend/operator_auth.py` (append helpers) +- Test: `tests/test_operator_session.py` + +- [ ] **Step 1: Write the failing test** + +```python +# tests/test_operator_session.py +import time +import uuid +from datetime import datetime, timedelta +from types import SimpleNamespace + +from backend.models import OperatorUser +from backend.operator_auth import ( + make_operator_cookie, current_operator, COOKIE_NAME, +) + + +def _make_user(db, **kw): + u = OperatorUser(id=str(uuid.uuid4()), email=kw.pop("email", "u@x.com"), + display_name="U", password_hash="h", role=kw.pop("role", "admin"), **kw) + db.add(u) + db.commit() + return u + + +def _req(cookie_value): + # current_operator only reads request.cookies — a stub is enough. + return SimpleNamespace(cookies={COOKIE_NAME: cookie_value} if cookie_value else {}) + + +def test_valid_cookie_resolves_user(db_session): + u = _make_user(db_session) + cookie = make_operator_cookie(u.id) + assert current_operator(_req(cookie), db_session).id == u.id + + +def test_no_or_garbage_cookie_is_none(db_session): + assert current_operator(_req(None), db_session) is None + assert current_operator(_req("garbage"), db_session) is None + + +def test_inactive_user_is_none(db_session): + u = _make_user(db_session, active=False) + assert current_operator(_req(make_operator_cookie(u.id)), db_session) is None + + +def test_locked_user_is_none(db_session): + u = _make_user(db_session, locked_until=datetime.utcnow() + timedelta(minutes=5)) + assert current_operator(_req(make_operator_cookie(u.id)), db_session) is None + + +def test_cookie_older_than_sessions_valid_from_is_none(db_session): + u = _make_user(db_session) + old_iat = int(time.time()) - 1000 + cookie = make_operator_cookie(u.id, iat=old_iat) + u.sessions_valid_from = datetime.utcnow() + db_session.commit() + assert current_operator(_req(cookie), db_session) is None + + +def test_cookie_minted_with_matching_iat_after_bump_still_valid(db_session): + # Guards the change-password race: bump sessions_valid_from to the new cookie's + # exact iat → that fresh cookie must remain valid. + u = _make_user(db_session) + new_iat = int(time.time()) + u.sessions_valid_from = datetime.utcfromtimestamp(new_iat) + db_session.commit() + assert current_operator(_req(make_operator_cookie(u.id, iat=new_iat)), db_session).id == u.id +``` + +- [ ] **Step 2: Run tests to verify they fail** + +Run: `docker exec terra-view-terra-view-1 python -m pytest tests/test_operator_session.py -v` +Expected: FAIL — `ImportError: cannot import name 'make_operator_cookie'` + +- [ ] **Step 3: Append the helpers to `backend/operator_auth.py`** + +```python +from backend.auth_cookies import sign, read, COOKIE_SECURE # add to the import block at top + + +def make_operator_cookie(uid: str, iat: int = None) -> str: + """Sign a tv_session value for a user id. iat defaults to now; pass an explicit + iat when you bump sessions_valid_from to that same instant (change-password).""" + return sign({"uid": uid, "iat": int(iat if iat is not None else time.time())}) + + +def current_operator(request, db): + """Resolve the OperatorUser for a request's tv_session cookie, or None. + Re-validated against the DB every call: a disabled / locked / password-changed + user drops on the next request. Used by the gate middleware (with its own + session) — does not raise.""" + data = read(request.cookies.get(COOKIE_NAME), COOKIE_MAX_AGE) + if not data: + return None + uid, iat = data.get("uid"), data.get("iat") + if not uid or not isinstance(iat, (int, float)): + return None + user = db.query(OperatorUser).filter_by(id=uid).first() + if not user or not user.active: + return None + if user.locked_until and user.locked_until > datetime.utcnow(): + return None + if user.sessions_valid_from and datetime.utcfromtimestamp(int(iat)) < user.sessions_valid_from: + return None + return user +``` + +NOTE: move the `from backend.auth_cookies import sign, read, COOKIE_SECURE` line up into the import block at the top of the file (shown inline here only to mark the dependency). `COOKIE_SECURE` is imported now because later tasks (login route) set the cookie with it. + +- [ ] **Step 4: Run tests to verify they pass** + +Run: `docker exec terra-view-terra-view-1 python -m pytest tests/test_operator_session.py -v` +Expected: PASS (6 passed) + +- [ ] **Step 5: Commit** + +```bash +git add backend/operator_auth.py tests/test_operator_session.py +git commit -m "feat(auth): operator session cookie + current_operator DB re-validation" +``` + +--- + +## Task 4: `authenticate` + lockout + data helpers + +Login verification with brute-force lockout, plus the create/reset/role/active/change helpers shared by the routes and the CLI. + +**Files:** +- Modify: `backend/operator_auth.py` (append) +- Test: `tests/test_operator_authenticate.py` + +- [ ] **Step 1: Write the failing test** + +```python +# tests/test_operator_authenticate.py +import time +from datetime import datetime +import pytest + +from backend.operator_auth import ( + authenticate, create_operator, reset_operator_password, + set_operator_active, set_operator_role, change_own_password, MAX_LOGIN_FAILURES, +) +from backend.auth_passwords import verify_password +from backend.models import OperatorUser + + +def test_create_operator_generates_temp_and_forces_change(db_session): + user, raw = create_operator(db_session, "Dad@X.com", "Dad", "admin") + assert user.email == "dad@x.com" # lowercased + assert user.must_change_password is True + assert verify_password(raw, user.password_hash) + + +def test_create_operator_with_explicit_password_no_forced_change(db_session): + user, raw = create_operator(db_session, "brian@x.com", "Brian", "superadmin", password="chosen-pw-123") + assert raw == "chosen-pw-123" + assert user.must_change_password is False + + +def test_create_operator_rejects_duplicate_and_bad_role(db_session): + create_operator(db_session, "a@x.com", "A", "admin") + with pytest.raises(ValueError): + create_operator(db_session, "A@x.com", "A2", "admin") # dup (case-insensitive) + with pytest.raises(ValueError): + create_operator(db_session, "b@x.com", "B", "wizard") # bad role + + +def test_authenticate_success(db_session): + user, raw = create_operator(db_session, "ok@x.com", "Ok", "admin", password="rightpw-9") + got, status = authenticate(db_session, "OK@x.com", "rightpw-9") + assert status == "ok" and got.id == user.id + assert got.last_login_at is not None + assert got.failed_login_count == 0 + + +def test_authenticate_wrong_password_counts(db_session): + create_operator(db_session, "wp@x.com", "Wp", "admin", password="rightpw-9") + got, status = authenticate(db_session, "wp@x.com", "nope") + assert got is None and status == "bad" + assert db_session.query(OperatorUser).filter_by(email="wp@x.com").first().failed_login_count == 1 + + +def test_lockout_after_five_then_correct_password_refused(db_session): + create_operator(db_session, "lk@x.com", "Lk", "admin", password="rightpw-9") + for _ in range(MAX_LOGIN_FAILURES): + authenticate(db_session, "lk@x.com", "nope") + got, status = authenticate(db_session, "lk@x.com", "rightpw-9") # correct, but locked + assert got is None and status == "locked" + + +def test_authenticate_unknown_email_is_bad_not_error(db_session): + got, status = authenticate(db_session, "ghost@x.com", "whatever") + assert got is None and status == "bad" + + +def test_reset_password_sets_new_hash_forces_change_and_bumps_sessions(db_session): + user, _ = create_operator(db_session, "r@x.com", "R", "admin", password="orig-pw-1") + before = user.sessions_valid_from + raw = reset_operator_password(db_session, user) + assert verify_password(raw, user.password_hash) + assert user.must_change_password is True + assert user.sessions_valid_from >= before + + +def test_change_own_password_clears_flag_and_bumps(db_session): + user, _ = create_operator(db_session, "c@x.com", "C", "admin", password="orig-pw-1") + user.must_change_password = True + db_session.commit() + new_iat = change_own_password(db_session, user, "brand-new-pw-2") + assert verify_password("brand-new-pw-2", user.password_hash) + assert user.must_change_password is False + assert user.sessions_valid_from == datetime.utcfromtimestamp(new_iat) + + +def test_set_active_and_role(db_session): + user, _ = create_operator(db_session, "s@x.com", "S", "admin", password="orig-pw-1") + set_operator_active(db_session, user, False) + assert user.active is False + set_operator_role(db_session, user, "superadmin") + assert user.role == "superadmin" + with pytest.raises(ValueError): + set_operator_role(db_session, user, "wizard") +``` + +- [ ] **Step 2: Run tests to verify they fail** + +Run: `docker exec terra-view-terra-view-1 python -m pytest tests/test_operator_authenticate.py -v` +Expected: FAIL — `ImportError: cannot import name 'authenticate'` + +- [ ] **Step 3: Append the helpers to `backend/operator_auth.py`** + +```python +def register_login_failure(db, user) -> None: + """Increment a user's failure counter and lock them out past the threshold.""" + user.failed_login_count = (user.failed_login_count or 0) + 1 + if user.failed_login_count >= MAX_LOGIN_FAILURES: + user.locked_until = datetime.utcnow() + timedelta(minutes=LOCK_MINUTES) + db.commit() + + +def authenticate(db, email, password): + """Return (user, "ok") on success, (None, "locked") if locked out, else + (None, "bad"). Never reveals whether the email exists (generic 'bad').""" + user = db.query(OperatorUser).filter_by(email=_norm_email(email)).first() + if user and user.locked_until and user.locked_until > datetime.utcnow(): + return None, "locked" + if not user or not user.active or not verify_password(password, user.password_hash): + if user: + register_login_failure(db, user) + return None, "bad" + user.failed_login_count = 0 + user.locked_until = None + user.last_login_at = datetime.utcnow() + db.commit() + return user, "ok" + + +def create_operator(db, email, name, role, password=None, must_change=None): + """Create an operator. With no password, generate a temp one and force a change + (must_change defaults True). With a password, must_change defaults False. + Returns (user, raw_password_to_show_once). Raises ValueError on dup/bad role.""" + email = _norm_email(email) + if role not in _ROLE_RANK: + raise ValueError(f"unknown role {role!r}") + if db.query(OperatorUser).filter_by(email=email).first(): + raise ValueError(f"operator {email} already exists") + if password is None: + password = generate_password() + if must_change is None: + must_change = True + elif must_change is None: + must_change = False + user = OperatorUser(id=str(uuid.uuid4()), email=email, display_name=name, + password_hash=hash_password(password), role=role, + active=True, must_change_password=must_change) + db.add(user) + db.commit() + return user, password + + +def reset_operator_password(db, user) -> str: + """Generate a fresh temp password, force a change, log the user out everywhere. + Returns the raw password to show once.""" + raw = generate_password() + user.password_hash = hash_password(raw) + user.must_change_password = True + user.failed_login_count = 0 + user.locked_until = None + user.sessions_valid_from = datetime.utcnow().replace(microsecond=0) + db.commit() + return raw + + +def change_own_password(db, user, new_password) -> int: + """Set a user's own new password, clear the forced-change flag, and bump + sessions_valid_from to the returned iat — the caller mints the replacement + cookie with that exact iat so it stays valid while older cookies die.""" + new_iat = int(time.time()) + user.password_hash = hash_password(new_password) + user.must_change_password = False + user.sessions_valid_from = datetime.utcfromtimestamp(new_iat) + db.commit() + return new_iat + + +def set_operator_active(db, user, active: bool): + user.active = bool(active) + db.commit() + return user + + +def set_operator_role(db, user, role: str): + if role not in _ROLE_RANK: + raise ValueError(f"unknown role {role!r}") + user.role = role + db.commit() + return user +``` + +- [ ] **Step 4: Run tests to verify they pass** + +Run: `docker exec terra-view-terra-view-1 python -m pytest tests/test_operator_authenticate.py -v` +Expected: PASS (10 passed) + +- [ ] **Step 5: Commit** + +```bash +git add backend/operator_auth.py tests/test_operator_authenticate.py +git commit -m "feat(auth): authenticate + lockout + operator data helpers" +``` + +--- + +## Task 5: Gate middleware + `require_role` + wire into the app + +The deny-by-default Starlette middleware and the role dependency. Registering the middleware with the flag defaulting OFF keeps every existing test green. + +**Files:** +- Modify: `backend/operator_auth.py` (append `operator_gate` + `require_role`) +- Modify: `backend/main.py` (register the middleware) +- Modify: `tests/conftest.py` (add `wire_operator_auth` helper) +- Test: `tests/test_operator_gate.py` + +- [ ] **Step 1: Add the `wire_operator_auth` helper to `tests/conftest.py`** + +Append at the end of `tests/conftest.py`: + +```python +def wire_operator_auth(monkeypatch, db_session, enabled=True): + """Point the gate middleware's SessionLocal at the test engine and flip the + flag. The middleware opens its OWN session (it can't use the get_db override), + so it must read the same engine the test writes to.""" + import backend.operator_auth as oa + from sqlalchemy.orm import sessionmaker + maker = sessionmaker(bind=db_session.get_bind(), autocommit=False, autoflush=False) + monkeypatch.setattr(oa, "SessionLocal", maker, raising=False) + monkeypatch.setattr(oa, "OPERATOR_AUTH_ENABLED", enabled, raising=False) + return oa +``` + +- [ ] **Step 2: Write the failing test** + +```python +# tests/test_operator_gate.py +import uuid +from tests.conftest import wire_operator_auth +from backend.models import OperatorUser +from backend.operator_auth import make_operator_cookie, COOKIE_NAME +from backend.auth_passwords import hash_password + + +def _make_user(db, role="admin", **kw): + u = OperatorUser(id=str(uuid.uuid4()), email=kw.pop("email", "u@x.com"), + display_name="U", password_hash=hash_password("pw"), role=role, **kw) + db.add(u) + db.commit() + return u + + +def test_flag_off_passes_everything(client, db_session, monkeypatch): + wire_operator_auth(monkeypatch, db_session, enabled=False) + assert client.get("/", follow_redirects=False).status_code == 200 + + +def test_gated_html_redirects_to_login_when_unauth(client, db_session, monkeypatch): + wire_operator_auth(monkeypatch, db_session, enabled=True) + r = client.get("/", follow_redirects=False) + assert r.status_code == 303 + assert r.headers["location"].startswith("/login?next=") + + +def test_gated_api_returns_401_json_when_unauth(client, db_session, monkeypatch): + wire_operator_auth(monkeypatch, db_session, enabled=True) + r = client.get("/api/status-snapshot", follow_redirects=False) + assert r.status_code == 401 + + +def test_valid_session_passes(client, db_session, monkeypatch): + u = _make_user(db_session) + wire_operator_auth(monkeypatch, db_session, enabled=True) + client.cookies.set(COOKIE_NAME, make_operator_cookie(u.id)) + assert client.get("/", follow_redirects=False).status_code == 200 + + +def test_must_change_password_user_routed_to_change_password(client, db_session, monkeypatch): + u = _make_user(db_session, must_change_password=True) + wire_operator_auth(monkeypatch, db_session, enabled=True) + client.cookies.set(COOKIE_NAME, make_operator_cookie(u.id)) + r = client.get("/", follow_redirects=False) + assert r.status_code == 303 + assert r.headers["location"] == "/change-password" + + +def test_exempt_paths_pass_without_cookie(client, db_session, monkeypatch): + wire_operator_auth(monkeypatch, db_session, enabled=True) + assert client.get("/health", follow_redirects=False).status_code == 200 + assert client.get("/login", follow_redirects=False).status_code == 200 + + +def test_portal_paths_are_exempt(client, db_session, monkeypatch): + wire_operator_auth(monkeypatch, db_session, enabled=True) + # /portal/p/ hits the portal's own gate (403/404), never the operator login. + r = client.get("/portal/p/nope", follow_redirects=False) + assert r.status_code in (403, 404) +``` + +- [ ] **Step 3: Append `operator_gate` + `require_role` to `backend/operator_auth.py`** + +Add these imports to the top of the file: + +```python +from urllib.parse import quote +from fastapi import Request, HTTPException +from fastapi.responses import JSONResponse, RedirectResponse +from backend.database import SessionLocal +``` + +Then append: + +```python +# Routes reachable with no login. A new route added next year is gated by default. +_EXEMPT_EXACT = { + "/login", "/logout", "/health", + "/manifest.json", "/sw.js", "/favicon.ico", "/offline-db.js", + "/portal", # portal home (its own auth) + # machine endpoints — LAN-only, automated, no human (watchers/heartbeats): + "/emitters/report", "/api/series3/heartbeat", "/api/series4/heartbeat", +} +_EXEMPT_PREFIX = ("/static/", "/portal/") + + +def _is_exempt(path: str) -> bool: + return path in _EXEMPT_EXACT or path.startswith(_EXEMPT_PREFIX) + + +async def operator_gate(request: Request, call_next): + """Deny-by-default gate. Flag off → pass through (app as today). Flag on → + exempt paths pass; otherwise require a valid operator session, stash it on + request.state.operator, and force a password change when pending.""" + if not OPERATOR_AUTH_ENABLED: + return await call_next(request) + + path = request.url.path + if _is_exempt(path): + return await call_next(request) + + db = SessionLocal() + try: + user = current_operator(request, db) + if user is not None: + db.expunge(user) # detach a fully-loaded row so we can close now + finally: + db.close() + + if user is None: + if path.startswith("/api/"): + return JSONResponse({"detail": "Not authenticated"}, status_code=401) + return RedirectResponse(f"/login?next={quote(path)}", status_code=303) + + if user.must_change_password and path not in ("/change-password", "/logout"): + return RedirectResponse("/change-password", status_code=303) + + request.state.operator = user + return await call_next(request) + + +def require_role(minimum: str): + """Dependency factory: require a logged-in operator ranked >= `minimum`. + Respects the flag (off → pass through). When on, the middleware has already + set request.state.operator before this runs.""" + def _dep(request: Request): + if not OPERATOR_AUTH_ENABLED: + return None + user = getattr(request.state, "operator", None) + if user is None: + raise HTTPException(status_code=401, detail="Not authenticated") + if not role_at_least(user.role, minimum): + raise HTTPException(status_code=403, detail="Insufficient permissions") + return user + return _dep +``` + +- [ ] **Step 4: Register the middleware in `backend/main.py`** + +After the existing `add_environment_to_context` middleware block (ends line 90), add: + +```python +# Operator auth — deny-by-default gate over the whole internal app. Governed by +# OPERATOR_AUTH_ENABLED (default off → behaves exactly as today). See +# docs/superpowers/specs/2026-06-17-operator-auth-design.md. +from backend.operator_auth import operator_gate +app.middleware("http")(operator_gate) +``` + +- [ ] **Step 5: Run tests to verify the gate works AND nothing regressed** + +Run: `docker exec terra-view-terra-view-1 python -m pytest tests/test_operator_gate.py tests/test_portal_gate.py -v` +Expected: PASS (gate tests pass; the 6 existing portal-gate tests still pass — flag defaults off) + +- [ ] **Step 6: Commit** + +```bash +git add backend/operator_auth.py backend/main.py tests/conftest.py tests/test_operator_gate.py +git commit -m "feat(auth): deny-by-default gate middleware + require_role" +``` + +--- + +## Task 6: Login / logout / change-password routes + templates + +The auth pages. These routes work regardless of the flag (you log in while the flag is still off during rollout) and are on the gate's exempt list (login/logout) — except `/change-password`, which requires a session. + +**Files:** +- Create: `backend/routers/operator_auth_routes.py` +- Create: `templates/login.html`, `templates/change_password.html` +- Modify: `backend/main.py` (include the router) +- Test: `tests/test_operator_login.py` + +- [ ] **Step 1: Write the failing test** + +```python +# tests/test_operator_login.py +import uuid +from tests.conftest import wire_operator_auth +from backend.operator_auth import ( + create_operator, make_operator_cookie, COOKIE_NAME, MAX_LOGIN_FAILURES, +) + + +def test_login_page_renders(client, db_session, monkeypatch): + wire_operator_auth(monkeypatch, db_session, enabled=True) + r = client.get("/login") + assert r.status_code == 200 + assert "password" in r.text.lower() + + +def test_login_success_sets_cookie_and_redirects(client, db_session, monkeypatch): + create_operator(db_session, "ok@x.com", "Ok", "admin", password="rightpw-9") + wire_operator_auth(monkeypatch, db_session, enabled=True) + r = client.post("/login", data={"email": "ok@x.com", "password": "rightpw-9"}, + follow_redirects=False) + assert r.status_code == 303 + assert r.headers["location"] == "/" + assert f"{COOKIE_NAME}=" in r.headers.get("set-cookie", "") + + +def test_login_honors_next(client, db_session, monkeypatch): + create_operator(db_session, "ok@x.com", "Ok", "admin", password="rightpw-9") + wire_operator_auth(monkeypatch, db_session, enabled=True) + r = client.post("/login?next=/settings", data={"email": "ok@x.com", "password": "rightpw-9"}, + follow_redirects=False) + assert r.headers["location"] == "/settings" + + +def test_login_wrong_password_no_cookie_generic_error(client, db_session, monkeypatch): + create_operator(db_session, "ok@x.com", "Ok", "admin", password="rightpw-9") + wire_operator_auth(monkeypatch, db_session, enabled=True) + r = client.post("/login", data={"email": "ok@x.com", "password": "nope"}, + follow_redirects=False) + assert r.status_code == 200 + assert f"{COOKIE_NAME}=" not in r.headers.get("set-cookie", "") + assert "invalid" in r.text.lower() + + +def test_login_must_change_redirects_to_change_password(client, db_session, monkeypatch): + create_operator(db_session, "new@x.com", "New", "admin") # generated temp → must_change + # fetch the raw temp by resetting to a known one + user = None + from backend.models import OperatorUser + user = db_session.query(OperatorUser).filter_by(email="new@x.com").first() + from backend.auth_passwords import hash_password + user.password_hash = hash_password("temp-pw-1") + db_session.commit() + wire_operator_auth(monkeypatch, db_session, enabled=True) + r = client.post("/login", data={"email": "new@x.com", "password": "temp-pw-1"}, + follow_redirects=False) + assert r.status_code == 303 + assert r.headers["location"] == "/change-password" + + +def test_login_lockout_message_after_five(client, db_session, monkeypatch): + create_operator(db_session, "lk@x.com", "Lk", "admin", password="rightpw-9") + wire_operator_auth(monkeypatch, db_session, enabled=True) + for _ in range(MAX_LOGIN_FAILURES): + client.post("/login", data={"email": "lk@x.com", "password": "nope"}, follow_redirects=False) + r = client.post("/login", data={"email": "lk@x.com", "password": "rightpw-9"}, follow_redirects=False) + assert r.status_code == 200 + assert "too many" in r.text.lower() + + +def test_logout_clears_cookie(client, db_session, monkeypatch): + wire_operator_auth(monkeypatch, db_session, enabled=True) + r = client.get("/logout", follow_redirects=False) + assert r.status_code == 303 + assert r.headers["location"] == "/login" + # cookie cleared (deletion appears as an expired/empty set-cookie) + assert COOKIE_NAME in r.headers.get("set-cookie", "") + + +def test_change_password_self_service(client, db_session, monkeypatch): + user, _ = create_operator(db_session, "c@x.com", "C", "admin", password="orig-pw-1") + wire_operator_auth(monkeypatch, db_session, enabled=True) + client.cookies.set(COOKIE_NAME, make_operator_cookie(user.id)) + r = client.post("/change-password", + data={"current_password": "orig-pw-1", "new_password": "brand-new-2", + "confirm_password": "brand-new-2"}, follow_redirects=False) + assert r.status_code == 303 + from backend.auth_passwords import verify_password + db_session.refresh(user) + assert verify_password("brand-new-2", user.password_hash) + assert user.must_change_password is False +``` + +- [ ] **Step 2: Run tests to verify they fail** + +Run: `docker exec terra-view-terra-view-1 python -m pytest tests/test_operator_login.py -v` +Expected: FAIL — `/login` returns 404 / 303 (route doesn't exist yet) + +- [ ] **Step 3a: Create `templates/login.html`** + +```html + + + + + + Sign in · Terra-View + + + +
+

Terra-View

+ {% if error %} +
{{ error }}
+ {% endif %} +
+
+ + +
+
+ + +
+ +
+

Forgot your password? Contact your administrator.

+
+ + +``` + +- [ ] **Step 3b: Create `templates/change_password.html`** + +```html + + + + + + Change password · Terra-View + + + +
+

Change your password

+ {% if must_change %} +

Please set a new password to continue.

+ {% endif %} + {% if error %} +
{{ error }}
+ {% endif %} +
+
+ + +
+
+ + +
+
+ + +
+ +
+
+ + +``` + +- [ ] **Step 3c: Create `backend/routers/operator_auth_routes.py`** + +```python +"""Operator login / logout / change-password. These routes intentionally work +regardless of OPERATOR_AUTH_ENABLED (you log in while the flag is still off during +rollout). /login and /logout are on the gate's exempt list; /change-password +requires a session (the gate sets request.state.operator).""" +from fastapi import APIRouter, Request, Depends, Form +from fastapi.responses import RedirectResponse +from sqlalchemy.orm import Session + +from backend.database import get_db +from backend.templates_config import templates +from backend.operator_auth import ( + authenticate, current_operator, change_own_password, make_operator_cookie, + COOKIE_NAME, COOKIE_MAX_AGE, +) +from backend.auth_cookies import COOKIE_SECURE +from backend.auth_passwords import verify_password + +router = APIRouter(tags=["operator-auth"]) + + +def _safe_next(next_url: str) -> str: + """Only allow same-site relative redirects (an open-redirect guard).""" + if next_url and next_url.startswith("/") and not next_url.startswith("//"): + return next_url + return "/" + + +@router.get("/login") +async def login_page(request: Request, next: str = "", error: str = ""): + return templates.TemplateResponse("login.html", + {"request": request, "next": next, "error": error}) + + +@router.post("/login") +async def login_submit(request: Request, next: str = "", + email: str = Form(...), password: str = Form(...), + db: Session = Depends(get_db)): + user, status = authenticate(db, email, password) + if status == "locked": + return templates.TemplateResponse( + "login.html", + {"request": request, "next": next, + "error": "Too many attempts — try again in 15 minutes."}, + status_code=200) + if user is None: + return templates.TemplateResponse( + "login.html", + {"request": request, "next": next, "error": "Invalid email or password."}, + status_code=200) + dest = "/change-password" if user.must_change_password else _safe_next(next) + resp = RedirectResponse(url=dest, status_code=303) + resp.set_cookie(COOKIE_NAME, make_operator_cookie(user.id), + max_age=COOKIE_MAX_AGE, httponly=True, samesite="lax", secure=COOKIE_SECURE) + return resp + + +@router.get("/logout") +async def logout(request: Request): + resp = RedirectResponse(url="/login", status_code=303) + resp.delete_cookie(COOKIE_NAME) + return resp + + +@router.get("/change-password") +async def change_password_page(request: Request, db: Session = Depends(get_db)): + user = getattr(request.state, "operator", None) or current_operator(request, db) + if user is None: + return RedirectResponse(url="/login", status_code=303) + return templates.TemplateResponse( + "change_password.html", + {"request": request, "must_change": user.must_change_password, "error": ""}) + + +@router.post("/change-password") +async def change_password_submit(request: Request, + current_password: str = Form(...), + new_password: str = Form(...), + confirm_password: str = Form(...), + db: Session = Depends(get_db)): + user = getattr(request.state, "operator", None) or current_operator(request, db) + if user is None: + return RedirectResponse(url="/login", status_code=303) + + def _err(msg): + return templates.TemplateResponse( + "change_password.html", + {"request": request, "must_change": user.must_change_password, "error": msg}, + status_code=200) + + if not verify_password(current_password, user.password_hash): + return _err("Current password is incorrect.") + if len(new_password) < 8: + return _err("New password must be at least 8 characters.") + if new_password != confirm_password: + return _err("New passwords do not match.") + + new_iat = change_own_password(db, user, new_password) + resp = RedirectResponse(url="/", status_code=303) + resp.set_cookie(COOKIE_NAME, make_operator_cookie(user.id, iat=new_iat), + max_age=COOKIE_MAX_AGE, httponly=True, samesite="lax", secure=COOKIE_SECURE) + return resp +``` + +- [ ] **Step 3d: Register the router in `backend/main.py`** + +After the middleware registration added in Task 5, add: + +```python +from backend.routers import operator_auth_routes +app.include_router(operator_auth_routes.router) +``` + +- [ ] **Step 4: Run tests to verify they pass** + +Run: `docker exec terra-view-terra-view-1 python -m pytest tests/test_operator_login.py -v` +Expected: PASS (8 passed) + +- [ ] **Step 5: Commit** + +```bash +git add backend/routers/operator_auth_routes.py templates/login.html templates/change_password.html backend/main.py tests/test_operator_login.py +git commit -m "feat(auth): login/logout/change-password routes + pages" +``` + +--- + +## Task 7: User-management routes + template (superadmin-only) + +`/admin/users` page and JSON CRUD, all behind `require_role("superadmin")`. Temp passwords are returned once. + +**Files:** +- Create: `backend/routers/operator_users.py` +- Create: `templates/admin/users.html` +- Modify: `backend/main.py` (include the router) +- Test: `tests/test_operator_users.py` + +- [ ] **Step 1: Write the failing test** + +```python +# tests/test_operator_users.py +import uuid +from tests.conftest import wire_operator_auth +from backend.operator_auth import create_operator, make_operator_cookie, COOKIE_NAME +from backend.models import OperatorUser + + +def _login_as(client, user): + client.cookies.set(COOKIE_NAME, make_operator_cookie(user.id)) + + +def test_admin_cannot_reach_user_management(client, db_session, monkeypatch): + admin, _ = create_operator(db_session, "admin@x.com", "Admin", "admin", password="pw-123456") + wire_operator_auth(monkeypatch, db_session, enabled=True) + _login_as(client, admin) + assert client.get("/admin/users", follow_redirects=False).status_code == 403 + + +def test_superadmin_sees_user_management(client, db_session, monkeypatch): + su, _ = create_operator(db_session, "su@x.com", "Su", "superadmin", password="pw-123456") + wire_operator_auth(monkeypatch, db_session, enabled=True) + _login_as(client, su) + assert client.get("/admin/users", follow_redirects=False).status_code == 200 + + +def test_superadmin_lists_users_json(client, db_session, monkeypatch): + su, _ = create_operator(db_session, "su@x.com", "Su", "superadmin", password="pw-123456") + wire_operator_auth(monkeypatch, db_session, enabled=True) + _login_as(client, su) + r = client.get("/api/admin/users") + assert r.status_code == 200 + emails = [u["email"] for u in r.json()["users"]] + assert "su@x.com" in emails + assert all("password_hash" not in u for u in r.json()["users"]) # never leak hashes + + +def test_create_user_returns_temp_once(client, db_session, monkeypatch): + su, _ = create_operator(db_session, "su@x.com", "Su", "superadmin", password="pw-123456") + wire_operator_auth(monkeypatch, db_session, enabled=True) + _login_as(client, su) + r = client.post("/api/admin/users", + json={"email": "dad@x.com", "name": "Dad", "role": "admin"}) + assert r.status_code == 200 + assert len(r.json()["password"]) >= 12 + made = db_session.query(OperatorUser).filter_by(email="dad@x.com").first() + assert made.must_change_password is True + + +def test_reset_password_returns_temp_once(client, db_session, monkeypatch): + su, _ = create_operator(db_session, "su@x.com", "Su", "superadmin", password="pw-123456") + target, _ = create_operator(db_session, "t@x.com", "T", "admin", password="pw-123456") + wire_operator_auth(monkeypatch, db_session, enabled=True) + _login_as(client, su) + r = client.post(f"/api/admin/users/{target.id}/reset-password") + assert r.status_code == 200 and len(r.json()["password"]) >= 12 + db_session.refresh(target) + assert target.must_change_password is True + + +def test_disable_and_enable(client, db_session, monkeypatch): + su, _ = create_operator(db_session, "su@x.com", "Su", "superadmin", password="pw-123456") + target, _ = create_operator(db_session, "t@x.com", "T", "admin", password="pw-123456") + wire_operator_auth(monkeypatch, db_session, enabled=True) + _login_as(client, su) + assert client.post(f"/api/admin/users/{target.id}/disable").status_code == 200 + db_session.refresh(target); assert target.active is False + assert client.post(f"/api/admin/users/{target.id}/enable").status_code == 200 + db_session.refresh(target); assert target.active is True + + +def test_change_role(client, db_session, monkeypatch): + su, _ = create_operator(db_session, "su@x.com", "Su", "superadmin", password="pw-123456") + target, _ = create_operator(db_session, "t@x.com", "T", "admin", password="pw-123456") + wire_operator_auth(monkeypatch, db_session, enabled=True) + _login_as(client, su) + r = client.post(f"/api/admin/users/{target.id}/role", json={"role": "superadmin"}) + assert r.status_code == 200 + db_session.refresh(target); assert target.role == "superadmin" +``` + +- [ ] **Step 2: Run tests to verify they fail** + +Run: `docker exec terra-view-terra-view-1 python -m pytest tests/test_operator_users.py -v` +Expected: FAIL — routes don't exist (404) + +- [ ] **Step 3a: Create `templates/admin/users.html`** + +```html +{% extends "base.html" %} +{% block title %}Operator Accounts{% endblock %} +{% block content %} +
+
+

Operator Accounts

+ +
+ + + + + + +
NameEmailRoleStatusLast login
+
+ +{% endblock %} +``` + +- [ ] **Step 3b: Create `backend/routers/operator_users.py`** + +```python +"""Operator account management — superadmin only. Temp passwords are returned in +the JSON response once (shown to the superadmin to hand off); only hashes persist.""" +from fastapi import APIRouter, Request, Depends, HTTPException +from fastapi.responses import JSONResponse +from pydantic import BaseModel +from sqlalchemy.orm import Session + +from backend.database import get_db +from backend.templates_config import templates +from backend.models import OperatorUser +from backend.operator_auth import ( + require_role, create_operator, reset_operator_password, + set_operator_active, set_operator_role, +) +from backend.utils.timezone import format_local_datetime + +router = APIRouter(tags=["operator-users"]) +_superadmin = require_role("superadmin") + + +class NewUser(BaseModel): + email: str + name: str + role: str = "admin" + + +class RoleChange(BaseModel): + role: str + + +def _serialize(u: OperatorUser) -> dict: + from datetime import datetime + return { + "id": u.id, "email": u.email, "display_name": u.display_name, "role": u.role, + "active": bool(u.active), "must_change_password": bool(u.must_change_password), + "locked": bool(u.locked_until and u.locked_until > datetime.utcnow()), + "last_login_at": format_local_datetime(u.last_login_at, "%Y-%m-%d %H:%M") if u.last_login_at else None, + } + + +@router.get("/admin/users") +async def users_page(request: Request, _=Depends(_superadmin)): + return templates.TemplateResponse("admin/users.html", {"request": request}) + + +@router.get("/api/admin/users") +async def list_users(_=Depends(_superadmin), db: Session = Depends(get_db)): + users = db.query(OperatorUser).order_by(OperatorUser.display_name).all() + return {"users": [_serialize(u) for u in users]} + + +@router.post("/api/admin/users") +async def add_user(body: NewUser, _=Depends(_superadmin), db: Session = Depends(get_db)): + try: + user, raw = create_operator(db, body.email, body.name, body.role) + except ValueError as e: + return JSONResponse(status_code=400, content={"detail": str(e)}) + return {"user": _serialize(user), "password": raw} + + +@router.post("/api/admin/users/{user_id}/reset-password") +async def reset_user_password(user_id: str, _=Depends(_superadmin), db: Session = Depends(get_db)): + user = db.query(OperatorUser).filter_by(id=user_id).first() + if not user: + raise HTTPException(status_code=404, detail="User not found") + raw = reset_operator_password(db, user) + return {"password": raw} + + +@router.post("/api/admin/users/{user_id}/disable") +async def disable_user(user_id: str, _=Depends(_superadmin), db: Session = Depends(get_db)): + user = db.query(OperatorUser).filter_by(id=user_id).first() + if not user: + raise HTTPException(status_code=404, detail="User not found") + set_operator_active(db, user, False) + return {"active": False} + + +@router.post("/api/admin/users/{user_id}/enable") +async def enable_user(user_id: str, _=Depends(_superadmin), db: Session = Depends(get_db)): + user = db.query(OperatorUser).filter_by(id=user_id).first() + if not user: + raise HTTPException(status_code=404, detail="User not found") + set_operator_active(db, user, True) + return {"active": True} + + +@router.post("/api/admin/users/{user_id}/role") +async def change_user_role(user_id: str, body: RoleChange, + _=Depends(_superadmin), db: Session = Depends(get_db)): + user = db.query(OperatorUser).filter_by(id=user_id).first() + if not user: + raise HTTPException(status_code=404, detail="User not found") + try: + set_operator_role(db, user, body.role) + except ValueError as e: + return JSONResponse(status_code=400, content={"detail": str(e)}) + return {"role": user.role} +``` + +- [ ] **Step 3c: Register the router in `backend/main.py`** + +After the `operator_auth_routes` include added in Task 6, add: + +```python +from backend.routers import operator_users +app.include_router(operator_users.router) +``` + +- [ ] **Step 4: Run tests to verify they pass** + +Run: `docker exec terra-view-terra-view-1 python -m pytest tests/test_operator_users.py -v` +Expected: PASS (7 passed) + +- [ ] **Step 5: Commit** + +```bash +git add backend/routers/operator_users.py templates/admin/users.html backend/main.py tests/test_operator_users.py +git commit -m "feat(auth): superadmin user-management page + CRUD" +``` + +--- + +## Task 8: Seed / break-glass CLI (`operator_admin.py`) + +The bootstrap (first superadmin, before any UI is reachable) and the break-glass (locked out / forgot everything). Thin wrappers over the Task-4 data helpers. + +**Files:** +- Create: `backend/operator_admin.py` +- Test: `tests/test_operator_admin_cli.py` + +- [ ] **Step 1: Write the failing test** + +```python +# tests/test_operator_admin_cli.py +from sqlalchemy.orm import sessionmaker +from backend.models import OperatorUser +from backend.auth_passwords import verify_password +import backend.operator_admin as cli + + +def _maker(db_session): + return sessionmaker(bind=db_session.get_bind(), autocommit=False, autoflush=False) + + +def test_seed_superadmin(db_session, monkeypatch): + monkeypatch.setattr(cli, "SessionLocal", _maker(db_session), raising=False) + cli.cmd_create_superadmin(email="brian@x.com", name="Brian", password="chosen-pw-1") + u = db_session.query(OperatorUser).filter_by(email="brian@x.com").first() + assert u.role == "superadmin" + assert u.must_change_password is False + assert verify_password("chosen-pw-1", u.password_hash) + + +def test_create_user_generates_temp(db_session, monkeypatch, capsys): + monkeypatch.setattr(cli, "SessionLocal", _maker(db_session), raising=False) + cli.cmd_create_user(email="dad@x.com", name="Dad", role="admin") + u = db_session.query(OperatorUser).filter_by(email="dad@x.com").first() + assert u.role == "admin" and u.must_change_password is True + assert "dad@x.com" in capsys.readouterr().out # prints the temp once + + +def test_reset_password_cli(db_session, monkeypatch): + monkeypatch.setattr(cli, "SessionLocal", _maker(db_session), raising=False) + cli.cmd_create_user(email="r@x.com", name="R", role="admin") + before = db_session.query(OperatorUser).filter_by(email="r@x.com").first().password_hash + cli.cmd_reset_password(email="r@x.com") + after = db_session.query(OperatorUser).filter_by(email="r@x.com").first().password_hash + assert before != after + + +def test_disable_enable_cli(db_session, monkeypatch): + monkeypatch.setattr(cli, "SessionLocal", _maker(db_session), raising=False) + cli.cmd_create_user(email="d@x.com", name="D", role="admin") + cli.cmd_set_active(email="d@x.com", active=False) + assert db_session.query(OperatorUser).filter_by(email="d@x.com").first().active is False + cli.cmd_set_active(email="d@x.com", active=True) + assert db_session.query(OperatorUser).filter_by(email="d@x.com").first().active is True +``` + +- [ ] **Step 2: Run tests to verify they fail** + +Run: `docker exec terra-view-terra-view-1 python -m pytest tests/test_operator_admin_cli.py -v` +Expected: FAIL — `ModuleNotFoundError: No module named 'backend.operator_admin'` + +- [ ] **Step 3: Create `backend/operator_admin.py`** + +```python +#!/usr/bin/env python3 +"""Operator-account admin CLI — the bootstrap and the break-glass. Run inside the +terra-view container against the live DB. Temp/raw passwords are printed ONCE; only +hashes persist. + + # first superadmin (before any UI is reachable) — prompts for a password, or --generate + python3 backend/operator_admin.py create-superadmin --email you@x.com --name "Brian" + + # a parent's account — generates a temp password, must-change on first login + python3 backend/operator_admin.py create-user --email dad@x.com --name "Dad" --role admin + + python3 backend/operator_admin.py reset-password --email dad@x.com + python3 backend/operator_admin.py list + python3 backend/operator_admin.py disable --email dad@x.com + python3 backend/operator_admin.py enable --email dad@x.com +""" +import os +import sys +import getpass +import argparse +from datetime import datetime + +sys.path.insert(0, os.path.dirname(os.path.dirname(os.path.abspath(__file__)))) + +from backend.database import SessionLocal +from backend.models import OperatorUser +from backend.operator_auth import ( + create_operator, reset_operator_password, set_operator_active, _norm_email, +) + + +def _get(db, email): + u = db.query(OperatorUser).filter_by(email=_norm_email(email)).first() + if not u: + sys.exit(f"No operator with email '{email}'.") + return u + + +def cmd_create_superadmin(email, name, password=None, generate=False): + db = SessionLocal() + try: + if password is None and not generate: + password = getpass.getpass("Password for new superadmin: ") + if not password or len(password) < 8: + sys.exit("Password must be at least 8 characters.") + user, raw = create_operator(db, email, name, "superadmin", + password=None if generate else password) + if generate: + print(f"✓ Superadmin {user.email} created. Temp password (shown once): {raw}") + else: + print(f"✓ Superadmin {user.email} created.") + except ValueError as e: + sys.exit(str(e)) + finally: + db.close() + + +def cmd_create_user(email, name, role="admin"): + db = SessionLocal() + try: + user, raw = create_operator(db, email, name, role) + print(f"✓ {role} {user.email} created. Temp password (shown once): {raw}") + print(" They'll be required to change it on first login.") + except ValueError as e: + sys.exit(str(e)) + finally: + db.close() + + +def cmd_reset_password(email): + db = SessionLocal() + try: + user = _get(db, email) + raw = reset_operator_password(db, user) + print(f"✓ Reset {user.email}. Temp password (shown once): {raw}") + finally: + db.close() + + +def cmd_set_active(email, active): + db = SessionLocal() + try: + user = _get(db, email) + set_operator_active(db, user, active) + print(f"✓ {user.email} {'enabled' if active else 'disabled'}.") + finally: + db.close() + + +def cmd_list(): + db = SessionLocal() + try: + users = db.query(OperatorUser).order_by(OperatorUser.display_name).all() + if not users: + print("No operators yet. Run create-superadmin first.") + return + for u in users: + locked = " [LOCKED]" if (u.locked_until and u.locked_until > datetime.utcnow()) else "" + state = "active" if u.active else "DISABLED" + last = u.last_login_at.strftime("%Y-%m-%d %H:%M") if u.last_login_at else "never" + print(f" {u.display_name:<12} {u.email:<28} {u.role:<11} {state}{locked} last: {last}") + finally: + db.close() + + +def main(): + ap = argparse.ArgumentParser(description="Operator-account admin") + sub = ap.add_subparsers(dest="cmd", required=True) + + p = sub.add_parser("create-superadmin") + p.add_argument("--email", required=True); p.add_argument("--name", required=True) + p.add_argument("--generate", action="store_true", help="generate a temp password instead of prompting") + p.set_defaults(fn=lambda a: cmd_create_superadmin(a.email, a.name, generate=a.generate)) + + p = sub.add_parser("create-user") + p.add_argument("--email", required=True); p.add_argument("--name", required=True) + p.add_argument("--role", default="admin", choices=["admin", "superadmin"]) + p.set_defaults(fn=lambda a: cmd_create_user(a.email, a.name, a.role)) + + p = sub.add_parser("reset-password") + p.add_argument("--email", required=True) + p.set_defaults(fn=lambda a: cmd_reset_password(a.email)) + + p = sub.add_parser("disable"); p.add_argument("--email", required=True) + p.set_defaults(fn=lambda a: cmd_set_active(a.email, False)) + + p = sub.add_parser("enable"); p.add_argument("--email", required=True) + p.set_defaults(fn=lambda a: cmd_set_active(a.email, True)) + + p = sub.add_parser("list"); p.set_defaults(fn=lambda a: cmd_list()) + + args = ap.parse_args() + args.fn(args) + + +if __name__ == "__main__": + main() +``` + +- [ ] **Step 4: Run tests to verify they pass** + +Run: `docker exec terra-view-terra-view-1 python -m pytest tests/test_operator_admin_cli.py -v` +Expected: PASS (4 passed) + +- [ ] **Step 5: Commit** + +```bash +git add backend/operator_admin.py tests/test_operator_admin_cli.py +git commit -m "feat(auth): operator admin/break-glass CLI" +``` + +--- + +## Task 9: Machine-endpoint regression guard + full-suite green + +A dedicated regression test that the gate, when ON, never blocks the watcher heartbeats — and a full-suite run to confirm nothing else broke. + +**Files:** +- Test: `tests/test_operator_machine_endpoints.py` + +- [ ] **Step 1: Write the failing test** + +```python +# tests/test_operator_machine_endpoints.py +from tests.conftest import wire_operator_auth + + +def test_machine_endpoints_not_blocked_by_gate(client, db_session, monkeypatch): + """With the gate ON and no cookie, the LAN-only watcher endpoints must reach + their handlers (the gate must never silently break heartbeats). A handler may + return 422 for an empty body — that still proves the gate let it through.""" + wire_operator_auth(monkeypatch, db_session, enabled=True) + + r = client.post("/api/series3/heartbeat", json={}, follow_redirects=False) + assert r.status_code != 401 # gate would 401 an unauth /api/* route + assert r.status_code != 303 + + r = client.post("/api/series4/heartbeat", json={}, follow_redirects=False) + assert r.status_code not in (401, 303) + + r = client.post("/emitters/report", json={}, follow_redirects=False) + assert r.status_code != 303 # gate would 303 an unauth HTML route + + +def test_static_assets_exempt(client, db_session, monkeypatch): + wire_operator_auth(monkeypatch, db_session, enabled=True) + # /sw.js and /manifest.json are PWA assets clients fetch pre-login. + assert client.get("/sw.js", follow_redirects=False).status_code in (200, 404) + assert client.get("/sw.js", follow_redirects=False).status_code != 303 +``` + +- [ ] **Step 2: Run the test to verify it passes (the exempt list from Task 5 already covers it)** + +Run: `docker exec terra-view-terra-view-1 python -m pytest tests/test_operator_machine_endpoints.py -v` +Expected: PASS (2 passed). If any assertion fails, the exempt list in `backend/operator_auth.py` (`_EXEMPT_EXACT`) is missing that path — add it and re-run. + +- [ ] **Step 3: Run the FULL test suite to confirm no regressions** + +Run: `docker exec terra-view-terra-view-1 python -m pytest tests/ -v` +Expected: PASS — all operator-auth tests plus the pre-existing portal/auth tests. The gate defaults OFF (env unset), so every pre-existing test is unaffected. + +- [ ] **Step 4: Commit** + +```bash +git add tests/test_operator_machine_endpoints.py +git commit -m "test(auth): regression guard — gate never blocks machine endpoints" +``` + +--- + +## Task 10: Wire the flag into compose + CHANGELOG + rollout doc + +Make the flag operable in deployment and document the no-self-lockout rollout. + +**Files:** +- Modify: `docker-compose.yml` (pass `OPERATOR_AUTH_ENABLED` through, default off) +- Modify: `CHANGELOG.md` (Unreleased → Added) + +- [ ] **Step 1: Add the flag pass-through to `docker-compose.yml`** + +In the `web-app` service `environment:` block, after the `COOKIE_SECURE` line (line 20), add: + +```yaml + # Operator login gate. Leave false to ship dark; seed a superadmin via + # backend/operator_admin.py, confirm you can log in, THEN set true to enforce. + # Instant escape hatch: set back to false. See docs/superpowers/specs/2026-06-17-operator-auth-design.md + - OPERATOR_AUTH_ENABLED=${OPERATOR_AUTH_ENABLED:-false} +``` + +- [ ] **Step 2: Add the CHANGELOG entry** + +In `CHANGELOG.md`, under `## [Unreleased]` → `### Added`, add this bullet (keep it grouped with the other Added items): + +```markdown +- **Operator authentication (login + roles), shipped dark behind `OPERATOR_AUTH_ENABLED`.** The internal app gains a deny-by-default login gate (one Starlette middleware over every route except an allow-list: `/login` `/logout` `/health` `/static/*` `/portal/*` + the three machine heartbeat endpoints). Two roles — `superadmin` (account management) and `admin` (full app); `operator` reserved. Sessions are a 30-day HMAC-signed `tv_session` cookie re-validated against the DB each request (instant revoke via `active` / `sessions_valid_from`). Password reset is superadmin-driven: reset-anyone (temp shown once + forced change), self-service `/change-password`, and a `backend/operator_admin.py` seed/break-glass CLI. Brute-force lockout (5 tries / 15 min). New `operator_users` table auto-creates — no migration. Reuses the portal's argon2 hasher + a new shared `backend/auth_cookies.py` signer. Rollout: ship with the flag off (app unchanged), seed a superadmin, confirm login, then flip on — the flag is an instant escape hatch. Spec: `docs/superpowers/specs/2026-06-17-operator-auth-design.md`. +``` + +- [ ] **Step 3: Run the full suite once more (sanity — no code changed, but confirm compose/env didn't break import)** + +Run: `docker exec terra-view-terra-view-1 python -m pytest tests/ -q` +Expected: PASS (all green) + +- [ ] **Step 4: Commit** + +```bash +git add docker-compose.yml CHANGELOG.md +git commit -m "chore(auth): wire OPERATOR_AUTH_ENABLED into compose + changelog" +``` + +--- + +## Manual verification (after all tasks — done by the human, not a step) + +These confirm the rollout sequence on a real container (the spec's "no self-lockout" path): + +1. **Flag off (default) — app unchanged:** `docker compose up -d` with no `OPERATOR_AUTH_ENABLED` set → the app behaves exactly as today (no login). +2. **Seed:** `docker exec terra-view-terra-view-1 python3 backend/operator_admin.py create-superadmin --email you@x.com --name "Brian"` (prompts for a password). +3. **Log in while still off:** visit `/login`, sign in → you get a `tv_session` cookie (login works regardless of the flag). +4. **Enforce:** set `OPERATOR_AUTH_ENABLED=true`, `docker compose up -d web-app` → the gate enforces; your cookie lets you in; anything wrong → set it back to `false` (instant escape hatch). +5. **Add parents:** from `/admin/users`, add `admin` accounts (temp passwords shown once; they change on first login). + +--- + +## Self-Review (run by the plan author after writing — recorded here) + +**Spec coverage:** OperatorUser table (T2) ✓ · auth_cookies shared signer (T1) ✓ · `tv_session` cookie + 30-day + DB re-validation + sessions_valid_from (T3) ✓ · deny-by-default middleware + exempt list incl. 3 machine endpoints + must_change redirect + HTML-303/api-401 split (T5) ✓ · role ladder + require_role superadmin-only (T2/T5/T7) ✓ · authenticate + lockout 5/15min (T4/T6) ✓ · password reset all three paths + forgot=contact-admin (T4/T6/T7) ✓ · seed/break-glass CLI (T8) ✓ · user-mgmt UI (T7) ✓ · login/logout/change-password + templates (T6) ✓ · flag governs middleware AND require_role (T5) ✓ · rollout sequence (T10 + manual) ✓ · no migration / new table auto-creates (T2) ✓ · CHANGELOG (T10) ✓. + +**Type/name consistency:** `make_operator_cookie(uid, iat=None)`, `current_operator(request, db)`, `authenticate(db, email, password) -> (user, status)`, `create_operator(db, email, name, role, password=None, must_change=None) -> (user, raw)`, `reset_operator_password(db, user) -> raw`, `change_own_password(db, user, new_password) -> new_iat`, `role_at_least(role, minimum)`, `require_role(minimum)`, `COOKIE_NAME="tv_session"` — all referenced consistently across T3–T8. The `sessions_valid_from`-vs-`iat` truncation race is handled at every write point (`_utcnow_seconds` default, `utcfromtimestamp(new_iat)` on change, `.replace(microsecond=0)` on reset) and covered by `test_cookie_minted_with_matching_iat_after_bump_still_valid`. + +**Placeholder scan:** none — every code step is complete.