Operator Authentication Implementation Plan
For agentic workers: REQUIRED SUB-SKILL: Use superpowers:subagent-driven-development (recommended) or superpowers:executing-plans to implement this plan task-by-task. Steps use checkbox (- [ ]) syntax for tracking.
Goal: Add login and roles to the internal Terra-View app (which today has no authentication), gated behind an OPERATOR_AUTH_ENABLED feature flag, with a simple, superadmin-driven password-reset flow.
Architecture: One deny-by-default Starlette HTTP middleware gates every route except an explicit allow-list (login/logout/health/static/portal + 3 machine endpoints). It resolves an OperatorUser from a signed tv_session cookie (re-validated against the DB every request) and stashes it on request.state.operator; a require_role() dependency reads it for the superadmin-only user-management routes. Reuses the portal's argon2 hasher and HMAC-signed-cookie pattern. The flag governs both the middleware and require_role, so shipping with it off behaves exactly like today.
Tech Stack: FastAPI + Starlette middleware, SQLAlchemy/SQLite, argon2-cffi (already a dep), stdlib hmac/hashlib, Jinja2, pytest (existing harness).
Spec: docs/superpowers/specs/2026-06-17-operator-auth-design.md
File Structure
| File | Responsibility |
|---|---|
| backend/auth_cookies.py (new) | generic sign(payload) / read(raw, max_age) signer + SECRET_KEY / COOKIE_SECURE |
| backend/models.py (modify) | add OperatorUser table + _utcnow_seconds helper |
| backend/operator_auth.py (new) | flag, role ranks, cookie helpers, current_operator, authenticate, lockout, data helpers (create/reset/role/active/change), the gate middleware, require_role |
| backend/routers/operator_auth_routes.py (new) | /login, /logout, /change-password |
| backend/routers/operator_users.py (new) | /admin/users page + CRUD JSON (superadmin) |
| backend/operator_admin.py (new) | seed / break-glass CLI |
| backend/main.py (modify) | register the gate middleware + the two routers |
| templates/login.html, templates/change_password.html (new) | standalone (no nav) auth pages |
| templates/admin/users.html (new) | user-management page (extends base.html) |
| tests/conftest.py (modify) | add wire_operator_auth() helper |
| tests/test_operator_*.py (new) | per-task test files |
Conventions discovered (follow these exactly):
- UUID PKs: id = Column(String, primary_key=True, index=True), set via str(uuid.uuid4()).
- argon2 helpers live in backend/auth_passwords.py: hash_password(raw), verify_password(raw, hashed) (never raises), generate_password(n_bytes=12).
- Tests run in the dev container: docker exec terra-view-terra-view-1 python -m pytest tests/ -v. Source is bind-mounted, so no rebuild between edits.
- The gate middleware reads the DB through a module-level SessionLocal (same pattern the portal WS handler uses) so tests can monkeypatch it. db_session.get_bind() returns the test engine; bind a sessionmaker to it.
- The flag and SessionLocal are read as module globals at call time, so monkeypatch.setattr(backend.operator_auth, "...", ...) works without re-importing.
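The last convention works because Python resolves a bare global name each time a function body runs, whereas `from module import NAME` copies the binding once at import time. A minimal sketch of the difference, using a throwaway module built with `types.ModuleType`; the module name `fake_operator_auth` and the function `gate_is_on` are illustrative and not part of the plan:

```python
import types

# Build a stand-in module; "fake_operator_auth" is a hypothetical name.
mod = types.ModuleType("fake_operator_auth")
exec(
    "OPERATOR_AUTH_ENABLED = False\n"
    "def gate_is_on():\n"
    "    # The global is looked up at call time, not at definition time.\n"
    "    return OPERATOR_AUTH_ENABLED\n",
    mod.__dict__,
)

# Equivalent of `from fake_operator_auth import OPERATOR_AUTH_ENABLED`:
# this copies the current value into a separate name.
copied_flag = mod.OPERATOR_AUTH_ENABLED

# Equivalent of monkeypatch.setattr(mod, "OPERATOR_AUTH_ENABLED", True).
setattr(mod, "OPERATOR_AUTH_ENABLED", True)

seen_by_function = mod.gate_is_on()  # reads the patched module global
seen_by_copy = copied_flag           # still the value captured earlier
```

This is why code that needs to honor the patch should read the flag and SessionLocal as plain globals inside the module that defines them, rather than importing the names into another module.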
Task 1: Generic signed-cookie module (auth_cookies.py)
Lift the portal's HMAC signer into a shared, payload-agnostic module the operator auth uses now (the portal keeps its own helpers untouched).
Files:
- Create: backend/auth_cookies.py
- Test: tests/test_operator_cookies.py
- Step 1: Write the failing test
# tests/test_operator_cookies.py
import time
from backend.auth_cookies import sign, read

def test_sign_then_read_round_trips():
    # Use a current iat: a fixed value such as 1000 is decades older than
    # max_age, so read() would return None and the round trip would fail.
    iat = int(time.time())
    raw = sign({"uid": "abc", "iat": iat})
    data = read(raw, max_age=3600)
    assert data == {"uid": "abc", "iat": iat}

def test_tampered_signature_is_rejected():
    raw = sign({"uid": "abc", "iat": int(time.time())})
    body, _sig = raw.rsplit(".", 1)
    assert read(body + ".deadbeef", max_age=3600) is None

def test_tampered_body_is_rejected():
    raw = sign({"uid": "abc", "iat": int(time.time())})
    body, sig = raw.rsplit(".", 1)
    import base64, json
    forged = base64.urlsafe_b64encode(json.dumps({"uid": "evil", "iat": int(time.time())}).encode()).decode()
    assert read(forged + "." + sig, max_age=3600) is None

def test_expired_by_iat_is_rejected():
    raw = sign({"uid": "abc", "iat": int(time.time()) - 10_000})
    assert read(raw, max_age=3600) is None

def test_garbage_input_is_none_not_raise():
    assert read("not-a-cookie", max_age=3600) is None
    assert read("", max_age=3600) is None
    assert read(None, max_age=3600) is None
- Step 2: Run tests to verify they fail
Run: docker exec terra-view-terra-view-1 python -m pytest tests/test_operator_cookies.py -v
Expected: FAIL — ModuleNotFoundError: No module named 'backend.auth_cookies'
- Step 3: Write the implementation
# backend/auth_cookies.py
"""Generic HMAC-signed cookie payloads, shared by operator auth (and, optionally
later, the portal). A signed value is f"{b64url(json)}.{hmac_sha256(b64)}"; read()
verifies the signature in constant time and enforces a server-side iat expiry.
The signing secret is the same SECRET_KEY the portal already reads, so a single
env var protects both cookies. Never store or log raw secrets."""
import os
import hmac
import json
import time
import base64
import hashlib
import logging

logger = logging.getLogger(__name__)

# Same env var the portal cookie uses: one secret protects both. The insecure
# default only exists so dev/test boots without config; set a real SECRET_KEY in prod.
SECRET_KEY = os.getenv("SECRET_KEY", "dev-insecure-change-me")
# Set COOKIE_SECURE=true once served over HTTPS; leave false on plain HTTP or the
# browser won't send the cookie.
COOKIE_SECURE = os.getenv("COOKIE_SECURE", "false").lower() in ("1", "true", "yes")

def _sign(body: str) -> str:
    return hmac.new(SECRET_KEY.encode(), body.encode(), hashlib.sha256).hexdigest()

def sign(payload: dict) -> str:
    """Serialize + sign a payload dict into a cookie-safe string."""
    body = base64.urlsafe_b64encode(json.dumps(payload).encode()).decode()
    return f"{body}.{_sign(body)}"

def read(raw, max_age: int):
    """Verify a signed value and return its payload dict, or None if missing,
    tampered, or older than max_age seconds (by its own `iat`)."""
    if not raw or not isinstance(raw, str):
        return None
    try:
        body, sig = raw.rsplit(".", 1)
    except (ValueError, AttributeError):
        return None
    # Compare as bytes: compare_digest raises TypeError for str arguments that
    # contain non-ASCII characters, and the cookie value is attacker-controlled.
    if not hmac.compare_digest(sig.encode(), _sign(body).encode()):
        return None
    try:
        data = json.loads(base64.urlsafe_b64decode(body.encode()))
    except Exception:
        return None
    if not isinstance(data, dict):
        return None
    iat = data.get("iat")
    if not isinstance(iat, (int, float)) or (time.time() - iat) > max_age:
        return None
    return data
- Step 4: Run tests to verify they pass
Run: docker exec terra-view-terra-view-1 python -m pytest tests/test_operator_cookies.py -v
Expected: PASS (5 passed)
- Step 5: Commit
git add backend/auth_cookies.py tests/test_operator_cookies.py
git commit -m "feat(auth): generic HMAC signed-cookie module for operator auth"
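One edge case the garbage-input test does not cover: `hmac.compare_digest` accepts `str` arguments only when both are ASCII, and raises `TypeError` otherwise. Because the signature half of a cookie comes from the client, a defensive comparison can encode both sides to bytes first. The sketch below illustrates the difference; the helper names `signatures_match` and `str_compare_raises` are hypothetical and not part of the plan.

```python
import hmac


def signatures_match(candidate: str, expected: str) -> bool:
    """Constant-time comparison that tolerates non-ASCII input.

    Hypothetical helper: encoding to bytes avoids the TypeError that
    compare_digest raises for str arguments with non-ASCII characters.
    """
    return hmac.compare_digest(candidate.encode("utf-8"), expected.encode("utf-8"))


def str_compare_raises(candidate: str, expected: str) -> bool:
    """Return True if comparing the raw str values raises TypeError."""
    try:
        hmac.compare_digest(candidate, expected)
    except TypeError:
        return True
    return False
```

A test along the lines of `read("abc.d\u00e9adbeef", max_age=3600) is None` would pin this behavior down for the cookie reader.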
Task 2: OperatorUser model + role-rank helpers
The new table (auto-created by Base.metadata.create_all, no migration) and the role ladder.
Files:
- Modify: backend/models.py (add _utcnow_seconds near the top imports, and OperatorUser at end of file)
- Create: backend/operator_auth.py (start it with constants + role_at_least)
- Test: tests/test_operator_model.py
- Step 1: Write the failing test
# tests/test_operator_model.py
import uuid
from backend.models import OperatorUser
from backend.operator_auth import role_at_least, _ROLE_RANK

def test_operator_user_defaults(db_session):
    u = OperatorUser(id=str(uuid.uuid4()), email="a@x.com", display_name="A",
                     password_hash="h", role="admin")
    db_session.add(u)
    db_session.commit()
    got = db_session.query(OperatorUser).filter_by(email="a@x.com").first()
    assert got.active is True
    assert got.must_change_password is False
    assert got.failed_login_count == 0
    assert got.locked_until is None
    assert got.sessions_valid_from is not None
    assert got.sessions_valid_from.microsecond == 0  # truncated to whole seconds

def test_email_is_unique(db_session):
    for i in range(2):
        db_session.add(OperatorUser(id=str(uuid.uuid4()), email="dup@x.com",
                                    display_name="d", password_hash="h", role="admin"))
    import pytest
    with pytest.raises(Exception):
        db_session.commit()

def test_role_ladder():
    assert _ROLE_RANK == {"operator": 10, "admin": 20, "superadmin": 30}
    assert role_at_least("superadmin", "admin") is True
    assert role_at_least("admin", "admin") is True
    assert role_at_least("admin", "superadmin") is False
    assert role_at_least("operator", "admin") is False
    assert role_at_least("nonsense", "admin") is False
- Step 2: Run tests to verify they fail
Run: docker exec terra-view-terra-view-1 python -m pytest tests/test_operator_model.py -v
Expected: FAIL — ImportError: cannot import name 'OperatorUser'
- Step 3a: Add the _utcnow_seconds helper to backend/models.py
Insert directly after the existing from datetime import datetime line (line 2) and before from backend.database import Base:
def _utcnow_seconds():
    """utcnow truncated to whole seconds: used as the default for
    sessions_valid_from so a freshly-issued cookie (whose iat is a whole-second
    epoch) never falls a few microseconds before it and self-invalidates."""
    return datetime.utcnow().replace(microsecond=0)
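To see why the truncation matters, consider a user row created a quarter of a second into a given second and a cookie minted half a second later. The sketch below uses fixed, made-up timestamps and stdlib calls only; it mirrors the comparison that the session check performs between a cookie's iat and sessions_valid_from, but it is an illustration rather than the plan's code.

```python
from datetime import datetime, timezone

# Hypothetical instant at which a user row is created: 12:00:00.250000 UTC.
created = datetime(2026, 6, 17, 12, 0, 0, 250000)

# A cookie minted 0.5 s later carries a whole-second epoch iat,
# the same way int(time.time()) would produce one.
minted = datetime(2026, 6, 17, 12, 0, 0, 750000, tzinfo=timezone.utc)
iat = int(minted.timestamp())
iat_as_naive_utc = datetime.fromtimestamp(iat, tz=timezone.utc).replace(tzinfo=None)

# Untruncated default: the cookie looks older than the row and would be rejected.
rejected_without_truncation = iat_as_naive_utc < created

# Truncated default (what a whole-second helper returns): no longer rejected.
rejected_with_truncation = iat_as_naive_utc < created.replace(microsecond=0)
```

With the untruncated default, a login in the same second as account creation would produce a cookie that invalidates itself.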
- Step 3b: Append the OperatorUser model at the end of backend/models.py
class OperatorUser(Base):
    """An internal operator login. Roles: 'superadmin' (Brian, + account mgmt) and
    'admin' (parents, full app). 'operator' is reserved (deferred). Brand-new table
    → create_all builds it, no migration. Never store or log raw passwords."""
    __tablename__ = "operator_users"

    id = Column(String, primary_key=True, index=True)  # UUID
    email = Column(String, nullable=False, unique=True, index=True)  # login handle, lowercased
    display_name = Column(String, nullable=False)  # "Brian", "Dad"
    password_hash = Column(String, nullable=False)  # argon2id
    role = Column(String, nullable=False, default="admin")  # superadmin | admin
    active = Column(Boolean, default=True)  # False = login disabled
    must_change_password = Column(Boolean, default=False)  # forces a change next login
    sessions_valid_from = Column(DateTime, default=_utcnow_seconds)  # bump = log out everywhere
    failed_login_count = Column(Integer, default=0)  # lockout counter
    locked_until = Column(DateTime, nullable=True)  # set after too many bad tries
    created_at = Column(DateTime, default=datetime.utcnow)
    last_login_at = Column(DateTime, nullable=True)
- Step 3c: Create backend/operator_auth.py with constants + role_at_least
# backend/operator_auth.py
"""Operator authentication: the deny-by-default gate, session cookie, login +
lockout, and the small data helpers shared by the routes and the CLI. Reuses the
argon2 hasher (auth_passwords) and the HMAC signer (auth_cookies).
The flag and SessionLocal are read as module globals at call time so tests can
monkeypatch them."""
import os
import time
import uuid
from datetime import datetime, timedelta
from backend.models import OperatorUser
from backend.auth_passwords import hash_password, verify_password, generate_password

# Feature flag: OFF by default. When off, the gate and require_role both pass
# everything through and the app behaves exactly as it does today.
OPERATOR_AUTH_ENABLED = os.getenv("OPERATOR_AUTH_ENABLED", "false").lower() in ("1", "true", "yes")
COOKIE_NAME = "tv_session"
COOKIE_MAX_AGE = 60 * 60 * 24 * 30  # 30 days ("remember this device")
MAX_LOGIN_FAILURES = 5
LOCK_MINUTES = 15
# Role ladder: a rank map so checks read naturally and 'operator' slots in later.
_ROLE_RANK = {"operator": 10, "admin": 20, "superadmin": 30}

def role_at_least(role: str, minimum: str) -> bool:
    """True iff `role` ranks at or above `minimum`. Unknown roles rank as 0."""
    return _ROLE_RANK.get(role, 0) >= _ROLE_RANK[minimum]

def _norm_email(email: str) -> str:
    return (email or "").strip().lower()
- Step 4: Run tests to verify they pass
Run: docker exec terra-view-terra-view-1 python -m pytest tests/test_operator_model.py -v
Expected: PASS (3 passed)
- Step 5: Commit
git add backend/models.py backend/operator_auth.py tests/test_operator_model.py
git commit -m "feat(auth): OperatorUser model + role ladder"
Task 3: Session cookie helpers + current_operator
Mint/validate the tv_session cookie and resolve the operator from a request, re-validating against the DB every call.
Files:
- Modify: backend/operator_auth.py (append helpers)
- Test: tests/test_operator_session.py
- Step 1: Write the failing test
# tests/test_operator_session.py
import time
import uuid
from datetime import datetime, timedelta
from types import SimpleNamespace
from backend.models import OperatorUser
from backend.operator_auth import (
    make_operator_cookie, current_operator, COOKIE_NAME,
)

def _make_user(db, **kw):
    u = OperatorUser(id=str(uuid.uuid4()), email=kw.pop("email", "u@x.com"),
                     display_name="U", password_hash="h", role=kw.pop("role", "admin"), **kw)
    db.add(u)
    db.commit()
    return u

def _req(cookie_value):
    # current_operator only reads request.cookies, so a stub is enough.
    return SimpleNamespace(cookies={COOKIE_NAME: cookie_value} if cookie_value else {})

def test_valid_cookie_resolves_user(db_session):
    u = _make_user(db_session)
    cookie = make_operator_cookie(u.id)
    assert current_operator(_req(cookie), db_session).id == u.id

def test_no_or_garbage_cookie_is_none(db_session):
    assert current_operator(_req(None), db_session) is None
    assert current_operator(_req("garbage"), db_session) is None

def test_inactive_user_is_none(db_session):
    u = _make_user(db_session, active=False)
    assert current_operator(_req(make_operator_cookie(u.id)), db_session) is None

def test_locked_user_is_none(db_session):
    u = _make_user(db_session, locked_until=datetime.utcnow() + timedelta(minutes=5))
    assert current_operator(_req(make_operator_cookie(u.id)), db_session) is None

def test_cookie_older_than_sessions_valid_from_is_none(db_session):
    u = _make_user(db_session)
    old_iat = int(time.time()) - 1000
    cookie = make_operator_cookie(u.id, iat=old_iat)
    u.sessions_valid_from = datetime.utcnow()
    db_session.commit()
    assert current_operator(_req(cookie), db_session) is None

def test_cookie_minted_with_matching_iat_after_bump_still_valid(db_session):
    # Guards the change-password race: bump sessions_valid_from to the new cookie's
    # exact iat → that fresh cookie must remain valid.
    u = _make_user(db_session)
    new_iat = int(time.time())
    u.sessions_valid_from = datetime.utcfromtimestamp(new_iat)
    db_session.commit()
    assert current_operator(_req(make_operator_cookie(u.id, iat=new_iat)), db_session).id == u.id
- Step 2: Run tests to verify they fail
Run: docker exec terra-view-terra-view-1 python -m pytest tests/test_operator_session.py -v
Expected: FAIL — ImportError: cannot import name 'make_operator_cookie'
- Step 3: Append the helpers to backend/operator_auth.py
from backend.auth_cookies import sign, read, COOKIE_SECURE  # add to the import block at top

def make_operator_cookie(uid: str, iat: int = None) -> str:
    """Sign a tv_session value for a user id. iat defaults to now; pass an explicit
    iat when you bump sessions_valid_from to that same instant (change-password)."""
    return sign({"uid": uid, "iat": int(iat if iat is not None else time.time())})

def current_operator(request, db):
    """Resolve the OperatorUser for a request's tv_session cookie, or None.
    Re-validated against the DB every call: a disabled / locked / password-changed
    user drops on the next request. Used by the gate middleware (with its own
    session); does not raise."""
    data = read(request.cookies.get(COOKIE_NAME), COOKIE_MAX_AGE)
    if not data:
        return None
    uid, iat = data.get("uid"), data.get("iat")
    if not uid or not isinstance(iat, (int, float)):
        return None
    user = db.query(OperatorUser).filter_by(id=uid).first()
    if not user or not user.active:
        return None
    if user.locked_until and user.locked_until > datetime.utcnow():
        return None
    if user.sessions_valid_from and datetime.utcfromtimestamp(int(iat)) < user.sessions_valid_from:
        return None
    return user
NOTE: move the from backend.auth_cookies import sign, read, COOKIE_SECURE line up into the import block at the top of the file (shown inline here only to mark the dependency). COOKIE_SECURE is imported now because later tasks (login route) set the cookie with it.
- Step 4: Run tests to verify they pass
Run: docker exec terra-view-terra-view-1 python -m pytest tests/test_operator_session.py -v
Expected: PASS (6 passed)
- Step 5: Commit
git add backend/operator_auth.py tests/test_operator_session.py
git commit -m "feat(auth): operator session cookie + current_operator DB re-validation"
Task 4: authenticate + lockout + data helpers
Login verification with brute-force lockout, plus the create/reset/role/active/change helpers shared by the routes and the CLI.
Files:
- Modify: backend/operator_auth.py (append)
- Test: tests/test_operator_authenticate.py
- Step 1: Write the failing test
# tests/test_operator_authenticate.py
import time
from datetime import datetime
import pytest
from backend.operator_auth import (
    authenticate, create_operator, reset_operator_password,
    set_operator_active, set_operator_role, change_own_password, MAX_LOGIN_FAILURES,
)
from backend.auth_passwords import verify_password
from backend.models import OperatorUser

def test_create_operator_generates_temp_and_forces_change(db_session):
    user, raw = create_operator(db_session, "Dad@X.com", "Dad", "admin")
    assert user.email == "dad@x.com"  # lowercased
    assert user.must_change_password is True
    assert verify_password(raw, user.password_hash)

def test_create_operator_with_explicit_password_no_forced_change(db_session):
    user, raw = create_operator(db_session, "brian@x.com", "Brian", "superadmin", password="chosen-pw-123")
    assert raw == "chosen-pw-123"
    assert user.must_change_password is False

def test_create_operator_rejects_duplicate_and_bad_role(db_session):
    create_operator(db_session, "a@x.com", "A", "admin")
    with pytest.raises(ValueError):
        create_operator(db_session, "A@x.com", "A2", "admin")  # dup (case-insensitive)
    with pytest.raises(ValueError):
        create_operator(db_session, "b@x.com", "B", "wizard")  # bad role

def test_authenticate_success(db_session):
    user, raw = create_operator(db_session, "ok@x.com", "Ok", "admin", password="rightpw-9")
    got, status = authenticate(db_session, "OK@x.com", "rightpw-9")
    assert status == "ok" and got.id == user.id
    assert got.last_login_at is not None
    assert got.failed_login_count == 0

def test_authenticate_wrong_password_counts(db_session):
    create_operator(db_session, "wp@x.com", "Wp", "admin", password="rightpw-9")
    got, status = authenticate(db_session, "wp@x.com", "nope")
    assert got is None and status == "bad"
    assert db_session.query(OperatorUser).filter_by(email="wp@x.com").first().failed_login_count == 1

def test_lockout_after_five_then_correct_password_refused(db_session):
    create_operator(db_session, "lk@x.com", "Lk", "admin", password="rightpw-9")
    for _ in range(MAX_LOGIN_FAILURES):
        authenticate(db_session, "lk@x.com", "nope")
    got, status = authenticate(db_session, "lk@x.com", "rightpw-9")  # correct, but locked
    assert got is None and status == "locked"

def test_authenticate_unknown_email_is_bad_not_error(db_session):
    got, status = authenticate(db_session, "ghost@x.com", "whatever")
    assert got is None and status == "bad"

def test_reset_password_sets_new_hash_forces_change_and_bumps_sessions(db_session):
    user, _ = create_operator(db_session, "r@x.com", "R", "admin", password="orig-pw-1")
    before = user.sessions_valid_from
    raw = reset_operator_password(db_session, user)
    assert verify_password(raw, user.password_hash)
    assert user.must_change_password is True
    assert user.sessions_valid_from >= before

def test_change_own_password_clears_flag_and_bumps(db_session):
    user, _ = create_operator(db_session, "c@x.com", "C", "admin", password="orig-pw-1")
    user.must_change_password = True
    db_session.commit()
    new_iat = change_own_password(db_session, user, "brand-new-pw-2")
    assert verify_password("brand-new-pw-2", user.password_hash)
    assert user.must_change_password is False
    assert user.sessions_valid_from == datetime.utcfromtimestamp(new_iat)

def test_set_active_and_role(db_session):
    user, _ = create_operator(db_session, "s@x.com", "S", "admin", password="orig-pw-1")
    set_operator_active(db_session, user, False)
    assert user.active is False
    set_operator_role(db_session, user, "superadmin")
    assert user.role == "superadmin"
    with pytest.raises(ValueError):
        set_operator_role(db_session, user, "wizard")
- Step 2: Run tests to verify they fail
Run: docker exec terra-view-terra-view-1 python -m pytest tests/test_operator_authenticate.py -v
Expected: FAIL — ImportError: cannot import name 'authenticate'
- Step 3: Append the helpers to backend/operator_auth.py
def register_login_failure(db, user) -> None:
    """Increment a user's failure counter and lock them out past the threshold."""
    user.failed_login_count = (user.failed_login_count or 0) + 1
    if user.failed_login_count >= MAX_LOGIN_FAILURES:
        user.locked_until = datetime.utcnow() + timedelta(minutes=LOCK_MINUTES)
    db.commit()

def authenticate(db, email, password):
    """Return (user, "ok") on success, (None, "locked") if locked out, else
    (None, "bad"). Never reveals whether the email exists (generic 'bad')."""
    user = db.query(OperatorUser).filter_by(email=_norm_email(email)).first()
    if user and user.locked_until and user.locked_until > datetime.utcnow():
        return None, "locked"
    if not user or not user.active or not verify_password(password, user.password_hash):
        if user:
            register_login_failure(db, user)
        return None, "bad"
    user.failed_login_count = 0
    user.locked_until = None
    user.last_login_at = datetime.utcnow()
    db.commit()
    return user, "ok"

def create_operator(db, email, name, role, password=None, must_change=None):
    """Create an operator. With no password, generate a temp one and force a change
    (must_change defaults True). With a password, must_change defaults False.
    Returns (user, raw_password_to_show_once). Raises ValueError on dup/bad role."""
    email = _norm_email(email)
    if role not in _ROLE_RANK:
        raise ValueError(f"unknown role {role!r}")
    if db.query(OperatorUser).filter_by(email=email).first():
        raise ValueError(f"operator {email} already exists")
    if password is None:
        password = generate_password()
        if must_change is None:
            must_change = True
    elif must_change is None:
        must_change = False
    user = OperatorUser(id=str(uuid.uuid4()), email=email, display_name=name,
                        password_hash=hash_password(password), role=role,
                        active=True, must_change_password=must_change)
    db.add(user)
    db.commit()
    return user, password

def reset_operator_password(db, user) -> str:
    """Generate a fresh temp password, force a change, log the user out everywhere.
    Returns the raw password to show once."""
    raw = generate_password()
    user.password_hash = hash_password(raw)
    user.must_change_password = True
    user.failed_login_count = 0
    user.locked_until = None
    user.sessions_valid_from = datetime.utcnow().replace(microsecond=0)
    db.commit()
    return raw

def change_own_password(db, user, new_password) -> int:
    """Set a user's own new password, clear the forced-change flag, and bump
    sessions_valid_from to the returned iat; the caller mints the replacement
    cookie with that exact iat so it stays valid while older cookies die."""
    new_iat = int(time.time())
    user.password_hash = hash_password(new_password)
    user.must_change_password = False
    user.sessions_valid_from = datetime.utcfromtimestamp(new_iat)
    db.commit()
    return new_iat

def set_operator_active(db, user, active: bool):
    user.active = bool(active)
    db.commit()
    return user

def set_operator_role(db, user, role: str):
    if role not in _ROLE_RANK:
        raise ValueError(f"unknown role {role!r}")
    user.role = role
    db.commit()
    return user
- Step 4: Run tests to verify they pass
Run: docker exec terra-view-terra-view-1 python -m pytest tests/test_operator_authenticate.py -v
Expected: PASS (10 passed)
- Step 5: Commit
git add backend/operator_auth.py tests/test_operator_authenticate.py
git commit -m "feat(auth): authenticate + lockout + operator data helpers"
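The lockout rules in Task 4 are easier to reason about with the clock made explicit. The following is a minimal in-memory sketch, assuming the same thresholds (5 failures, 15 minutes); `FakeUser` and `attempt` are hypothetical stand-ins with no database and no password hashing, so it shows only the status logic.

```python
from dataclasses import dataclass
from datetime import datetime, timedelta
from typing import Optional

MAX_LOGIN_FAILURES = 5
LOCK_MINUTES = 15


@dataclass
class FakeUser:
    """In-memory stand-in for OperatorUser (hypothetical, no database)."""
    password: str
    failed_login_count: int = 0
    locked_until: Optional[datetime] = None


def attempt(user: FakeUser, password: str, now: datetime) -> str:
    """Mirror the status logic of authenticate() with an injected clock."""
    if user.locked_until and user.locked_until > now:
        return "locked"
    if password != user.password:  # plain comparison: sketch only, no hashing
        user.failed_login_count += 1
        if user.failed_login_count >= MAX_LOGIN_FAILURES:
            user.locked_until = now + timedelta(minutes=LOCK_MINUTES)
        return "bad"
    user.failed_login_count = 0
    user.locked_until = None
    return "ok"


start = datetime(2026, 6, 17, 12, 0, 0)
user = FakeUser(password="rightpw-9")

# Five wrong guesses: each returns "bad"; the fifth also sets the lock.
statuses = [attempt(user, "nope", start) for _ in range(MAX_LOGIN_FAILURES)]

# The correct password is refused while the lock is active...
during_lock = attempt(user, "rightpw-9", start + timedelta(minutes=1))

# ...and accepted once LOCK_MINUTES have elapsed, which also resets the counter.
after_lock = attempt(user, "rightpw-9", start + timedelta(minutes=LOCK_MINUTES, seconds=1))
```

In this model, as in the helpers above, the counter is reset only by a successful login or a password reset, so a single wrong guess after the lock expires re-locks the account immediately. Whether that is the desired behavior is a design choice worth confirming against the spec.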
Task 5: Gate middleware + require_role + wire into the app
The deny-by-default Starlette middleware and the role dependency. Registering the middleware with the flag defaulting OFF keeps every existing test green.
Files:
- Modify: backend/operator_auth.py (append operator_gate + require_role)
- Modify: backend/main.py (register the middleware)
- Modify: tests/conftest.py (add wire_operator_auth helper)
- Test: tests/test_operator_gate.py
- Step 1: Add the wire_operator_auth helper to tests/conftest.py
Append at the end of tests/conftest.py:
def wire_operator_auth(monkeypatch, db_session, enabled=True):
    """Point the gate middleware's SessionLocal at the test engine and flip the
    flag. The middleware opens its OWN session (it can't use the get_db override),
    so it must read the same engine the test writes to."""
    import backend.operator_auth as oa
    from sqlalchemy.orm import sessionmaker
    maker = sessionmaker(bind=db_session.get_bind(), autocommit=False, autoflush=False)
    monkeypatch.setattr(oa, "SessionLocal", maker, raising=False)
    monkeypatch.setattr(oa, "OPERATOR_AUTH_ENABLED", enabled, raising=False)
    return oa
- Step 2: Write the failing test
# tests/test_operator_gate.py
import uuid
from tests.conftest import wire_operator_auth
from backend.models import OperatorUser
from backend.operator_auth import make_operator_cookie, COOKIE_NAME
from backend.auth_passwords import hash_password

def _make_user(db, role="admin", **kw):
    u = OperatorUser(id=str(uuid.uuid4()), email=kw.pop("email", "u@x.com"),
                     display_name="U", password_hash=hash_password("pw"), role=role, **kw)
    db.add(u)
    db.commit()
    return u

def test_flag_off_passes_everything(client, db_session, monkeypatch):
    wire_operator_auth(monkeypatch, db_session, enabled=False)
    assert client.get("/", follow_redirects=False).status_code == 200

def test_gated_html_redirects_to_login_when_unauth(client, db_session, monkeypatch):
    wire_operator_auth(monkeypatch, db_session, enabled=True)
    r = client.get("/", follow_redirects=False)
    assert r.status_code == 303
    assert r.headers["location"].startswith("/login?next=")

def test_gated_api_returns_401_json_when_unauth(client, db_session, monkeypatch):
    wire_operator_auth(monkeypatch, db_session, enabled=True)
    r = client.get("/api/status-snapshot", follow_redirects=False)
    assert r.status_code == 401

def test_valid_session_passes(client, db_session, monkeypatch):
    u = _make_user(db_session)
    wire_operator_auth(monkeypatch, db_session, enabled=True)
    client.cookies.set(COOKIE_NAME, make_operator_cookie(u.id))
    assert client.get("/", follow_redirects=False).status_code == 200

def test_must_change_password_user_routed_to_change_password(client, db_session, monkeypatch):
    u = _make_user(db_session, must_change_password=True)
    wire_operator_auth(monkeypatch, db_session, enabled=True)
    client.cookies.set(COOKIE_NAME, make_operator_cookie(u.id))
    r = client.get("/", follow_redirects=False)
    assert r.status_code == 303
    assert r.headers["location"] == "/change-password"

def test_exempt_paths_pass_without_cookie(client, db_session, monkeypatch):
    wire_operator_auth(monkeypatch, db_session, enabled=True)
    assert client.get("/health", follow_redirects=False).status_code == 200
    # NOTE: the /login route is created in Task 6. If the app has no /login route
    # yet, this assertion returns 404 until that task lands.
    assert client.get("/login", follow_redirects=False).status_code == 200

def test_portal_paths_are_exempt(client, db_session, monkeypatch):
    wire_operator_auth(monkeypatch, db_session, enabled=True)
    # /portal/p/<bad> hits the portal's own gate (403/404), never the operator login.
    r = client.get("/portal/p/nope", follow_redirects=False)
    assert r.status_code in (403, 404)
- Step 3: Append operator_gate + require_role to backend/operator_auth.py
Add these imports to the top of the file:
from urllib.parse import quote
from fastapi import Request, HTTPException
from fastapi.responses import JSONResponse, RedirectResponse
from backend.database import SessionLocal
Then append:
# Routes reachable with no login. A new route added next year is gated by default.
_EXEMPT_EXACT = {
    "/login", "/logout", "/health",
    "/manifest.json", "/sw.js", "/favicon.ico", "/offline-db.js",
    "/portal",  # portal home (its own auth)
    # machine endpoints: LAN-only, automated, no human (watchers/heartbeats)
    "/emitters/report", "/api/series3/heartbeat", "/api/series4/heartbeat",
}
_EXEMPT_PREFIX = ("/static/", "/portal/")

def _is_exempt(path: str) -> bool:
    return path in _EXEMPT_EXACT or path.startswith(_EXEMPT_PREFIX)

async def operator_gate(request: Request, call_next):
    """Deny-by-default gate. Flag off → pass through (app as today). Flag on →
    exempt paths pass; otherwise require a valid operator session, stash it on
    request.state.operator, and force a password change when pending."""
    if not OPERATOR_AUTH_ENABLED:
        return await call_next(request)
    path = request.url.path
    if _is_exempt(path):
        return await call_next(request)
    db = SessionLocal()
    try:
        user = current_operator(request, db)
        if user is not None:
            db.expunge(user)  # detach a fully-loaded row so we can close now
    finally:
        db.close()
    if user is None:
        if path.startswith("/api/"):
            return JSONResponse({"detail": "Not authenticated"}, status_code=401)
        return RedirectResponse(f"/login?next={quote(path)}", status_code=303)
    if user.must_change_password and path not in ("/change-password", "/logout"):
        return RedirectResponse("/change-password", status_code=303)
    request.state.operator = user
    return await call_next(request)

def require_role(minimum: str):
    """Dependency factory: require a logged-in operator ranked >= `minimum`.
    Respects the flag (off → pass through). When on, the middleware has already
    set request.state.operator before this runs."""
    def _dep(request: Request):
        if not OPERATOR_AUTH_ENABLED:
            return None
        user = getattr(request.state, "operator", None)
        if user is None:
            raise HTTPException(status_code=401, detail="Not authenticated")
        if not role_at_least(user.role, minimum):
            raise HTTPException(status_code=403, detail="Insufficient permissions")
        return user
    return _dep
- Step 4: Register the middleware in backend/main.py
After the existing add_environment_to_context middleware block (ends line 90), add:
# Operator auth — deny-by-default gate over the whole internal app. Governed by
# OPERATOR_AUTH_ENABLED (default off → behaves exactly as today). See
# docs/superpowers/specs/2026-06-17-operator-auth-design.md.
from backend.operator_auth import operator_gate
app.middleware("http")(operator_gate)
- Step 5: Run tests to verify the gate works AND nothing regressed
Run: docker exec terra-view-terra-view-1 python -m pytest tests/test_operator_gate.py tests/test_portal_gate.py -v
Expected: PASS (gate tests pass; the 6 existing portal-gate tests still pass — flag defaults off)
- Step 6: Commit
git add backend/operator_auth.py backend/main.py tests/conftest.py tests/test_operator_gate.py
git commit -m "feat(auth): deny-by-default gate middleware + require_role"
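The gate's branching can also be exercised without an app or a database by restating it as a pure function. The sketch below is an illustration under stated assumptions: `gate_decision` is a hypothetical helper, the allow-list is a trimmed copy of the one in Step 3, and the return values are plain strings rather than Starlette responses.

```python
from urllib.parse import quote

# Trimmed copies of the plan's allow-list, for illustration only.
EXEMPT_EXACT = {"/login", "/logout", "/health", "/portal"}
EXEMPT_PREFIX = ("/static/", "/portal/")


def gate_decision(path: str, logged_in: bool, must_change: bool = False) -> str:
    """Return what the gate would do for a request (hypothetical pure helper)."""
    # str.startswith accepts a tuple and matches if any prefix matches.
    if path in EXEMPT_EXACT or path.startswith(EXEMPT_PREFIX):
        return "pass"
    if not logged_in:
        if path.startswith("/api/"):
            return "401"
        # quote() percent-encodes the path so it is safe inside the query string.
        return f"redirect:/login?next={quote(path)}"
    if must_change and path not in ("/change-password", "/logout"):
        return "redirect:/change-password"
    return "pass"
```

Two details worth noticing: a path such as `/portalx` is not exempt, because the prefix entry is `/portal/` with a trailing slash while the bare `/portal` is matched exactly; and only `request.url.path` is carried into `next`, so any query string on the original request is dropped by the redirect.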
Task 6: Login / logout / change-password routes + templates
The auth pages. These routes work regardless of the flag (you log in while the flag is still off during rollout) and are on the gate's exempt list (login/logout) — except /change-password, which requires a session.
Files:
- Create: backend/routers/operator_auth_routes.py
- Create: templates/login.html, templates/change_password.html
- Modify: backend/main.py (include the router)
- Test: tests/test_operator_login.py
- Step 1: Write the failing test
# tests/test_operator_login.py
import uuid
from tests.conftest import wire_operator_auth
from backend.operator_auth import (
create_operator, make_operator_cookie, COOKIE_NAME, MAX_LOGIN_FAILURES,
)
def test_login_page_renders(client, db_session, monkeypatch):
wire_operator_auth(monkeypatch, db_session, enabled=True)
r = client.get("/login")
assert r.status_code == 200
assert "password" in r.text.lower()
def test_login_success_sets_cookie_and_redirects(client, db_session, monkeypatch):
create_operator(db_session, "ok@x.com", "Ok", "admin", password="rightpw-9")
wire_operator_auth(monkeypatch, db_session, enabled=True)
r = client.post("/login", data={"email": "ok@x.com", "password": "rightpw-9"},
follow_redirects=False)
assert r.status_code == 303
assert r.headers["location"] == "/"
assert f"{COOKIE_NAME}=" in r.headers.get("set-cookie", "")
def test_login_honors_next(client, db_session, monkeypatch):
create_operator(db_session, "ok@x.com", "Ok", "admin", password="rightpw-9")
wire_operator_auth(monkeypatch, db_session, enabled=True)
r = client.post("/login?next=/settings", data={"email": "ok@x.com", "password": "rightpw-9"},
follow_redirects=False)
assert r.headers["location"] == "/settings"
def test_login_wrong_password_no_cookie_generic_error(client, db_session, monkeypatch):
create_operator(db_session, "ok@x.com", "Ok", "admin", password="rightpw-9")
wire_operator_auth(monkeypatch, db_session, enabled=True)
r = client.post("/login", data={"email": "ok@x.com", "password": "nope"},
follow_redirects=False)
assert r.status_code == 200
assert f"{COOKIE_NAME}=" not in r.headers.get("set-cookie", "")
assert "invalid" in r.text.lower()
def test_login_must_change_redirects_to_change_password(client, db_session, monkeypatch):
    # No password given → create_operator generates a temp and sets must_change_password.
    # It returns (user, raw_temp), so log in with the raw temp directly.
    _, temp_pw = create_operator(db_session, "new@x.com", "New", "admin")
    wire_operator_auth(monkeypatch, db_session, enabled=True)
    r = client.post("/login", data={"email": "new@x.com", "password": temp_pw},
                    follow_redirects=False)
    assert r.status_code == 303
    assert r.headers["location"] == "/change-password"
def test_login_lockout_message_after_five(client, db_session, monkeypatch):
create_operator(db_session, "lk@x.com", "Lk", "admin", password="rightpw-9")
wire_operator_auth(monkeypatch, db_session, enabled=True)
for _ in range(MAX_LOGIN_FAILURES):
client.post("/login", data={"email": "lk@x.com", "password": "nope"}, follow_redirects=False)
r = client.post("/login", data={"email": "lk@x.com", "password": "rightpw-9"}, follow_redirects=False)
assert r.status_code == 200
assert "too many" in r.text.lower()
def test_logout_clears_cookie(client, db_session, monkeypatch):
wire_operator_auth(monkeypatch, db_session, enabled=True)
r = client.get("/logout", follow_redirects=False)
assert r.status_code == 303
assert r.headers["location"] == "/login"
# cookie cleared (deletion appears as an expired/empty set-cookie)
assert COOKIE_NAME in r.headers.get("set-cookie", "")
def test_change_password_self_service(client, db_session, monkeypatch):
user, _ = create_operator(db_session, "c@x.com", "C", "admin", password="orig-pw-1")
wire_operator_auth(monkeypatch, db_session, enabled=True)
client.cookies.set(COOKIE_NAME, make_operator_cookie(user.id))
r = client.post("/change-password",
data={"current_password": "orig-pw-1", "new_password": "brand-new-2",
"confirm_password": "brand-new-2"}, follow_redirects=False)
assert r.status_code == 303
from backend.auth_passwords import verify_password
db_session.refresh(user)
assert verify_password("brand-new-2", user.password_hash)
assert user.must_change_password is False
- Step 2: Run tests to verify they fail
Run: docker exec terra-view-terra-view-1 python -m pytest tests/test_operator_login.py -v
Expected: FAIL — /login returns 404 / 303 (route doesn't exist yet)
- Step 3a: Create templates/login.html
<!DOCTYPE html>
<html lang="en">
<head>
<meta charset="utf-8">
<meta name="viewport" content="width=device-width, initial-scale=1">
<title>Sign in · Terra-View</title>
<script src="https://cdn.tailwindcss.com"></script>
</head>
<body class="bg-slate-900 text-slate-100 min-h-screen flex items-center justify-center">
<div class="w-full max-w-sm p-8 bg-slate-800 rounded-xl shadow-lg">
<h1 class="text-xl font-semibold mb-6 text-center">Terra-View</h1>
{% if error %}
<div class="mb-4 px-3 py-2 rounded bg-red-900/60 text-red-200 text-sm">{{ error }}</div>
{% endif %}
<form method="post" action="/login{% if next %}?next={{ next | urlencode }}{% endif %}" class="space-y-4">
<div>
<label class="block text-sm mb-1" for="email">Email</label>
<input id="email" name="email" type="email" autofocus required
class="w-full px-3 py-2 rounded bg-slate-700 border border-slate-600 focus:outline-none focus:ring-2 focus:ring-orange-500">
</div>
<div>
<label class="block text-sm mb-1" for="password">Password</label>
<input id="password" name="password" type="password" required
class="w-full px-3 py-2 rounded bg-slate-700 border border-slate-600 focus:outline-none focus:ring-2 focus:ring-orange-500">
</div>
<button type="submit"
class="w-full py-2 rounded bg-orange-500 hover:bg-orange-600 font-medium">Sign in</button>
</form>
<p class="mt-4 text-xs text-slate-400 text-center">Forgot your password? Contact your administrator.</p>
</div>
</body>
</html>
- Step 3b: Create templates/change_password.html
<!DOCTYPE html>
<html lang="en">
<head>
<meta charset="utf-8">
<meta name="viewport" content="width=device-width, initial-scale=1">
<title>Change password · Terra-View</title>
<script src="https://cdn.tailwindcss.com"></script>
</head>
<body class="bg-slate-900 text-slate-100 min-h-screen flex items-center justify-center">
<div class="w-full max-w-sm p-8 bg-slate-800 rounded-xl shadow-lg">
<h1 class="text-xl font-semibold mb-2 text-center">Change your password</h1>
{% if must_change %}
<p class="mb-4 text-sm text-amber-300 text-center">Please set a new password to continue.</p>
{% endif %}
{% if error %}
<div class="mb-4 px-3 py-2 rounded bg-red-900/60 text-red-200 text-sm">{{ error }}</div>
{% endif %}
<form method="post" action="/change-password" class="space-y-4">
<div>
<label class="block text-sm mb-1" for="current_password">Current password</label>
<input id="current_password" name="current_password" type="password" required
class="w-full px-3 py-2 rounded bg-slate-700 border border-slate-600 focus:outline-none focus:ring-2 focus:ring-orange-500">
</div>
<div>
<label class="block text-sm mb-1" for="new_password">New password</label>
<input id="new_password" name="new_password" type="password" minlength="8" required
class="w-full px-3 py-2 rounded bg-slate-700 border border-slate-600 focus:outline-none focus:ring-2 focus:ring-orange-500">
</div>
<div>
<label class="block text-sm mb-1" for="confirm_password">Confirm new password</label>
<input id="confirm_password" name="confirm_password" type="password" minlength="8" required
class="w-full px-3 py-2 rounded bg-slate-700 border border-slate-600 focus:outline-none focus:ring-2 focus:ring-orange-500">
</div>
<button type="submit"
class="w-full py-2 rounded bg-orange-500 hover:bg-orange-600 font-medium">Update password</button>
</form>
</div>
</body>
</html>
- Step 3c: Create backend/routers/operator_auth_routes.py
"""Operator login / logout / change-password. These routes intentionally work
regardless of OPERATOR_AUTH_ENABLED (you log in while the flag is still off during
rollout). /login and /logout are on the gate's exempt list; /change-password
requires a session (the gate sets request.state.operator)."""
from fastapi import APIRouter, Request, Depends, Form
from fastapi.responses import RedirectResponse
from sqlalchemy.orm import Session
from backend.database import get_db
from backend.templates_config import templates
from backend.operator_auth import (
authenticate, current_operator, change_own_password, make_operator_cookie,
COOKIE_NAME, COOKIE_MAX_AGE,
)
from backend.auth_cookies import COOKIE_SECURE
from backend.auth_passwords import verify_password
router = APIRouter(tags=["operator-auth"])
def _safe_next(next_url: str) -> str:
    """Only allow same-site relative redirects (an open-redirect guard)."""
    # Browsers treat "\" like "/", so "/\evil.com" would behave as "//evil.com".
    if (next_url and next_url.startswith("/") and not next_url.startswith("//")
            and "\\" not in next_url):
        return next_url
    return "/"
@router.get("/login")
async def login_page(request: Request, next: str = "", error: str = ""):
return templates.TemplateResponse("login.html",
{"request": request, "next": next, "error": error})
@router.post("/login")
async def login_submit(request: Request, next: str = "",
email: str = Form(...), password: str = Form(...),
db: Session = Depends(get_db)):
user, status = authenticate(db, email, password)
if status == "locked":
return templates.TemplateResponse(
"login.html",
{"request": request, "next": next,
"error": "Too many attempts — try again in 15 minutes."},
status_code=200)
if user is None:
return templates.TemplateResponse(
"login.html",
{"request": request, "next": next, "error": "Invalid email or password."},
status_code=200)
dest = "/change-password" if user.must_change_password else _safe_next(next)
resp = RedirectResponse(url=dest, status_code=303)
resp.set_cookie(COOKIE_NAME, make_operator_cookie(user.id),
max_age=COOKIE_MAX_AGE, httponly=True, samesite="lax", secure=COOKIE_SECURE)
return resp
@router.get("/logout")
async def logout(request: Request):
resp = RedirectResponse(url="/login", status_code=303)
resp.delete_cookie(COOKIE_NAME)
return resp
@router.get("/change-password")
async def change_password_page(request: Request, db: Session = Depends(get_db)):
user = getattr(request.state, "operator", None) or current_operator(request, db)
if user is None:
return RedirectResponse(url="/login", status_code=303)
return templates.TemplateResponse(
"change_password.html",
{"request": request, "must_change": user.must_change_password, "error": ""})
@router.post("/change-password")
async def change_password_submit(request: Request,
current_password: str = Form(...),
new_password: str = Form(...),
confirm_password: str = Form(...),
db: Session = Depends(get_db)):
user = getattr(request.state, "operator", None) or current_operator(request, db)
if user is None:
return RedirectResponse(url="/login", status_code=303)
def _err(msg):
return templates.TemplateResponse(
"change_password.html",
{"request": request, "must_change": user.must_change_password, "error": msg},
status_code=200)
if not verify_password(current_password, user.password_hash):
return _err("Current password is incorrect.")
if len(new_password) < 8:
return _err("New password must be at least 8 characters.")
if new_password != confirm_password:
return _err("New passwords do not match.")
new_iat = change_own_password(db, user, new_password)
resp = RedirectResponse(url="/", status_code=303)
resp.set_cookie(COOKIE_NAME, make_operator_cookie(user.id, iat=new_iat),
max_age=COOKIE_MAX_AGE, httponly=True, samesite="lax", secure=COOKIE_SECURE)
return resp
- Step 3d: Register the router in backend/main.py
After the middleware registration added in Task 5, add:
from backend.routers import operator_auth_routes
app.include_router(operator_auth_routes.router)
- Step 4: Run tests to verify they pass
Run: docker exec terra-view-terra-view-1 python -m pytest tests/test_operator_login.py -v
Expected: PASS (8 passed)
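The lockout test in this task depends on a per-account failure counter and a lock window. The following is a minimal sketch of that logic, assuming an in-memory record rather than the real OperatorUser row. `LockState`, `record_failure`, `record_success`, and `is_locked` are hypothetical names for illustration only; the 5-try and 15-minute values mirror the numbers used elsewhere in this plan, and the real helpers from Task 4 may differ.

```python
from dataclasses import dataclass
from datetime import datetime, timedelta
from typing import Optional

MAX_FAILURES = 5                     # assumption: mirrors MAX_LOGIN_FAILURES
LOCK_WINDOW = timedelta(minutes=15)  # assumption: mirrors the 15-minute message


@dataclass
class LockState:
    """Hypothetical stand-in for the lockout columns on an operator row."""
    failed_logins: int = 0
    locked_until: Optional[datetime] = None


def is_locked(state: LockState, now: datetime) -> bool:
    """True while the lock window has not yet elapsed."""
    return state.locked_until is not None and state.locked_until > now


def record_failure(state: LockState, now: datetime) -> None:
    """Count a bad password; start a lock window once the limit is reached."""
    state.failed_logins += 1
    if state.failed_logins >= MAX_FAILURES:
        state.locked_until = now + LOCK_WINDOW
        state.failed_logins = 0


def record_success(state: LockState) -> None:
    """A good login clears both the counter and any lock."""
    state.failed_logins = 0
    state.locked_until = None
```

Checking `is_locked` before verifying the password is what lets the login route show the "too many attempts" message even when the correct password is supplied, which is the behaviour the test asserts.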
- Step 5: Commit
git add backend/routers/operator_auth_routes.py templates/login.html templates/change_password.html backend/main.py tests/test_operator_login.py
git commit -m "feat(auth): login/logout/change-password routes + pages"
Task 7: User-management routes + template (superadmin-only)
/admin/users page and JSON CRUD, all behind require_role("superadmin"). Temp passwords are returned once.
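For context on "returned once": the raw temp password only ever exists in the response, while the database keeps the hash. A minimal sketch of how such a temp password could be generated with the standard library follows. `generate_temp_password` is a hypothetical name; the actual generation happens inside `create_operator` / `reset_operator_password` from Task 4 and may work differently.

```python
import secrets


def generate_temp_password(nbytes: int = 12) -> str:
    """Return a URL-safe random string.

    12 random bytes base64-encode to 16 characters, which comfortably
    satisfies the tests' `len(...) >= 12` check.
    """
    return secrets.token_urlsafe(nbytes)
```

`secrets` draws from the operating system's CSPRNG, which is the appropriate source for credentials; `random` is not.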
Files:
- Create: backend/routers/operator_users.py
- Create: templates/admin/users.html
- Modify: backend/main.py (include the router)
- Test: tests/test_operator_users.py
- Step 1: Write the failing test
# tests/test_operator_users.py
from tests.conftest import wire_operator_auth
from backend.operator_auth import create_operator, make_operator_cookie, COOKIE_NAME
from backend.models import OperatorUser
def _login_as(client, user):
client.cookies.set(COOKIE_NAME, make_operator_cookie(user.id))
def test_admin_cannot_reach_user_management(client, db_session, monkeypatch):
admin, _ = create_operator(db_session, "admin@x.com", "Admin", "admin", password="pw-123456")
wire_operator_auth(monkeypatch, db_session, enabled=True)
_login_as(client, admin)
assert client.get("/admin/users", follow_redirects=False).status_code == 403
def test_superadmin_sees_user_management(client, db_session, monkeypatch):
su, _ = create_operator(db_session, "su@x.com", "Su", "superadmin", password="pw-123456")
wire_operator_auth(monkeypatch, db_session, enabled=True)
_login_as(client, su)
assert client.get("/admin/users", follow_redirects=False).status_code == 200
def test_superadmin_lists_users_json(client, db_session, monkeypatch):
su, _ = create_operator(db_session, "su@x.com", "Su", "superadmin", password="pw-123456")
wire_operator_auth(monkeypatch, db_session, enabled=True)
_login_as(client, su)
r = client.get("/api/admin/users")
assert r.status_code == 200
emails = [u["email"] for u in r.json()["users"]]
assert "su@x.com" in emails
assert all("password_hash" not in u for u in r.json()["users"]) # never leak hashes
def test_create_user_returns_temp_once(client, db_session, monkeypatch):
su, _ = create_operator(db_session, "su@x.com", "Su", "superadmin", password="pw-123456")
wire_operator_auth(monkeypatch, db_session, enabled=True)
_login_as(client, su)
r = client.post("/api/admin/users",
json={"email": "dad@x.com", "name": "Dad", "role": "admin"})
assert r.status_code == 200
assert len(r.json()["password"]) >= 12
made = db_session.query(OperatorUser).filter_by(email="dad@x.com").first()
assert made.must_change_password is True
def test_reset_password_returns_temp_once(client, db_session, monkeypatch):
su, _ = create_operator(db_session, "su@x.com", "Su", "superadmin", password="pw-123456")
target, _ = create_operator(db_session, "t@x.com", "T", "admin", password="pw-123456")
wire_operator_auth(monkeypatch, db_session, enabled=True)
_login_as(client, su)
r = client.post(f"/api/admin/users/{target.id}/reset-password")
assert r.status_code == 200 and len(r.json()["password"]) >= 12
db_session.refresh(target)
assert target.must_change_password is True
def test_disable_and_enable(client, db_session, monkeypatch):
su, _ = create_operator(db_session, "su@x.com", "Su", "superadmin", password="pw-123456")
target, _ = create_operator(db_session, "t@x.com", "T", "admin", password="pw-123456")
wire_operator_auth(monkeypatch, db_session, enabled=True)
_login_as(client, su)
assert client.post(f"/api/admin/users/{target.id}/disable").status_code == 200
db_session.refresh(target); assert target.active is False
assert client.post(f"/api/admin/users/{target.id}/enable").status_code == 200
db_session.refresh(target); assert target.active is True
def test_change_role(client, db_session, monkeypatch):
su, _ = create_operator(db_session, "su@x.com", "Su", "superadmin", password="pw-123456")
target, _ = create_operator(db_session, "t@x.com", "T", "admin", password="pw-123456")
wire_operator_auth(monkeypatch, db_session, enabled=True)
_login_as(client, su)
r = client.post(f"/api/admin/users/{target.id}/role", json={"role": "superadmin"})
assert r.status_code == 200
db_session.refresh(target); assert target.role == "superadmin"
- Step 2: Run tests to verify they fail
Run: docker exec terra-view-terra-view-1 python -m pytest tests/test_operator_users.py -v
Expected: FAIL — routes don't exist (404)
- Step 3a: Create templates/admin/users.html
{% extends "base.html" %}
{% block title %}Operator Accounts{% endblock %}
{% block content %}
<div class="max-w-4xl mx-auto p-4">
<div class="flex items-center justify-between mb-4">
<h1 class="text-2xl font-semibold">Operator Accounts</h1>
<button id="add-user-btn" class="px-3 py-2 rounded bg-orange-500 hover:bg-orange-600 text-white text-sm">+ Add operator</button>
</div>
<div id="temp-pw-banner" class="hidden mb-4 px-3 py-2 rounded bg-emerald-900/60 text-emerald-100 text-sm"></div>
<table class="w-full text-sm">
<thead><tr class="text-left border-b border-slate-600">
<th class="py-2">Name</th><th>Email</th><th>Role</th><th>Status</th><th>Last login</th><th></th>
</tr></thead>
<tbody id="user-rows"></tbody>
</table>
</div>
<script>
const $ = (s) => document.querySelector(s);
// Escape user-controlled strings before interpolating them into innerHTML.
const esc = (v) => String(v ?? "").replace(/[&<>"']/g,
(c) => ({"&":"&amp;","<":"&lt;",">":"&gt;",'"':"&quot;","'":"&#39;"}[c]));
function showTemp(email, pw) {
const b = $("#temp-pw-banner");
b.textContent = `Temporary password for ${email}: ${pw} — copy it now, it won't be shown again.`;
b.classList.remove("hidden");
}
async function load() {
const r = await fetch("/api/admin/users");
const { users } = await r.json();
$("#user-rows").innerHTML = users.map(u => `
<tr class="border-b border-slate-700">
<td class="py-2">${esc(u.display_name)}</td>
<td>${esc(u.email)}</td>
<td>
<select data-role="${u.id}" class="bg-slate-700 rounded px-1 py-0.5">
<option value="admin"${u.role==='admin'?' selected':''}>admin</option>
<option value="superadmin"${u.role==='superadmin'?' selected':''}>superadmin</option>
</select>
</td>
<td>${u.active ? 'active' : 'disabled'}${u.locked ? ' (locked)' : ''}</td>
<td>${u.last_login_at || '—'}</td>
<td class="text-right space-x-2">
<button data-reset="${u.id}" data-email="${esc(u.email)}" class="text-orange-400 hover:underline">Reset pw</button>
<button data-toggle="${u.id}" data-active="${u.active}" class="text-slate-300 hover:underline">${u.active ? 'Disable' : 'Enable'}</button>
</td>
</tr>`).join("");
}
document.addEventListener("click", async (e) => {
if (e.target.dataset.reset) {
const r = await fetch(`/api/admin/users/${e.target.dataset.reset}/reset-password`, {method:"POST"});
const d = await r.json(); showTemp(e.target.dataset.email, d.password); load();
} else if (e.target.dataset.toggle) {
const action = e.target.dataset.active === "true" ? "disable" : "enable";
await fetch(`/api/admin/users/${e.target.dataset.toggle}/${action}`, {method:"POST"}); load();
} else if (e.target.id === "add-user-btn") {
const email = prompt("Email?"); if (!email) return;
const name = prompt("Display name?") || email;
const role = prompt("Role (admin / superadmin)?", "admin") || "admin";
const r = await fetch("/api/admin/users", {method:"POST", headers:{"Content-Type":"application/json"},
body: JSON.stringify({email, name, role})});
if (r.ok) { const d = await r.json(); showTemp(email, d.password); load(); }
else { alert((await r.json()).detail || "Failed"); }
}
});
document.addEventListener("change", async (e) => {
if (e.target.dataset.role) {
await fetch(`/api/admin/users/${e.target.dataset.role}/role`, {method:"POST",
headers:{"Content-Type":"application/json"}, body: JSON.stringify({role: e.target.value})});
load();
}
});
load();
</script>
{% endblock %}
- Step 3b: Create backend/routers/operator_users.py
"""Operator account management — superadmin only. Temp passwords are returned in
the JSON response once (shown to the superadmin to hand off); only hashes persist."""
from fastapi import APIRouter, Request, Depends, HTTPException
from fastapi.responses import JSONResponse
from pydantic import BaseModel
from sqlalchemy.orm import Session
from backend.database import get_db
from backend.templates_config import templates
from backend.models import OperatorUser
from backend.operator_auth import (
require_role, create_operator, reset_operator_password,
set_operator_active, set_operator_role,
)
from backend.utils.timezone import format_local_datetime
router = APIRouter(tags=["operator-users"])
_superadmin = require_role("superadmin")
class NewUser(BaseModel):
email: str
name: str
role: str = "admin"
class RoleChange(BaseModel):
role: str
def _serialize(u: OperatorUser) -> dict:
from datetime import datetime
return {
"id": u.id, "email": u.email, "display_name": u.display_name, "role": u.role,
"active": bool(u.active), "must_change_password": bool(u.must_change_password),
"locked": bool(u.locked_until and u.locked_until > datetime.utcnow()),
"last_login_at": format_local_datetime(u.last_login_at, "%Y-%m-%d %H:%M") if u.last_login_at else None,
}
@router.get("/admin/users")
async def users_page(request: Request, _=Depends(_superadmin)):
return templates.TemplateResponse("admin/users.html", {"request": request})
@router.get("/api/admin/users")
async def list_users(_=Depends(_superadmin), db: Session = Depends(get_db)):
users = db.query(OperatorUser).order_by(OperatorUser.display_name).all()
return {"users": [_serialize(u) for u in users]}
@router.post("/api/admin/users")
async def add_user(body: NewUser, _=Depends(_superadmin), db: Session = Depends(get_db)):
try:
user, raw = create_operator(db, body.email, body.name, body.role)
except ValueError as e:
return JSONResponse(status_code=400, content={"detail": str(e)})
return {"user": _serialize(user), "password": raw}
@router.post("/api/admin/users/{user_id}/reset-password")
async def reset_user_password(user_id: str, _=Depends(_superadmin), db: Session = Depends(get_db)):
user = db.query(OperatorUser).filter_by(id=user_id).first()
if not user:
raise HTTPException(status_code=404, detail="User not found")
raw = reset_operator_password(db, user)
return {"password": raw}
@router.post("/api/admin/users/{user_id}/disable")
async def disable_user(user_id: str, _=Depends(_superadmin), db: Session = Depends(get_db)):
user = db.query(OperatorUser).filter_by(id=user_id).first()
if not user:
raise HTTPException(status_code=404, detail="User not found")
set_operator_active(db, user, False)
return {"active": False}
@router.post("/api/admin/users/{user_id}/enable")
async def enable_user(user_id: str, _=Depends(_superadmin), db: Session = Depends(get_db)):
user = db.query(OperatorUser).filter_by(id=user_id).first()
if not user:
raise HTTPException(status_code=404, detail="User not found")
set_operator_active(db, user, True)
return {"active": True}
@router.post("/api/admin/users/{user_id}/role")
async def change_user_role(user_id: str, body: RoleChange,
_=Depends(_superadmin), db: Session = Depends(get_db)):
user = db.query(OperatorUser).filter_by(id=user_id).first()
if not user:
raise HTTPException(status_code=404, detail="User not found")
try:
set_operator_role(db, user, body.role)
except ValueError as e:
return JSONResponse(status_code=400, content={"detail": str(e)})
return {"role": user.role}
- Step 3c: Register the router in backend/main.py
After the operator_auth_routes include added in Task 6, add:
from backend.routers import operator_users
app.include_router(operator_users.router)
- Step 4: Run tests to verify they pass
Run: docker exec terra-view-terra-view-1 python -m pytest tests/test_operator_users.py -v
Expected: PASS (7 passed)
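The 403 that the admin test expects can be read as a rank comparison: a user passes a role check only if their role ranks at least as high as the required one. A minimal sketch of that idea follows, assuming a simple numeric ranking. `ROLE_RANK` and `has_role` are hypothetical names and values; the real ranks and `require_role` live in backend/operator_auth.py and may be structured differently.

```python
# Hypothetical ranks: higher number means more privilege.
ROLE_RANK = {"operator": 1, "admin": 2, "superadmin": 3}


def has_role(user_role: str, required: str) -> bool:
    """True if user_role is at least as privileged as required.

    Unknown user roles rank 0, so they never satisfy any requirement.
    An unknown required role raises KeyError, surfacing typos early.
    """
    return ROLE_RANK.get(user_role, 0) >= ROLE_RANK[required]
```

Under this sketch an `admin` fails `has_role(..., "superadmin")`, which corresponds to the 403 case, while a `superadmin` passes both checks.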
- Step 5: Commit
git add backend/routers/operator_users.py templates/admin/users.html backend/main.py tests/test_operator_users.py
git commit -m "feat(auth): superadmin user-management page + CRUD"
Task 8: Seed / break-glass CLI (operator_admin.py)
The bootstrap (first superadmin, before any UI is reachable) and the break-glass (locked out / forgot everything). Thin wrappers over the Task-4 data helpers.
Files:
- Create: backend/operator_admin.py
- Test: tests/test_operator_admin_cli.py
- Step 1: Write the failing test
# tests/test_operator_admin_cli.py
from sqlalchemy.orm import sessionmaker
from backend.models import OperatorUser
from backend.auth_passwords import verify_password
import backend.operator_admin as cli
def _maker(db_session):
return sessionmaker(bind=db_session.get_bind(), autocommit=False, autoflush=False)
def test_seed_superadmin(db_session, monkeypatch):
monkeypatch.setattr(cli, "SessionLocal", _maker(db_session), raising=False)
cli.cmd_create_superadmin(email="brian@x.com", name="Brian", password="chosen-pw-1")
u = db_session.query(OperatorUser).filter_by(email="brian@x.com").first()
assert u.role == "superadmin"
assert u.must_change_password is False
assert verify_password("chosen-pw-1", u.password_hash)
def test_create_user_generates_temp(db_session, monkeypatch, capsys):
monkeypatch.setattr(cli, "SessionLocal", _maker(db_session), raising=False)
cli.cmd_create_user(email="dad@x.com", name="Dad", role="admin")
u = db_session.query(OperatorUser).filter_by(email="dad@x.com").first()
assert u.role == "admin" and u.must_change_password is True
assert "dad@x.com" in capsys.readouterr().out # prints the temp once
def test_reset_password_cli(db_session, monkeypatch):
monkeypatch.setattr(cli, "SessionLocal", _maker(db_session), raising=False)
cli.cmd_create_user(email="r@x.com", name="R", role="admin")
before = db_session.query(OperatorUser).filter_by(email="r@x.com").first().password_hash
cli.cmd_reset_password(email="r@x.com")
after = db_session.query(OperatorUser).filter_by(email="r@x.com").first().password_hash
assert before != after
def test_disable_enable_cli(db_session, monkeypatch):
monkeypatch.setattr(cli, "SessionLocal", _maker(db_session), raising=False)
cli.cmd_create_user(email="d@x.com", name="D", role="admin")
cli.cmd_set_active(email="d@x.com", active=False)
assert db_session.query(OperatorUser).filter_by(email="d@x.com").first().active is False
cli.cmd_set_active(email="d@x.com", active=True)
assert db_session.query(OperatorUser).filter_by(email="d@x.com").first().active is True
- Step 2: Run tests to verify they fail
Run: docker exec terra-view-terra-view-1 python -m pytest tests/test_operator_admin_cli.py -v
Expected: FAIL — ModuleNotFoundError: No module named 'backend.operator_admin'
- Step 3: Create backend/operator_admin.py
#!/usr/bin/env python3
"""Operator-account admin CLI — the bootstrap and the break-glass. Run inside the
terra-view container against the live DB. Temp/raw passwords are printed ONCE; only
hashes persist.
# first superadmin (before any UI is reachable) — prompts for a password, or --generate
python3 backend/operator_admin.py create-superadmin --email you@x.com --name "Brian"
# a parent's account — generates a temp password, must-change on first login
python3 backend/operator_admin.py create-user --email dad@x.com --name "Dad" --role admin
python3 backend/operator_admin.py reset-password --email dad@x.com
python3 backend/operator_admin.py list
python3 backend/operator_admin.py disable --email dad@x.com
python3 backend/operator_admin.py enable --email dad@x.com
"""
import os
import sys
import getpass
import argparse
from datetime import datetime
sys.path.insert(0, os.path.dirname(os.path.dirname(os.path.abspath(__file__))))
from backend.database import SessionLocal
from backend.models import OperatorUser
from backend.operator_auth import (
create_operator, reset_operator_password, set_operator_active, _norm_email,
)
def _get(db, email):
u = db.query(OperatorUser).filter_by(email=_norm_email(email)).first()
if not u:
sys.exit(f"No operator with email '{email}'.")
return u
def cmd_create_superadmin(email, name, password=None, generate=False):
    db = SessionLocal()
    try:
        if not generate:
            if password is None:
                password = getpass.getpass("Password for new superadmin: ")
            # Validate both prompted and caller-supplied passwords.
            if not password or len(password) < 8:
                sys.exit("Password must be at least 8 characters.")
        user, raw = create_operator(db, email, name, "superadmin",
                                    password=None if generate else password)
        if generate:
            print(f"✓ Superadmin {user.email} created. Temp password (shown once): {raw}")
        else:
            print(f"✓ Superadmin {user.email} created.")
    except ValueError as e:
        sys.exit(str(e))
    finally:
        db.close()
def cmd_create_user(email, name, role="admin"):
db = SessionLocal()
try:
user, raw = create_operator(db, email, name, role)
print(f"✓ {role} {user.email} created. Temp password (shown once): {raw}")
print(" They'll be required to change it on first login.")
except ValueError as e:
sys.exit(str(e))
finally:
db.close()
def cmd_reset_password(email):
db = SessionLocal()
try:
user = _get(db, email)
raw = reset_operator_password(db, user)
print(f"✓ Reset {user.email}. Temp password (shown once): {raw}")
finally:
db.close()
def cmd_set_active(email, active):
db = SessionLocal()
try:
user = _get(db, email)
set_operator_active(db, user, active)
print(f"✓ {user.email} {'enabled' if active else 'disabled'}.")
finally:
db.close()
def cmd_list():
db = SessionLocal()
try:
users = db.query(OperatorUser).order_by(OperatorUser.display_name).all()
if not users:
print("No operators yet. Run create-superadmin first.")
return
for u in users:
locked = " [LOCKED]" if (u.locked_until and u.locked_until > datetime.utcnow()) else ""
state = "active" if u.active else "DISABLED"
last = u.last_login_at.strftime("%Y-%m-%d %H:%M") if u.last_login_at else "never"
print(f" {u.display_name:<12} {u.email:<28} {u.role:<11} {state}{locked} last: {last}")
finally:
db.close()
def main():
ap = argparse.ArgumentParser(description="Operator-account admin")
sub = ap.add_subparsers(dest="cmd", required=True)
p = sub.add_parser("create-superadmin")
p.add_argument("--email", required=True); p.add_argument("--name", required=True)
p.add_argument("--generate", action="store_true", help="generate a temp password instead of prompting")
p.set_defaults(fn=lambda a: cmd_create_superadmin(a.email, a.name, generate=a.generate))
p = sub.add_parser("create-user")
p.add_argument("--email", required=True); p.add_argument("--name", required=True)
p.add_argument("--role", default="admin", choices=["admin", "superadmin"])
p.set_defaults(fn=lambda a: cmd_create_user(a.email, a.name, a.role))
p = sub.add_parser("reset-password")
p.add_argument("--email", required=True)
p.set_defaults(fn=lambda a: cmd_reset_password(a.email))
p = sub.add_parser("disable"); p.add_argument("--email", required=True)
p.set_defaults(fn=lambda a: cmd_set_active(a.email, False))
p = sub.add_parser("enable"); p.add_argument("--email", required=True)
p.set_defaults(fn=lambda a: cmd_set_active(a.email, True))
p = sub.add_parser("list"); p.set_defaults(fn=lambda a: cmd_list())
args = ap.parse_args()
args.fn(args)
if __name__ == "__main__":
main()
- Step 4: Run tests to verify they pass
Run: docker exec terra-view-terra-view-1 python -m pytest tests/test_operator_admin_cli.py -v
Expected: PASS (4 passed)
- Step 5: Commit
git add backend/operator_admin.py tests/test_operator_admin_cli.py
git commit -m "feat(auth): operator admin/break-glass CLI"
Task 9: Machine-endpoint regression guard + full-suite green
A dedicated regression test that the gate, when ON, never blocks the watcher heartbeats — and a full-suite run to confirm nothing else broke.
Files:
- Test: tests/test_operator_machine_endpoints.py
- Step 1: Write the regression test
# tests/test_operator_machine_endpoints.py
from tests.conftest import wire_operator_auth
def test_machine_endpoints_not_blocked_by_gate(client, db_session, monkeypatch):
"""With the gate ON and no cookie, the LAN-only watcher endpoints must reach
their handlers (the gate must never silently break heartbeats). A handler may
return 422 for an empty body — that still proves the gate let it through."""
wire_operator_auth(monkeypatch, db_session, enabled=True)
r = client.post("/api/series3/heartbeat", json={}, follow_redirects=False)
assert r.status_code != 401 # gate would 401 an unauth /api/* route
assert r.status_code != 303
r = client.post("/api/series4/heartbeat", json={}, follow_redirects=False)
assert r.status_code not in (401, 303)
r = client.post("/emitters/report", json={}, follow_redirects=False)
assert r.status_code != 303 # gate would 303 an unauth HTML route
def test_static_assets_exempt(client, db_session, monkeypatch):
    wire_operator_auth(monkeypatch, db_session, enabled=True)
    # /sw.js and /manifest.json are PWA assets clients fetch pre-login.
    # 200 or 404 both prove the gate let the request through (no 303 / 401).
    for path in ("/sw.js", "/manifest.json"):
        assert client.get(path, follow_redirects=False).status_code in (200, 404)
- Step 2: Run the test to verify it passes (the exempt list from Task 5 already covers it)
Run: docker exec terra-view-terra-view-1 python -m pytest tests/test_operator_machine_endpoints.py -v
Expected: PASS (2 passed). If any assertion fails, the exempt list in backend/operator_auth.py (_EXEMPT_EXACT) is missing that path — add it and re-run.
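When diagnosing a failure here, it can help to picture the exemption check as an exact-match set plus a prefix tuple. The sketch below is an assumption about the shape of that check, not the code from Task 5: `EXEMPT_EXACT`, `EXEMPT_PREFIXES`, and `is_exempt` are hypothetical names, and the paths are taken from the tests and the allow-list described in this plan.

```python
# Hypothetical allow-list; the real one lives in backend/operator_auth.py.
EXEMPT_EXACT = frozenset({
    "/login", "/logout", "/health", "/sw.js",
    "/api/series3/heartbeat", "/api/series4/heartbeat", "/emitters/report",
})
# Trailing slashes keep "/staticfoo" from matching "/static/".
EXEMPT_PREFIXES = ("/static/", "/portal/")


def is_exempt(path: str) -> bool:
    """True if the gate should let this path through without a session."""
    # str.startswith accepts a tuple and matches any of its entries.
    return path in EXEMPT_EXACT or path.startswith(EXEMPT_PREFIXES)
```

Exact matching for the machine endpoints keeps the allow-list narrow: a sibling path such as `/api/series3/other` would still be gated.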
- Step 3: Run the FULL test suite to confirm no regressions
Run: docker exec terra-view-terra-view-1 python -m pytest tests/ -v
Expected: PASS — all operator-auth tests plus the pre-existing portal/auth tests. The gate defaults OFF (env unset), so every pre-existing test is unaffected.
- Step 4: Commit
git add tests/test_operator_machine_endpoints.py
git commit -m "test(auth): regression guard — gate never blocks machine endpoints"
Task 10: Wire the flag into compose + CHANGELOG + rollout doc
Make the flag operable in deployment and document the no-self-lockout rollout.
Files:
- Modify: docker-compose.yml (pass OPERATOR_AUTH_ENABLED through, default off)
- Modify: CHANGELOG.md (Unreleased → Added)
- Step 1: Add the flag pass-through to docker-compose.yml
In the web-app service environment: block, after the COOKIE_SECURE line (line 20), add:
# Operator login gate. Leave false to ship dark; seed a superadmin via
# backend/operator_admin.py, confirm you can log in, THEN set true to enforce.
# Instant escape hatch: set back to false. See docs/superpowers/specs/2026-06-17-operator-auth-design.md
- OPERATOR_AUTH_ENABLED=${OPERATOR_AUTH_ENABLED:-false}
- Step 2: Add the CHANGELOG entry
In CHANGELOG.md, under ## [Unreleased] → ### Added, add this bullet (keep it grouped with the other Added items):
- **Operator authentication (login + roles), shipped dark behind `OPERATOR_AUTH_ENABLED`.** The internal app gains a deny-by-default login gate (one Starlette middleware over every route except an allow-list: `/login`, `/logout`, `/health`, `/static/*`, `/portal/*`, plus the three machine endpoints). Two roles: `superadmin` (account management) and `admin` (full app); `operator` is reserved. Sessions are a 30-day HMAC-signed `tv_session` cookie re-validated against the DB on each request (instant revoke via `active` / `sessions_valid_from`). Password reset is superadmin-driven: reset-anyone (temp password shown once + forced change), self-service `/change-password`, and a `backend/operator_admin.py` seed/break-glass CLI. Brute-force lockout (5 tries / 15 min). The new `operator_users` table auto-creates, so no migration is needed. Reuses the portal's argon2 hasher + a new shared `backend/auth_cookies.py` signer. Rollout: ship with the flag off (app unchanged), seed a superadmin, confirm login, then flip it on; the flag is an instant escape hatch. Spec: `docs/superpowers/specs/2026-06-17-operator-auth-design.md`.
- Step 3: Run the full suite once more (sanity — no code changed, but confirm compose/env didn't break import)
Run: docker exec terra-view-terra-view-1 python -m pytest tests/ -q
Expected: PASS (all green)
- Step 4: Commit
git add docker-compose.yml CHANGELOG.md
git commit -m "chore(auth): wire OPERATOR_AUTH_ENABLED into compose + changelog"
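For orientation, here is one way the application side could read the value that compose passes through. It is a sketch under stated assumptions: `operator_auth_enabled` and the accepted truthy spellings are hypothetical, and the real flag helper in backend/operator_auth.py may parse the variable differently.

```python
import os
from typing import Mapping

# Assumed set of spellings treated as "on"; anything else counts as off.
_TRUTHY = {"1", "true", "yes", "on"}


def operator_auth_enabled(env: Mapping[str, str] = os.environ) -> bool:
    """Hypothetical flag reader: unset, empty, or unrecognised values mean off."""
    return env.get("OPERATOR_AUTH_ENABLED", "false").strip().lower() in _TRUTHY
```

Failing closed to "off" matches the compose default: `${OPERATOR_AUTH_ENABLED:-false}` substitutes `false` when the host variable is unset or empty, so a deployment that never mentions the flag keeps today's behaviour.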
Manual verification (after all tasks — done by the human, not a step)
These confirm the rollout sequence on a real container (the spec's "no self-lockout" path):
- Flag off (default), app unchanged: docker compose up -d with no OPERATOR_AUTH_ENABLED set → the app behaves exactly as today (no login).
- Seed: docker exec terra-view-terra-view-1 python3 backend/operator_admin.py create-superadmin --email you@x.com --name "Brian" (prompts for a password).
- Log in while still off: visit /login, sign in → you get a tv_session cookie (login works regardless of the flag).
- Enforce: set OPERATOR_AUTH_ENABLED=true, docker compose up -d web-app → the gate enforces; your cookie lets you in; anything wrong → set it back to false (instant escape hatch).
- Add parents: from /admin/users, add admin accounts (temp passwords shown once; they change on first login).
Self-Review (run by the plan author after writing — recorded here)
Spec coverage: OperatorUser table (T2) ✓ · auth_cookies shared signer (T1) ✓ · tv_session cookie + 30-day + DB re-validation + sessions_valid_from (T3) ✓ · deny-by-default middleware + exempt list incl. 3 machine endpoints + must_change redirect + HTML-303/api-401 split (T5) ✓ · role ladder + require_role superadmin-only (T2/T5/T7) ✓ · authenticate + lockout 5/15min (T4/T6) ✓ · password reset all three paths + forgot=contact-admin (T4/T6/T7) ✓ · seed/break-glass CLI (T8) ✓ · user-mgmt UI (T7) ✓ · login/logout/change-password + templates (T6) ✓ · flag governs middleware AND require_role (T5) ✓ · rollout sequence (T10 + manual) ✓ · no migration / new table auto-creates (T2) ✓ · CHANGELOG (T10) ✓.
Type/name consistency: make_operator_cookie(uid, iat=None), current_operator(request, db), authenticate(db, email, password) -> (user, status), create_operator(db, email, name, role, password=None, must_change=None) -> (user, raw), reset_operator_password(db, user) -> raw, change_own_password(db, user, new_password) -> new_iat, role_at_least(role, minimum), require_role(minimum), COOKIE_NAME="tv_session" — all referenced consistently across T3–T8. The sessions_valid_from-vs-iat truncation race is handled at every write point (_utcnow_seconds default, utcfromtimestamp(new_iat) on change, .replace(microsecond=0) on reset) and covered by test_cookie_minted_with_matching_iat_after_bump_still_valid.
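The truncation race mentioned above is easiest to see with concrete numbers. The sketch below assumes the validity rule is "cookie `iat` (whole seconds) must be at or after `sessions_valid_from`"; `cookie_still_valid` is a hypothetical stand-in for the check inside `current_operator`, and it uses timezone-aware datetimes for clarity even though the plan's helpers may store naive UTC values.

```python
from datetime import datetime, timezone


def cookie_still_valid(iat: int, sessions_valid_from: datetime) -> bool:
    """Assumed rule: a cookie survives if it was issued at or after the cutoff."""
    return iat >= sessions_valid_from.timestamp()


# A password change at 12:00:00.750 UTC mints a fresh cookie in the same second.
now = datetime(2026, 6, 17, 12, 0, 0, 750000, tzinfo=timezone.utc)
new_iat = int(now.timestamp())  # the cookie payload carries whole seconds

untruncated = now                       # cutoff keeps its microseconds
truncated = now.replace(microsecond=0)  # cutoff cut to the same whole second

print(cookie_still_valid(new_iat, untruncated))  # False: new cookie rejected
print(cookie_still_valid(new_iat, truncated))    # True: new cookie accepted
```

With an untruncated cutoff, the cookie issued by the very operation that bumped `sessions_valid_from` would look 0.75 s too old and be rejected. Truncating at every write point keeps the two values comparable at one-second resolution.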
Placeholder scan: none — every code step is complete.