13 Commits

Author SHA1 Message Date
serversdown 68161298a4 fix(auth): hide /admin/users when flag off; pass OPTIONS preflight through gate
- operator_users router now depends on _require_auth_enabled, which raises
  404 when OPERATOR_AUTH_ENABLED is false — prevents world-open pre-seeding
  of a superadmin while the flag is off (the default).  Flag is read as a
  live module attribute (operator_auth.OPERATOR_AUTH_ENABLED) so monkeypatching
  in tests and a runtime flip both take effect.
- operator_gate passes OPTIONS requests through immediately before the exempt-
  path check, so CORS preflight reaches CORSMiddleware rather than being
  303/401'd by the gate.
- Two new tests: test_admin_surface_404s_when_flag_off (test_operator_users)
  and test_options_preflight_passes_through_gate (test_operator_gate).
  Full suite: 90 passed.

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
2026-06-17 20:27:01 +00:00
serversdown 054eebe68b chore(auth): wire OPERATOR_AUTH_ENABLED into compose + changelog 2026-06-17 20:17:01 +00:00
serversdown dc95a59dfa test(auth): regression guard — gate never blocks machine endpoints
Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
2026-06-17 20:14:30 +00:00
serversdown 7a4453108a feat(auth): operator admin/break-glass CLI
Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
2026-06-17 19:50:37 +00:00
serversdown bff9a4af4a feat(auth): superadmin user-management page + CRUD
/admin/users page and /api/admin/users/* JSON CRUD endpoints, all behind
require_role("superadmin"). Temp passwords are returned once on create/reset
and never stored in plaintext. Admins get 403; password_hash is never leaked.

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
2026-06-17 19:48:13 +00:00
serversdown 41ab900c33 feat(auth): login/logout/change-password routes + pages
Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
2026-06-17 19:38:55 +00:00
serversdown 2879abb355 feat(auth): deny-by-default gate middleware + require_role
Adds operator_gate Starlette HTTP middleware that gates every route
except an explicit allow-list. Flag defaults OFF so all existing
behaviour and tests are unchanged. wire_operator_auth helper in
conftest lets tests monkeypatch the module-global SessionLocal and
flag, keeping the gate's own DB session pointed at the test engine.

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
2026-06-17 19:27:41 +00:00
serversdown e8fe4845aa feat(auth): authenticate + lockout + operator data helpers
Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
2026-06-17 19:19:36 +00:00
serversdown a6e1cb4f87 feat(auth): operator session cookie + current_operator DB re-validation
Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
2026-06-17 19:11:41 +00:00
serversdown 4abfcbc293 feat(auth): OperatorUser model + role ladder
Add OperatorUser SQLAlchemy model (operator_users table, auto-created by
create_all) with email uniqueness, default active/must_change_password/
failed_login_count, and sessions_valid_from truncated to whole seconds.
Add backend/operator_auth.py with feature flag, cookie constants, _ROLE_RANK
map, role_at_least(), and _norm_email() helpers.

Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
2026-06-17 19:08:18 +00:00
serversdown 8e817ec48d feat(auth): generic HMAC signed-cookie module for operator auth
Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
2026-06-17 19:05:56 +00:00
serversdown 37e6ca55c1 docs: operator-auth implementation plan (10 TDD tasks) 2026-06-17 18:57:26 +00:00
serversdown 59c19291ca docs: operator-auth design spec (v1 password login + roles + easy reset; 2FA/operator-role deferred)
Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
2026-06-17 17:39:49 +00:00
24 changed files with 3556 additions and 0 deletions
+1
View File
@@ -11,6 +11,7 @@ SLM live monitoring — fan-out feed + cache-first reads. Targets **0.14.0**.
### Added
- **Operator authentication (login + roles), shipped dark behind `OPERATOR_AUTH_ENABLED`.** The internal app gains a deny-by-default login gate (one Starlette middleware over every route except an allow-list: `/login` `/logout` `/health` `/static/*` `/portal/*` + the three machine heartbeat endpoints). Two roles — `superadmin` (account management) and `admin` (full app); `operator` reserved. Sessions are a 30-day HMAC-signed `tv_session` cookie re-validated against the DB each request (instant revoke via `active` / `sessions_valid_from`). Password reset is superadmin-driven: reset-anyone (temp shown once + forced change), self-service `/change-password`, and a `backend/operator_admin.py` seed/break-glass CLI. Brute-force lockout (5 tries / 15 min) + constant-time login (no user-enumeration). New `operator_users` table auto-creates — no migration. Reuses the portal's argon2 hasher + a new shared `backend/auth_cookies.py` signer. Rollout: ship with the flag off (app unchanged), seed a superadmin, confirm login, then flip on — the flag is an instant escape hatch. Spec: `docs/superpowers/specs/2026-06-17-operator-auth-design.md`.
- **Fan-out `/monitor` feed consumption.** The unit live view (`partials/slm_live_view.html`) and the dashboard live tile (`sound_level_meters.html`) now subscribe to SLMM's shared per-device monitor over `WS /api/slmm/{unit}/monitor` instead of each opening its own device stream. Any number of clients attach without each consuming the NL-43's single connection — the "second viewer sees nothing" contention is gone. A WS proxy handler for `/monitor` was added to `backend/routers/slmm.py`.
- **L1/L10 percentile lines + cards.** Both the per-unit live chart and the dashboard card chart now plot L1 (purple) and L10 (orange) alongside Lp/Leq, and the KPI cards show L1/L10. Sourced from the DOD feed's `ln1`/`ln2` (DRD streaming can't carry percentiles, DOD can). Missing/`-.-` values leave a gap rather than dropping the line to 0.
- **Live-chart backfill on open.** Charts seed from SLMM's downsampled DOD trail (`GET /api/slmm/{unit}/history?hours=2`) so a viewer sees recent trend immediately instead of a blank chart that fills one point per second.
+64
View File
@@ -0,0 +1,64 @@
# backend/auth_cookies.py
"""Generic HMAC-signed cookie payloads, shared by operator auth (and, optionally
later, the portal). A signed value is f"{b64url(json)}.{hmac_sha256(b64)}"; read()
verifies the signature in constant time and enforces a server-side iat expiry.
The signing secret is the same SECRET_KEY the portal already reads, so a single
env var protects both cookies. Never store or log raw secrets."""
import os
import hmac
import json
import time
import base64
import hashlib
import logging
logger = logging.getLogger(__name__)
# Same env var the portal cookie uses — one secret protects both. The insecure
# default only exists so dev/test boots without config; set a real SECRET_KEY in prod.
SECRET_KEY = os.getenv("SECRET_KEY", "dev-insecure-change-me")
# Set COOKIE_SECURE=true once served over HTTPS; leave false on plain HTTP or the
# browser won't send the cookie.
COOKIE_SECURE = os.getenv("COOKIE_SECURE", "false").lower() in ("1", "true", "yes")
def _sign(body: str) -> str:
return hmac.new(SECRET_KEY.encode(), body.encode(), hashlib.sha256).hexdigest()
def sign(payload: dict) -> str:
"""Serialize + sign a payload dict into a cookie-safe string."""
body = base64.urlsafe_b64encode(json.dumps(payload).encode()).decode()
return f"{body}.{_sign(body)}"
def read(raw, max_age: int):
"""Verify a signed value and return its payload dict, or None if missing,
tampered, or older than max_age seconds (by its own `iat`)."""
if not raw or not isinstance(raw, str):
return None
try:
body, sig = raw.rsplit(".", 1)
except (ValueError, AttributeError):
return None
if not hmac.compare_digest(sig, _sign(body)):
return None
try:
data = json.loads(base64.urlsafe_b64decode(body.encode()))
except Exception:
return None
if not isinstance(data, dict):
return None
iat = data.get("iat")
if not isinstance(iat, (int, float)):
return None
now = time.time()
# Reject implausibly future-dated tokens: the same server signs and verifies,
# so there's no real clock skew — a far-future iat (e.g. to dodge max_age or
# outlive a sessions_valid_from bump) is bogus. 60s of slack is generous.
if iat - now > 60:
return None
if (now - iat) > max_age:
return None
return data
+12
View File
@@ -89,6 +89,18 @@ async def add_environment_to_context(request: Request, call_next):
response = await call_next(request)
return response
# Operator auth — deny-by-default gate over the whole internal app. Governed by
# OPERATOR_AUTH_ENABLED (default off → behaves exactly as today). See
# docs/superpowers/specs/2026-06-17-operator-auth-design.md.
from backend.operator_auth import operator_gate
app.middleware("http")(operator_gate)
from backend.routers import operator_auth_routes
app.include_router(operator_auth_routes.router)
from backend.routers import operator_users
app.include_router(operator_users.router)
# Override TemplateResponse to include environment and version in context
original_template_response = templates.TemplateResponse
def custom_template_response(name, context=None, *args, **kwargs):
+31
View File
@@ -3,6 +3,13 @@ from datetime import datetime
from backend.database import Base
def _utcnow_seconds():
"""utcnow truncated to whole seconds — used as the default for
sessions_valid_from so a freshly-issued cookie (whose iat is a whole-second
epoch) never falls a few microseconds before it and self-invalidates."""
return datetime.utcnow().replace(microsecond=0)
class Emitter(Base):
__tablename__ = "emitters"
@@ -772,3 +779,27 @@ class ClientAccessToken(Base):
created_at = Column(DateTime, default=datetime.utcnow)
last_used_at = Column(DateTime, nullable=True)
revoked_at = Column(DateTime, nullable=True) # set = link no longer works
# ============================================================================
# OPERATOR AUTH — internal operator logins (see backend/operator_auth.py)
# ============================================================================
class OperatorUser(Base):
"""An internal operator login. Roles: 'superadmin' (Brian, + account mgmt) and
'admin' (parents, full app). 'operator' is reserved (deferred). Brand-new table
→ create_all builds it, no migration. Never store or log raw passwords."""
__tablename__ = "operator_users"
id = Column(String, primary_key=True, index=True) # UUID
email = Column(String, nullable=False, unique=True, index=True) # login handle, lowercased
display_name = Column(String, nullable=False) # "Brian", "Dad"
password_hash = Column(String, nullable=False) # argon2id
role = Column(String, nullable=False, default="admin") # superadmin | admin
active = Column(Boolean, default=True) # False = login disabled
must_change_password = Column(Boolean, default=False) # forces a change next login
sessions_valid_from = Column(DateTime, default=_utcnow_seconds) # bump = log out everywhere
failed_login_count = Column(Integer, default=0) # lockout counter
locked_until = Column(DateTime, nullable=True) # set after too many bad tries
created_at = Column(DateTime, default=datetime.utcnow)
last_login_at = Column(DateTime, nullable=True)
+137
View File
@@ -0,0 +1,137 @@
#!/usr/bin/env python3
"""Operator-account admin CLI — the bootstrap and the break-glass. Run inside the
terra-view container against the live DB. Temp/raw passwords are printed ONCE; only
hashes persist.
# first superadmin (before any UI is reachable) — prompts for a password, or --generate
python3 backend/operator_admin.py create-superadmin --email you@x.com --name "Brian"
# a parent's account — generates a temp password, must-change on first login
python3 backend/operator_admin.py create-user --email dad@x.com --name "Dad" --role admin
python3 backend/operator_admin.py reset-password --email dad@x.com
python3 backend/operator_admin.py list
python3 backend/operator_admin.py disable --email dad@x.com
python3 backend/operator_admin.py enable --email dad@x.com
"""
import os
import sys
import getpass
import argparse
from datetime import datetime
sys.path.insert(0, os.path.dirname(os.path.dirname(os.path.abspath(__file__))))
from backend.database import SessionLocal
from backend.models import OperatorUser
from backend.operator_auth import (
create_operator, reset_operator_password, set_operator_active, _norm_email,
)
def _get(db, email):
u = db.query(OperatorUser).filter_by(email=_norm_email(email)).first()
if not u:
sys.exit(f"No operator with email '{email}'.")
return u
def cmd_create_superadmin(email, name, password=None, generate=False):
db = SessionLocal()
try:
if password is None and not generate:
password = getpass.getpass("Password for new superadmin: ")
if not password or len(password) < 8:
sys.exit("Password must be at least 8 characters.")
user, raw = create_operator(db, email, name, "superadmin",
password=None if generate else password)
if generate:
print(f"✓ Superadmin {user.email} created. Temp password (shown once): {raw}")
else:
print(f"✓ Superadmin {user.email} created.")
except ValueError as e:
sys.exit(str(e))
finally:
db.close()
def cmd_create_user(email, name, role="admin"):
db = SessionLocal()
try:
user, raw = create_operator(db, email, name, role)
print(f"{role} {user.email} created. Temp password (shown once): {raw}")
print(" They'll be required to change it on first login.")
except ValueError as e:
sys.exit(str(e))
finally:
db.close()
def cmd_reset_password(email):
db = SessionLocal()
try:
user = _get(db, email)
raw = reset_operator_password(db, user)
print(f"✓ Reset {user.email}. Temp password (shown once): {raw}")
finally:
db.close()
def cmd_set_active(email, active):
db = SessionLocal()
try:
user = _get(db, email)
set_operator_active(db, user, active)
print(f"{user.email} {'enabled' if active else 'disabled'}.")
finally:
db.close()
def cmd_list():
db = SessionLocal()
try:
users = db.query(OperatorUser).order_by(OperatorUser.display_name).all()
if not users:
print("No operators yet. Run create-superadmin first.")
return
for u in users:
locked = " [LOCKED]" if (u.locked_until and u.locked_until > datetime.utcnow()) else ""
state = "active" if u.active else "DISABLED"
last = u.last_login_at.strftime("%Y-%m-%d %H:%M") if u.last_login_at else "never"
print(f" {u.display_name:<12} {u.email:<28} {u.role:<11} {state}{locked} last: {last}")
finally:
db.close()
def main():
ap = argparse.ArgumentParser(description="Operator-account admin")
sub = ap.add_subparsers(dest="cmd", required=True)
p = sub.add_parser("create-superadmin")
p.add_argument("--email", required=True); p.add_argument("--name", required=True)
p.add_argument("--generate", action="store_true", help="generate a temp password instead of prompting")
p.set_defaults(fn=lambda a: cmd_create_superadmin(a.email, a.name, generate=a.generate))
p = sub.add_parser("create-user")
p.add_argument("--email", required=True); p.add_argument("--name", required=True)
p.add_argument("--role", default="admin", choices=["admin", "superadmin"])
p.set_defaults(fn=lambda a: cmd_create_user(a.email, a.name, a.role))
p = sub.add_parser("reset-password")
p.add_argument("--email", required=True)
p.set_defaults(fn=lambda a: cmd_reset_password(a.email))
p = sub.add_parser("disable"); p.add_argument("--email", required=True)
p.set_defaults(fn=lambda a: cmd_set_active(a.email, False))
p = sub.add_parser("enable"); p.add_argument("--email", required=True)
p.set_defaults(fn=lambda a: cmd_set_active(a.email, True))
p = sub.add_parser("list"); p.set_defaults(fn=lambda a: cmd_list())
args = ap.parse_args()
args.fn(args)
if __name__ == "__main__":
main()
+231
View File
@@ -0,0 +1,231 @@
# backend/operator_auth.py
"""Operator authentication: the deny-by-default gate, session cookie, login +
lockout, and the small data helpers shared by the routes and the CLI. Reuses the
argon2 hasher (auth_passwords) and the HMAC signer (auth_cookies).
The flag and SessionLocal are read as module globals at call time so tests can
monkeypatch them."""
import os
import time
import uuid
from datetime import datetime, timedelta
from urllib.parse import quote
from fastapi import Request, HTTPException
from fastapi.responses import JSONResponse, RedirectResponse
from backend.models import OperatorUser
from backend.auth_passwords import hash_password, verify_password, generate_password
from backend.auth_cookies import sign, read, COOKIE_SECURE
from backend.database import SessionLocal
# Feature flag — OFF by default. When off, the gate and require_role both pass
# everything through and the app behaves exactly as it does today.
OPERATOR_AUTH_ENABLED = os.getenv("OPERATOR_AUTH_ENABLED", "false").lower() in ("1", "true", "yes")
COOKIE_NAME = "tv_session"
COOKIE_MAX_AGE = 60 * 60 * 24 * 30 # 30 days ("remember this device")
MAX_LOGIN_FAILURES = 5
LOCK_MINUTES = 15
# Role ladder — a rank map so checks read naturally and 'operator' slots in later.
_ROLE_RANK = {"operator": 10, "admin": 20, "superadmin": 30}
# A throwaway hash used only to equalize verify time on the unknown-email path,
# so a missing account can't be distinguished from a wrong password by timing
# (no user-enumeration). The value never authenticates anything.
_DUMMY_PASSWORD_HASH = hash_password("operator-auth-timing-equalizer")
def role_at_least(role: str, minimum: str) -> bool:
"""True iff `role` ranks at or above `minimum`. Unknown roles rank as 0."""
return _ROLE_RANK.get(role, 0) >= _ROLE_RANK[minimum]
def _norm_email(email: str) -> str:
return (email or "").strip().lower()
def make_operator_cookie(uid: str, iat: int = None) -> str:
"""Sign a tv_session value for a user id. iat defaults to now; pass an explicit
iat when you bump sessions_valid_from to that same instant (change-password)."""
return sign({"uid": uid, "iat": int(iat if iat is not None else time.time())})
def current_operator(request, db):
"""Resolve the OperatorUser for a request's tv_session cookie, or None.
Re-validated against the DB every call: a disabled / locked / password-changed
user drops on the next request. Used by the gate middleware (with its own
session) — does not raise."""
data = read(request.cookies.get(COOKIE_NAME), COOKIE_MAX_AGE)
if not data:
return None
uid, iat = data.get("uid"), data.get("iat")
if not uid or not isinstance(iat, (int, float)):
return None
user = db.query(OperatorUser).filter_by(id=uid).first()
if not user or not user.active:
return None
if user.locked_until and user.locked_until > datetime.utcnow():
return None
if user.sessions_valid_from and datetime.utcfromtimestamp(int(iat)) < user.sessions_valid_from:
return None
return user
def register_login_failure(db, user) -> None:
"""Increment a user's failure counter and lock them out past the threshold."""
user.failed_login_count = (user.failed_login_count or 0) + 1
if user.failed_login_count >= MAX_LOGIN_FAILURES:
user.locked_until = datetime.utcnow() + timedelta(minutes=LOCK_MINUTES)
db.commit()
def authenticate(db, email, password):
"""Return (user, "ok") on success, (None, "locked") if locked out, else
(None, "bad"). Never reveals whether the email exists: an unknown email runs
the same argon2 verify (against a dummy hash) as a wrong password, so neither
the response text nor its timing distinguishes the two."""
user = db.query(OperatorUser).filter_by(email=_norm_email(email)).first()
if user and user.locked_until and user.locked_until > datetime.utcnow():
return None, "locked"
password_ok = verify_password(password, user.password_hash if user else _DUMMY_PASSWORD_HASH)
if not user or not user.active or not password_ok:
if user:
register_login_failure(db, user)
return None, "bad"
user.failed_login_count = 0
user.locked_until = None
user.last_login_at = datetime.utcnow()
db.commit()
return user, "ok"
def create_operator(db, email, name, role, password=None, must_change=None):
"""Create an operator. With no password, generate a temp one and force a change
(must_change defaults True). With a password, must_change defaults False.
Returns (user, raw_password_to_show_once). Raises ValueError on dup/bad role."""
email = _norm_email(email)
if role not in _ROLE_RANK:
raise ValueError(f"unknown role {role!r}")
if db.query(OperatorUser).filter_by(email=email).first():
raise ValueError(f"operator {email} already exists")
if password is None:
password = generate_password()
if must_change is None:
must_change = True
elif must_change is None:
must_change = False
user = OperatorUser(id=str(uuid.uuid4()), email=email, display_name=name,
password_hash=hash_password(password), role=role,
active=True, must_change_password=must_change)
db.add(user)
db.commit()
return user, password
def reset_operator_password(db, user) -> str:
"""Generate a fresh temp password, force a change, log the user out everywhere.
Returns the raw password to show once."""
raw = generate_password()
user.password_hash = hash_password(raw)
user.must_change_password = True
user.failed_login_count = 0
user.locked_until = None
user.sessions_valid_from = datetime.utcnow().replace(microsecond=0)
db.commit()
return raw
def change_own_password(db, user, new_password) -> int:
"""Set a user's own new password, clear the forced-change flag, and bump
sessions_valid_from to the returned iat — the caller mints the replacement
cookie with that exact iat so it stays valid while older cookies die."""
new_iat = int(time.time())
user.password_hash = hash_password(new_password)
user.must_change_password = False
user.sessions_valid_from = datetime.utcfromtimestamp(new_iat)
db.commit()
return new_iat
def set_operator_active(db, user, active: bool):
user.active = bool(active)
db.commit()
return user
def set_operator_role(db, user, role: str):
if role not in _ROLE_RANK:
raise ValueError(f"unknown role {role!r}")
user.role = role
db.commit()
return user
# Routes reachable with no login. A new route added next year is gated by default.
_EXEMPT_EXACT = {
"/login", "/logout", "/health",
"/manifest.json", "/sw.js", "/favicon.ico", "/offline-db.js",
"/portal", # portal home (its own auth)
# machine endpoints — LAN-only, automated, no human (watchers/heartbeats):
"/emitters/report", "/api/series3/heartbeat", "/api/series4/heartbeat",
}
_EXEMPT_PREFIX = ("/static/", "/portal/")
def _is_exempt(path: str) -> bool:
return path in _EXEMPT_EXACT or path.startswith(_EXEMPT_PREFIX)
async def operator_gate(request: Request, call_next):
"""Deny-by-default gate. Flag off → pass through (app as today). Flag on →
exempt paths pass; otherwise require a valid operator session, stash it on
request.state.operator, and force a password change when pending."""
if not OPERATOR_AUTH_ENABLED:
return await call_next(request)
# CORS preflight carries no auth and must reach CORSMiddleware, not the gate.
if request.method == "OPTIONS":
return await call_next(request)
path = request.url.path
if _is_exempt(path):
return await call_next(request)
db = SessionLocal()
try:
user = current_operator(request, db)
if user is not None:
db.expunge(user) # detach a fully-loaded row so we can close now
finally:
db.close()
if user is None:
if path.startswith("/api/"):
return JSONResponse({"detail": "Not authenticated"}, status_code=401)
return RedirectResponse(f"/login?next={quote(path)}", status_code=303)
if user.must_change_password and path not in ("/change-password", "/logout"):
if path.startswith("/api/"):
return JSONResponse({"detail": "Password change required"}, status_code=403)
return RedirectResponse("/change-password", status_code=303)
request.state.operator = user
return await call_next(request)
def require_role(minimum: str):
"""Dependency factory: require a logged-in operator ranked >= `minimum`.
Respects the flag (off → pass through). When on, the middleware has already
set request.state.operator before this runs."""
def _dep(request: Request):
if not OPERATOR_AUTH_ENABLED:
return None
user = getattr(request.state, "operator", None)
if user is None:
raise HTTPException(status_code=401, detail="Not authenticated")
if not role_at_least(user.role, minimum):
raise HTTPException(status_code=403, detail="Insufficient permissions")
return user
return _dep
+111
View File
@@ -0,0 +1,111 @@
"""Operator login / logout / change-password. These routes intentionally work
regardless of OPERATOR_AUTH_ENABLED (you log in while the flag is still off during
rollout). /login and /logout are on the gate's exempt list; /change-password
requires a session (the gate sets request.state.operator)."""
from fastapi import APIRouter, Request, Depends, Form
from fastapi.responses import RedirectResponse
from sqlalchemy.orm import Session
from backend.database import get_db
from backend.models import OperatorUser
from backend.templates_config import templates
from backend.operator_auth import (
authenticate, current_operator, change_own_password, make_operator_cookie,
COOKIE_NAME, COOKIE_MAX_AGE,
)
from backend.auth_cookies import COOKIE_SECURE
from backend.auth_passwords import verify_password
router = APIRouter(tags=["operator-auth"])
def _safe_next(next_url: str) -> str:
"""Only allow same-site relative redirects (an open-redirect guard). Rejects
`//host` and `/\\host` — browsers treat a backslash as `/` in the authority
position, so both escape to an external site."""
if next_url and next_url.startswith("/") and not next_url.startswith(("//", "/\\")):
return next_url
return "/"
@router.get("/login")
async def login_page(request: Request, next: str = "", error: str = ""):
return templates.TemplateResponse("login.html",
{"request": request, "next": next, "error": error})
@router.post("/login")
async def login_submit(request: Request, next: str = "",
email: str = Form(...), password: str = Form(...),
db: Session = Depends(get_db)):
user, status = authenticate(db, email, password)
if status == "locked":
return templates.TemplateResponse(
"login.html",
{"request": request, "next": next,
"error": "Too many attempts — try again in 15 minutes."},
status_code=200)
if user is None:
return templates.TemplateResponse(
"login.html",
{"request": request, "next": next, "error": "Invalid email or password."},
status_code=200)
dest = "/change-password" if user.must_change_password else _safe_next(next)
resp = RedirectResponse(url=dest, status_code=303)
resp.set_cookie(COOKIE_NAME, make_operator_cookie(user.id),
max_age=COOKIE_MAX_AGE, httponly=True, samesite="lax", secure=COOKIE_SECURE)
return resp
@router.get("/logout")
async def logout(request: Request):
resp = RedirectResponse(url="/login", status_code=303)
resp.delete_cookie(COOKIE_NAME)
return resp
@router.get("/change-password")
async def change_password_page(request: Request, db: Session = Depends(get_db)):
user = getattr(request.state, "operator", None) or current_operator(request, db)
if user is None:
return RedirectResponse(url="/login", status_code=303)
return templates.TemplateResponse(
"change_password.html",
{"request": request, "must_change": user.must_change_password, "error": ""})
@router.post("/change-password")
async def change_password_submit(request: Request,
current_password: str = Form(...),
new_password: str = Form(...),
confirm_password: str = Form(...),
db: Session = Depends(get_db)):
_user_ref = getattr(request.state, "operator", None) or current_operator(request, db)
if _user_ref is None:
return RedirectResponse(url="/login", status_code=303)
# Re-fetch a session-bound copy so mutations via `db` will be committed.
# request.state.operator may be expunged (detached) from the gate's own
# SessionLocal; operating on a detached object against a different session
# would silently drop the UPDATE.
user = db.query(OperatorUser).filter_by(id=_user_ref.id).first()
if user is None:
return RedirectResponse(url="/login", status_code=303)
def _err(msg):
return templates.TemplateResponse(
"change_password.html",
{"request": request, "must_change": user.must_change_password, "error": msg},
status_code=200)
if not verify_password(current_password, user.password_hash):
return _err("Current password is incorrect.")
if len(new_password) < 8:
return _err("New password must be at least 8 characters.")
if new_password != confirm_password:
return _err("New passwords do not match.")
new_iat = change_own_password(db, user, new_password)
resp = RedirectResponse(url="/", status_code=303)
resp.set_cookie(COOKIE_NAME, make_operator_cookie(user.id, iat=new_iat),
max_age=COOKIE_MAX_AGE, httponly=True, samesite="lax", secure=COOKIE_SECURE)
return resp
+115
View File
@@ -0,0 +1,115 @@
"""Operator account management — superadmin only. Temp passwords are returned in
the JSON response once (shown to the superadmin to hand off); only hashes persist."""
from fastapi import APIRouter, Request, Depends, HTTPException
from fastapi.responses import JSONResponse
from pydantic import BaseModel
from sqlalchemy.orm import Session
from backend.database import get_db
from backend.templates_config import templates
from backend.models import OperatorUser
from backend.operator_auth import (
require_role, create_operator, reset_operator_password,
set_operator_active, set_operator_role,
)
import backend.operator_auth as operator_auth
from backend.utils.timezone import format_local_datetime
def _require_auth_enabled():
"""The operator-management surface does not exist while operator auth is
disabled — otherwise these net-new endpoints would be world-open with the
flag off (the default), letting anyone pre-seed a superadmin. Read the flag
as a live module attribute so the test monkeypatch and a runtime flip both
take effect."""
if not operator_auth.OPERATOR_AUTH_ENABLED:
raise HTTPException(status_code=404, detail="Not found")
router = APIRouter(tags=["operator-users"], dependencies=[Depends(_require_auth_enabled)])
_superadmin = require_role("superadmin")
class NewUser(BaseModel):
email: str
name: str
role: str = "admin"
class RoleChange(BaseModel):
role: str
def _serialize(u: OperatorUser) -> dict:
from datetime import datetime
return {
"id": u.id, "email": u.email, "display_name": u.display_name, "role": u.role,
"active": bool(u.active), "must_change_password": bool(u.must_change_password),
"locked": bool(u.locked_until and u.locked_until > datetime.utcnow()),
"last_login_at": format_local_datetime(u.last_login_at, "%Y-%m-%d %H:%M") if u.last_login_at else None,
}
@router.get("/admin/users")
async def users_page(request: Request, _=Depends(_superadmin)):
return templates.TemplateResponse("admin/users.html", {"request": request})
@router.get("/api/admin/users")
async def list_users(_=Depends(_superadmin), db: Session = Depends(get_db)):
users = db.query(OperatorUser).order_by(OperatorUser.display_name).all()
return {"users": [_serialize(u) for u in users]}
@router.post("/api/admin/users")
async def add_user(body: NewUser, _=Depends(_superadmin), db: Session = Depends(get_db)):
if body.role not in ("admin", "superadmin"):
return JSONResponse(status_code=400, content={"detail": "role must be admin or superadmin"})
try:
user, raw = create_operator(db, body.email, body.name, body.role)
except ValueError as e:
return JSONResponse(status_code=400, content={"detail": str(e)})
return {"user": _serialize(user), "password": raw}
@router.post("/api/admin/users/{user_id}/reset-password")
async def reset_user_password(user_id: str, _=Depends(_superadmin), db: Session = Depends(get_db)):
user = db.query(OperatorUser).filter_by(id=user_id).first()
if not user:
raise HTTPException(status_code=404, detail="User not found")
raw = reset_operator_password(db, user)
return {"password": raw}
@router.post("/api/admin/users/{user_id}/disable")
async def disable_user(user_id: str, acting=Depends(_superadmin), db: Session = Depends(get_db)):
if acting and acting.id == user_id:
return JSONResponse(status_code=400, content={"detail": "Cannot disable your own account"})
user = db.query(OperatorUser).filter_by(id=user_id).first()
if not user:
raise HTTPException(status_code=404, detail="User not found")
set_operator_active(db, user, False)
return {"active": False}
@router.post("/api/admin/users/{user_id}/enable")
async def enable_user(user_id: str, _=Depends(_superadmin), db: Session = Depends(get_db)):
user = db.query(OperatorUser).filter_by(id=user_id).first()
if not user:
raise HTTPException(status_code=404, detail="User not found")
set_operator_active(db, user, True)
return {"active": True}
@router.post("/api/admin/users/{user_id}/role")
async def change_user_role(user_id: str, body: RoleChange,
acting=Depends(_superadmin), db: Session = Depends(get_db)):
if acting and acting.id == user_id:
return JSONResponse(status_code=400, content={"detail": "Cannot change your own role"})
if body.role not in ("admin", "superadmin"):
return JSONResponse(status_code=400, content={"detail": "role must be admin or superadmin"})
user = db.query(OperatorUser).filter_by(id=user_id).first()
if not user:
raise HTTPException(status_code=404, detail="User not found")
set_operator_role(db, user, body.role)
return {"role": user.role}
+4
View File
@@ -18,6 +18,10 @@ services:
# browser won't send the cookie and the portal breaks).
- SECRET_KEY=${SECRET_KEY:-dev-insecure-change-me}
- COOKIE_SECURE=${COOKIE_SECURE:-false}
# Operator login gate. Leave false to ship dark; seed a superadmin via
# backend/operator_admin.py, confirm you can log in, THEN set true to enforce.
# Instant escape hatch: set back to false. See docs/superpowers/specs/2026-06-17-operator-auth-design.md
- OPERATOR_AUTH_ENABLED=${OPERATOR_AUTH_ENABLED:-false}
# Display timezone for server logs + any text-rendered timestamps.
# DB columns are stored UTC regardless; this only affects what
# operators see. Override here for non-US-East deployments.
File diff suppressed because it is too large Load Diff
@@ -0,0 +1,266 @@
# Operator Authentication — Design & Build Plan
**Status:** in development (`feat/operator-auth`) · **Targets:** 0.15.x · **Date:** 2026-06-17
Adds a login + roles to the **internal** Terra-View app — the operator-facing
surface that today has **zero auth**. This is the prerequisite that makes the app
safe to expose to the internet (the office-deployment sequencing: operator auth →
expose). Expands the "Deferred A" section of
[2026-06-15-portal-auth-design.md](2026-06-15-portal-auth-design.md) into a
standalone spec.
## Goal
Anyone reaching the internal app must log in. Three known users to start (you +
two parents), two effective roles, and a **dead-simple password-reset story** for a
family-run shop. Reuses the building blocks the client portal already shipped: the
argon2 hasher (`backend/auth_passwords.py`) and the HMAC signed-cookie pattern
(`backend/portal_auth.py`).
## Scope
**v1 (this spec):** email + password login (argon2) · long-lived "remember this
device" session · brute-force lockout · a **deny-by-default gate** over the whole
internal app · `superadmin`/`admin` roles · **superadmin-only user management** ·
**password reset** (superadmin-resets-anyone + self-service change + forced change)
· a **seed CLI** to bootstrap · the `OPERATOR_AUTH_ENABLED` **feature-flag rollout**.
**Deferred (designed-not-built):** TOTP 2FA (near-term follow-up, `superadmin`
account first) · the `operator` restricted role · email-based self-service
password reset (needs the email infra coming with the report work).
## Principles
1. **Deny by default.** Every route requires a login *except* an explicit allow-list.
A route added next year is protected automatically — you can't forget to gate it.
2. **Can't lock yourself out.** Ship dark behind a feature flag; seed + verify before
enforcing; the flag is an instant escape hatch; a CLI is the break-glass.
3. **Reuse, don't reinvent.** argon2 + the signed-cookie HMAC already exist and are
tested. Operator auth is a thin new layer, not a parallel crypto stack.
4. **Easy recovery.** For a 3-person shop, "forgot my password" must be a 10-second
fix — the superadmin resets it, no email round-trip required.
## Architecture
```
OPERATOR_AUTH_ENABLED=false ──▶ pass everything (app as today)
request ──▶ gate middleware ─┤
└ enabled ─▶ path exempt? ──yes──▶ serve (no login)
│ exempt: /login /logout /health
│ /static/* /portal/* + 3 machine endpoints
└no─▶ valid operator session?
├ no ─▶ HTML: 303 → /login?next=…
│ /api/*: 401 JSON
├ must_change_password ─▶ 303 → /change-password
└ yes ─▶ request.state.operator = user
─▶ route runs; require_role() may 403
```
One **Starlette HTTP middleware** is the gate (not per-route dependencies — a
middleware can't miss a route). It resolves the operator from the cookie using its
own `SessionLocal()` (same pattern the portal WS handler uses), stashes the user on
`request.state.operator`, and a `require_role(...)` dependency reads it for the few
routes that need more than "logged in."
## Data model
New table **`operator_users`** (brand-new → `create_all` builds it on startup, **no
migration needed**, same as the portal's `clients` table):
| Column | Type | Notes |
|---|---|---|
| `id` | str UUID | caller-supplied `str(uuid.uuid4())` (codebase convention) |
| `email` | str, unique, indexed | login handle, stored lowercased |
| `display_name` | str | "Brian", "Dad" — shown in UI + history |
| `password_hash` | str | argon2id via `auth_passwords.hash_password` |
| `role` | str | `"superadmin"` \| `"admin"` (`"operator"` reserved, deferred) |
| `active` | bool, default True | disable a login without deleting |
| `must_change_password` | bool, default False | set on create/reset → forces a change on next login |
| `sessions_valid_from` | datetime, default `utcnow` | bump to invalidate ALL of a user's sessions |
| `failed_login_count` | int, default 0 | lockout counter |
| `locked_until` | datetime, nullable | set after too many bad tries |
| `created_at` | datetime, default `utcnow` | |
| `last_login_at` | datetime, nullable | |
(Deferred columns, not in v1: `totp_secret`, `totp_enabled`.)
**Role ladder** — a rank map so checks read naturally and `operator` slots in later:
```python
_ROLE_RANK = {"operator": 10, "admin": 20, "superadmin": 30}
```
`require_role("admin")` = admin or above; `require_role("superadmin")` for account mgmt.
## Sessions
**New shared module `backend/auth_cookies.py`** — lift the generic signer out so both
auth systems share one implementation:
```python
def sign(payload: dict) -> str # f"{b64url(json)}.{hmac_sha256(b64, SECRET_KEY)}"
def read(raw: str, max_age: int) -> dict | None # verify sig (compare_digest) + iat expiry; None on tamper/expiry
SECRET_KEY = os.getenv("SECRET_KEY", "dev-insecure-change-me") # same env the portal reads
COOKIE_SECURE = os.getenv("COOKIE_SECURE", "false") in truthy
```
Operator auth uses it now. (Portal's existing cookie helpers keep working untouched;
migrating them onto `auth_cookies` is an optional later dedupe, gated on the portal
tests staying green — don't destabilize the shipped portal for it.)
**Operator session cookie:** name **`tv_session`** (distinct from the portal's
`portal_session`), payload `{"uid": <id>, "iat": <epoch>}`, `max_age` 30 days
(= the "remember this device" — a small trusted set re-logs in rarely), `httponly`,
`samesite=lax`, `secure=COOKIE_SECURE`.
**Validation each request** (`current_operator(request, db)`): read+verify cookie →
load `OperatorUser` by `uid` → require `active`, `iat >= sessions_valid_from`
(epoch), and not `locked_until > now`. Any failure → no session. Bumping
`sessions_valid_from` (on password change / "log out everywhere") instantly kills all
live cookies with no session table.
## Authorization
**The gate (middleware) exempt list:**
- `/login`, `/logout`, `/health`, `/static/*`, plus PWA assets
(`/manifest.json`, `/sw.js`, `/favicon.ico`)
- `/portal/*` — the client portal keeps its own (separate) auth
- **machine endpoints (LAN-only, automated, no human):** `/emitters/report`,
`/api/series3/heartbeat`, `/api/series4/heartbeat`
`/change-password` is **not** exempt — it requires a logged-in session (you change
*your own* password). It's only *excluded from the `must_change_password` redirect*,
so a forced-change user can actually reach it (no redirect loop).
**Permission split — minimal by design.** Because the `operator` role is deferred,
every real v1 user is `admin` or `superadmin`, so "logged in" already means "full
app." The *only* thing gated above plain-admin is **account management**
`require_role("superadmin")` on the user-management routes. Everything else just
requires a valid session (the middleware). One extra guard, not a sprawling matrix.
**The flag governs everything.** Both the middleware *and* `require_role` respect
`OPERATOR_AUTH_ENABLED`: when it's off, neither enforces anything (no session is set,
and `require_role` passes through) — the app behaves exactly as it does today. When
it's on, the middleware guarantees `request.state.operator` is set before any
`require_role` check runs.
## Password management & reset *(the emphasized requirement)*
Three paths, no email infra required:
1. **Superadmin resets anyone** — from the user-management UI, "Reset password" →
generates a strong password (`auth_passwords.generate_password`), stores its hash,
sets `must_change_password=True`, **shows the temp password once** for you to hand
off. Covers "easy for *me* to reset *their* password."
2. **Self-service change**`/change-password` (any logged-in user): current + new.
Used for routine changes **and** the forced post-reset change. On success, bump
`sessions_valid_from` (logs out other devices) and clear `must_change_password`.
3. **Forced change** — after a reset/first login, `must_change_password=True` → the
gate routes them to `/change-password` until they set their own.
**Forgot it entirely (can't log in):** v1 has **no email reset**`/login` shows
"Forgot your password? Contact your administrator," and you (superadmin) reset it via
the UI or CLI. For a 3-person shop that's a text message, not a feature. (Email-based
self-service is the deferred follow-up once email infra lands.)
## Bootstrapping — seed CLI
`backend/operator_admin.py` (modeled on the existing `portal_admin.py`), run inside
the container against the live DB:
```
create-superadmin --email you@x.com --name "Brian" # prompts for a password (or --generate)
create-user --email dad@x.com --name "Dad" --role admin # generates a temp password, must_change=True
reset-password --email dad@x.com # generates a temp, must_change=True
list # users + roles + active/locked state
disable --email dad@x.com / enable --email dad@x.com
```
The CLI is the bootstrap (first superadmin, before any UI is reachable) **and** the
break-glass (locked out / forgot everything).
## Account-management UI (superadmin-only)
`GET /admin/users` (page, `require_role("superadmin")`) + JSON endpoints:
- list operators (name, email, role, active, locked, last login)
- add operator (email, name, role) → temp password shown once
- reset password → temp shown once
- enable / disable, change role
Template `templates/admin/users.html`. Admins (parents) don't see this; superadmin only.
## Login / logout / change-password
- `GET /login``templates/login.html` (email + password, optional `?next=`).
- `POST /login` → lowercase email, lockout check, argon2 verify; on success set
`tv_session`, stamp `last_login_at`, clear `failed_login_count`, redirect to `next`
or `/`; on `must_change_password``/change-password`; on fail → increment +
generic "invalid email or password" (no user-enumeration), lock after 5 → 15 min.
- `GET /logout` → clear cookie → `/login`.
- `GET/POST /change-password``templates/change_password.html`.
## Error handling
- Wrong email/password → generic message, increment fail count.
- ≥5 fails → "too many attempts, try again in 15 minutes" (`locked_until`).
- No/expired/forged cookie → HTML routes 303→`/login?next=…`; `/api/*` → 401 JSON.
- Disabled / role-changed / password-changed-elsewhere → bounced on next request
(re-validated against the DB every request).
- Superadmin-only route hit by an admin → 403.
## Rollout — the no-self-lockout sequence
1. Ship with `OPERATOR_AUTH_ENABLED=false` (default) → the middleware short-circuits,
app behaves **exactly as today**. Deploying can't break or lock anything.
2. Seed your `superadmin` via `operator_admin.py`.
3. Hit `/login` and confirm you get a session **while the flag is still off** (the
login routes work regardless of the flag).
4. Flip `OPERATOR_AUTH_ENABLED=true` → the gate enforces. Your cookie is valid → you're
in. Anything wrong → flip it back off (instant escape hatch).
5. Create your parents' accounts from `/admin/users` (temp passwords, they change on
first login).
- **Break-glass:** `operator_admin.py reset-password` / `create-superadmin` in the
container; or flag off.
## Testing
Reuses the pytest harness from the portal work (`docker exec … python -m pytest`).
- **Middleware:** flag off → every path passes; flag on → exempt paths + the 3 machine
endpoints pass with no cookie, a gated HTML path 303s to `/login`, a gated `/api/*`
path 401s, `must_change_password` user is routed to `/change-password`.
- **Login:** success sets `tv_session`; wrong password rejected + counts; 5 wrong →
locked (even correct password refused).
- **Roles:** `require_role("superadmin")` route → admin gets 403, superadmin 200.
- **Sessions:** bumping `sessions_valid_from` invalidates an existing cookie.
- **Password:** self-change works + clears `must_change_password`; superadmin reset
sets a new hash + `must_change_password` + returns the raw once.
- **Machine endpoints:** `/api/series3/heartbeat` etc. still 200 with the gate ON and
no cookie (regression guard so we never silently break the watchers).
## File structure
| File | Responsibility |
|---|---|
| `backend/auth_cookies.py` *(new)* | generic `sign`/`read` + `SECRET_KEY`/`COOKIE_SECURE` |
| `backend/models.py` | add `OperatorUser` |
| `backend/operator_auth.py` *(new)* | `current_operator`, `require_role`, the gate middleware, login/lockout helpers |
| `backend/routers/operator_auth_routes.py` *(new)* | `/login`, `/logout`, `/change-password` |
| `backend/routers/operator_users.py` *(new)* | `/admin/users` page + CRUD (superadmin) |
| `backend/operator_admin.py` *(new)* | seed/break-glass CLI |
| `backend/main.py` | register the gate middleware + routers; `OPERATOR_AUTH_ENABLED` |
| `templates/login.html`, `templates/change_password.html`, `templates/admin/users.html` *(new)* | UI |
## Going to prod
- New table auto-creates; **no migration**. Just code + seeding.
- Set a real `SECRET_KEY` (shared with the portal cookie) and `COOKIE_SECURE=true`
once on HTTPS — same env knobs already wired in `docker-compose.yml`.
- Operator auth is what makes internet-exposing the internal app safe; pair with the
(deferred) office deployment + reverse-proxy/TLS work.
## Security notes
- Deny-by-default; client-supplied ids never trusted; every request re-validates the
session against the DB (instant revoke via `active` / `sessions_valid_from`).
- Passwords argon2-hashed; generic login errors (no user-enumeration); lockout on
brute force; raw temp passwords shown once, never stored or logged.
- Cookies `HttpOnly` + `SameSite=Lax` + `Secure` (on TLS), HMAC-signed with server-side
`iat` expiry.
- **Known residual until deploy:** without TLS the password crosses the wire in
cleartext — fix is the deployment-phase TLS (Synology Let's Encrypt / Cloudflare
Tunnel). The login is still a massive improvement over today's zero-auth exposure.
- TOTP 2FA is the near-term follow-up (superadmin first), especially without the UniFi
edge in front on the home network.
+71
View File
@@ -0,0 +1,71 @@
{% extends "base.html" %}
{% block title %}Operator Accounts{% endblock %}
{% block content %}
<div class="max-w-4xl mx-auto p-4">
<div class="flex items-center justify-between mb-4">
<h1 class="text-2xl font-semibold">Operator Accounts</h1>
<button id="add-user-btn" class="px-3 py-2 rounded bg-orange-500 hover:bg-orange-600 text-white text-sm">+ Add operator</button>
</div>
<div id="temp-pw-banner" class="hidden mb-4 px-3 py-2 rounded bg-emerald-900/60 text-emerald-100 text-sm"></div>
<table class="w-full text-sm">
<thead><tr class="text-left border-b border-slate-600">
<th class="py-2">Name</th><th>Email</th><th>Role</th><th>Status</th><th>Last login</th><th></th>
</tr></thead>
<tbody id="user-rows"></tbody>
</table>
</div>
<script>
const $ = (s) => document.querySelector(s);
function showTemp(email, pw) {
const b = $("#temp-pw-banner");
b.textContent = `Temporary password for ${email}: ${pw} — copy it now, it won't be shown again.`;
b.classList.remove("hidden");
}
async function load() {
const r = await fetch("/api/admin/users");
const { users } = await r.json();
$("#user-rows").innerHTML = users.map(u => `
<tr class="border-b border-slate-700">
<td class="py-2">${u.display_name}</td>
<td>${u.email}</td>
<td>
<select data-role="${u.id}" class="bg-slate-700 rounded px-1 py-0.5">
<option value="admin"${u.role==='admin'?' selected':''}>admin</option>
<option value="superadmin"${u.role==='superadmin'?' selected':''}>superadmin</option>
</select>
</td>
<td>${u.active ? 'active' : 'disabled'}${u.locked ? ' (locked)' : ''}</td>
<td>${u.last_login_at || '—'}</td>
<td class="text-right space-x-2">
<button data-reset="${u.id}" data-email="${u.email}" class="text-orange-400 hover:underline">Reset pw</button>
<button data-toggle="${u.id}" data-active="${u.active}" class="text-slate-300 hover:underline">${u.active ? 'Disable' : 'Enable'}</button>
</td>
</tr>`).join("");
}
document.addEventListener("click", async (e) => {
if (e.target.dataset.reset) {
const r = await fetch(`/api/admin/users/${e.target.dataset.reset}/reset-password`, {method:"POST"});
const d = await r.json(); showTemp(e.target.dataset.email, d.password); load();
} else if (e.target.dataset.toggle) {
const action = e.target.dataset.active === "true" ? "disable" : "enable";
await fetch(`/api/admin/users/${e.target.dataset.toggle}/${action}`, {method:"POST"}); load();
} else if (e.target.id === "add-user-btn") {
const email = prompt("Email?"); if (!email) return;
const name = prompt("Display name?") || email;
const role = prompt("Role (admin / superadmin)?", "admin") || "admin";
const r = await fetch("/api/admin/users", {method:"POST", headers:{"Content-Type":"application/json"},
body: JSON.stringify({email, name, role})});
if (r.ok) { const d = await r.json(); showTemp(email, d.password); load(); }
else { alert((await r.json()).detail || "Failed"); }
}
});
document.addEventListener("change", async (e) => {
if (e.target.dataset.role) {
await fetch(`/api/admin/users/${e.target.dataset.role}/role`, {method:"POST",
headers:{"Content-Type":"application/json"}, body: JSON.stringify({role: e.target.value})});
load();
}
});
load();
</script>
{% endblock %}
+39
View File
@@ -0,0 +1,39 @@
<!DOCTYPE html>
<html lang="en">
<head>
<meta charset="utf-8">
<meta name="viewport" content="width=device-width, initial-scale=1">
<title>Change password · Terra-View</title>
<script src="https://cdn.tailwindcss.com"></script>
</head>
<body class="bg-slate-900 text-slate-100 min-h-screen flex items-center justify-center">
<div class="w-full max-w-sm p-8 bg-slate-800 rounded-xl shadow-lg">
<h1 class="text-xl font-semibold mb-2 text-center">Change your password</h1>
{% if must_change %}
<p class="mb-4 text-sm text-amber-300 text-center">Please set a new password to continue.</p>
{% endif %}
{% if error %}
<div class="mb-4 px-3 py-2 rounded bg-red-900/60 text-red-200 text-sm">{{ error }}</div>
{% endif %}
<form method="post" action="/change-password" class="space-y-4">
<div>
<label class="block text-sm mb-1" for="current_password">Current password</label>
<input id="current_password" name="current_password" type="password" required
class="w-full px-3 py-2 rounded bg-slate-700 border border-slate-600 focus:outline-none focus:ring-2 focus:ring-orange-500">
</div>
<div>
<label class="block text-sm mb-1" for="new_password">New password</label>
<input id="new_password" name="new_password" type="password" minlength="8" required
class="w-full px-3 py-2 rounded bg-slate-700 border border-slate-600 focus:outline-none focus:ring-2 focus:ring-orange-500">
</div>
<div>
<label class="block text-sm mb-1" for="confirm_password">Confirm new password</label>
<input id="confirm_password" name="confirm_password" type="password" minlength="8" required
class="w-full px-3 py-2 rounded bg-slate-700 border border-slate-600 focus:outline-none focus:ring-2 focus:ring-orange-500">
</div>
<button type="submit"
class="w-full py-2 rounded bg-orange-500 hover:bg-orange-600 font-medium">Update password</button>
</form>
</div>
</body>
</html>
+32
View File
@@ -0,0 +1,32 @@
<!DOCTYPE html>
<html lang="en">
<head>
<meta charset="utf-8">
<meta name="viewport" content="width=device-width, initial-scale=1">
<title>Sign in · Terra-View</title>
<script src="https://cdn.tailwindcss.com"></script>
</head>
<body class="bg-slate-900 text-slate-100 min-h-screen flex items-center justify-center">
<div class="w-full max-w-sm p-8 bg-slate-800 rounded-xl shadow-lg">
<h1 class="text-xl font-semibold mb-6 text-center">Terra-View</h1>
{% if error %}
<div class="mb-4 px-3 py-2 rounded bg-red-900/60 text-red-200 text-sm">{{ error }}</div>
{% endif %}
<form method="post" action="/login{% if next %}?next={{ next }}{% endif %}" class="space-y-4">
<div>
<label class="block text-sm mb-1" for="email">Email</label>
<input id="email" name="email" type="email" autofocus required
class="w-full px-3 py-2 rounded bg-slate-700 border border-slate-600 focus:outline-none focus:ring-2 focus:ring-orange-500">
</div>
<div>
<label class="block text-sm mb-1" for="password">Password</label>
<input id="password" name="password" type="password" required
class="w-full px-3 py-2 rounded bg-slate-700 border border-slate-600 focus:outline-none focus:ring-2 focus:ring-orange-500">
</div>
<button type="submit"
class="w-full py-2 rounded bg-orange-500 hover:bg-orange-600 font-medium">Sign in</button>
</form>
<p class="mt-4 text-xs text-slate-400 text-center">Forgot your password? Contact your administrator.</p>
</div>
</body>
</html>
+12
View File
@@ -62,3 +62,15 @@ def make_project(db_session, name=None, **kwargs):
db_session.add(p)
db_session.commit()
return p
def wire_operator_auth(monkeypatch, db_session, enabled=True):
"""Point the gate middleware's SessionLocal at the test engine and flip the
flag. The middleware opens its OWN session (it can't use the get_db override),
so it must read the same engine the test writes to."""
import backend.operator_auth as oa
from sqlalchemy.orm import sessionmaker
maker = sessionmaker(bind=db_session.get_bind(), autocommit=False, autoflush=False)
monkeypatch.setattr(oa, "SessionLocal", maker, raising=False)
monkeypatch.setattr(oa, "OPERATOR_AUTH_ENABLED", enabled, raising=False)
return oa
+44
View File
@@ -0,0 +1,44 @@
# tests/test_operator_admin_cli.py
from sqlalchemy.orm import sessionmaker
from backend.models import OperatorUser
from backend.auth_passwords import verify_password
import backend.operator_admin as cli
def _maker(db_session):
return sessionmaker(bind=db_session.get_bind(), autocommit=False, autoflush=False)
def test_seed_superadmin(db_session, monkeypatch):
monkeypatch.setattr(cli, "SessionLocal", _maker(db_session), raising=False)
cli.cmd_create_superadmin(email="brian@x.com", name="Brian", password="chosen-pw-1")
u = db_session.query(OperatorUser).filter_by(email="brian@x.com").first()
assert u.role == "superadmin"
assert u.must_change_password is False
assert verify_password("chosen-pw-1", u.password_hash)
def test_create_user_generates_temp(db_session, monkeypatch, capsys):
monkeypatch.setattr(cli, "SessionLocal", _maker(db_session), raising=False)
cli.cmd_create_user(email="dad@x.com", name="Dad", role="admin")
u = db_session.query(OperatorUser).filter_by(email="dad@x.com").first()
assert u.role == "admin" and u.must_change_password is True
assert "dad@x.com" in capsys.readouterr().out # prints the temp once
def test_reset_password_cli(db_session, monkeypatch):
monkeypatch.setattr(cli, "SessionLocal", _maker(db_session), raising=False)
cli.cmd_create_user(email="r@x.com", name="R", role="admin")
before = db_session.query(OperatorUser).filter_by(email="r@x.com").first().password_hash
cli.cmd_reset_password(email="r@x.com")
after = db_session.query(OperatorUser).filter_by(email="r@x.com").first().password_hash
assert before != after
def test_disable_enable_cli(db_session, monkeypatch):
monkeypatch.setattr(cli, "SessionLocal", _maker(db_session), raising=False)
cli.cmd_create_user(email="d@x.com", name="D", role="admin")
cli.cmd_set_active(email="d@x.com", active=False)
assert db_session.query(OperatorUser).filter_by(email="d@x.com").first().active is False
cli.cmd_set_active(email="d@x.com", active=True)
assert db_session.query(OperatorUser).filter_by(email="d@x.com").first().active is True
+89
View File
@@ -0,0 +1,89 @@
# tests/test_operator_authenticate.py
import time
from datetime import datetime
import pytest
from backend.operator_auth import (
authenticate, create_operator, reset_operator_password,
set_operator_active, set_operator_role, change_own_password, MAX_LOGIN_FAILURES,
)
from backend.auth_passwords import verify_password
from backend.models import OperatorUser
def test_create_operator_generates_temp_and_forces_change(db_session):
user, raw = create_operator(db_session, "Dad@X.com", "Dad", "admin")
assert user.email == "dad@x.com" # lowercased
assert user.must_change_password is True
assert verify_password(raw, user.password_hash)
def test_create_operator_with_explicit_password_no_forced_change(db_session):
user, raw = create_operator(db_session, "brian@x.com", "Brian", "superadmin", password="chosen-pw-123")
assert raw == "chosen-pw-123"
assert user.must_change_password is False
def test_create_operator_rejects_duplicate_and_bad_role(db_session):
create_operator(db_session, "a@x.com", "A", "admin")
with pytest.raises(ValueError):
create_operator(db_session, "A@x.com", "A2", "admin") # dup (case-insensitive)
with pytest.raises(ValueError):
create_operator(db_session, "b@x.com", "B", "wizard") # bad role
def test_authenticate_success(db_session):
user, raw = create_operator(db_session, "ok@x.com", "Ok", "admin", password="rightpw-9")
got, status = authenticate(db_session, "OK@x.com", "rightpw-9")
assert status == "ok" and got.id == user.id
assert got.last_login_at is not None
assert got.failed_login_count == 0
def test_authenticate_wrong_password_counts(db_session):
create_operator(db_session, "wp@x.com", "Wp", "admin", password="rightpw-9")
got, status = authenticate(db_session, "wp@x.com", "nope")
assert got is None and status == "bad"
assert db_session.query(OperatorUser).filter_by(email="wp@x.com").first().failed_login_count == 1
def test_lockout_after_five_then_correct_password_refused(db_session):
create_operator(db_session, "lk@x.com", "Lk", "admin", password="rightpw-9")
for _ in range(MAX_LOGIN_FAILURES):
authenticate(db_session, "lk@x.com", "nope")
got, status = authenticate(db_session, "lk@x.com", "rightpw-9") # correct, but locked
assert got is None and status == "locked"
def test_authenticate_unknown_email_is_bad_not_error(db_session):
got, status = authenticate(db_session, "ghost@x.com", "whatever")
assert got is None and status == "bad"
def test_reset_password_sets_new_hash_forces_change_and_bumps_sessions(db_session):
user, _ = create_operator(db_session, "r@x.com", "R", "admin", password="orig-pw-1")
before = user.sessions_valid_from
raw = reset_operator_password(db_session, user)
assert verify_password(raw, user.password_hash)
assert user.must_change_password is True
assert user.sessions_valid_from >= before
def test_change_own_password_clears_flag_and_bumps(db_session):
user, _ = create_operator(db_session, "c@x.com", "C", "admin", password="orig-pw-1")
user.must_change_password = True
db_session.commit()
new_iat = change_own_password(db_session, user, "brand-new-pw-2")
assert verify_password("brand-new-pw-2", user.password_hash)
assert user.must_change_password is False
assert user.sessions_valid_from == datetime.utcfromtimestamp(new_iat)
def test_set_active_and_role(db_session):
user, _ = create_operator(db_session, "s@x.com", "S", "admin", password="orig-pw-1")
set_operator_active(db_session, user, False)
assert user.active is False
set_operator_role(db_session, user, "superadmin")
assert user.role == "superadmin"
with pytest.raises(ValueError):
set_operator_role(db_session, user, "wizard")
+49
View File
@@ -0,0 +1,49 @@
# tests/test_operator_cookies.py
import time
import base64
import json
from backend.auth_cookies import sign, read
def test_sign_then_read_round_trips():
now = int(time.time())
raw = sign({"uid": "abc", "iat": now})
data = read(raw, max_age=3600)
assert data == {"uid": "abc", "iat": now}
def test_tampered_signature_is_rejected():
raw = sign({"uid": "abc", "iat": int(time.time())})
body, _sig = raw.rsplit(".", 1)
assert read(body + ".deadbeef", max_age=3600) is None
def test_tampered_body_is_rejected():
raw = sign({"uid": "abc", "iat": int(time.time())})
body, sig = raw.rsplit(".", 1)
forged = base64.urlsafe_b64encode(json.dumps({"uid": "evil", "iat": int(time.time())}).encode()).decode()
assert read(forged + "." + sig, max_age=3600) is None
def test_expired_by_iat_is_rejected():
raw = sign({"uid": "abc", "iat": int(time.time()) - 10_000})
assert read(raw, max_age=3600) is None
def test_garbage_input_is_none_not_raise():
assert read("not-a-cookie", max_age=3600) is None
assert read("", max_age=3600) is None
assert read(None, max_age=3600) is None
def test_wrong_secret_is_rejected(monkeypatch):
import backend.auth_cookies as ac
monkeypatch.setattr(ac, "SECRET_KEY", "secret-A")
raw = ac.sign({"uid": "x", "iat": int(time.time())})
monkeypatch.setattr(ac, "SECRET_KEY", "secret-B")
assert ac.read(raw, max_age=3600) is None
def test_future_dated_iat_is_rejected():
raw = sign({"uid": "x", "iat": int(time.time()) + 10_000})
assert read(raw, max_age=3600) is None
+76
View File
@@ -0,0 +1,76 @@
# tests/test_operator_gate.py
import uuid
from tests.conftest import wire_operator_auth
from backend.models import OperatorUser
from backend.operator_auth import make_operator_cookie, COOKIE_NAME
from backend.auth_passwords import hash_password
def _make_user(db, role="admin", **kw):
u = OperatorUser(id=str(uuid.uuid4()), email=kw.pop("email", "u@x.com"),
display_name="U", password_hash=hash_password("pw"), role=role, **kw)
db.add(u)
db.commit()
return u
def test_flag_off_passes_everything(client, db_session, monkeypatch):
wire_operator_auth(monkeypatch, db_session, enabled=False)
assert client.get("/", follow_redirects=False).status_code == 200
def test_gated_html_redirects_to_login_when_unauth(client, db_session, monkeypatch):
wire_operator_auth(monkeypatch, db_session, enabled=True)
r = client.get("/", follow_redirects=False)
assert r.status_code == 303
assert r.headers["location"].startswith("/login?next=")
def test_gated_api_returns_401_json_when_unauth(client, db_session, monkeypatch):
wire_operator_auth(monkeypatch, db_session, enabled=True)
r = client.get("/api/status-snapshot", follow_redirects=False)
assert r.status_code == 401
def test_valid_session_passes(client, db_session, monkeypatch):
u = _make_user(db_session)
wire_operator_auth(monkeypatch, db_session, enabled=True)
client.cookies.set(COOKIE_NAME, make_operator_cookie(u.id))
assert client.get("/", follow_redirects=False).status_code == 200
def test_must_change_password_user_routed_to_change_password(client, db_session, monkeypatch):
u = _make_user(db_session, must_change_password=True)
wire_operator_auth(monkeypatch, db_session, enabled=True)
client.cookies.set(COOKIE_NAME, make_operator_cookie(u.id))
r = client.get("/", follow_redirects=False)
assert r.status_code == 303
assert r.headers["location"] == "/change-password"
def test_exempt_paths_pass_without_cookie(client, db_session, monkeypatch):
wire_operator_auth(monkeypatch, db_session, enabled=True)
assert client.get("/health", follow_redirects=False).status_code == 200
def test_portal_paths_are_exempt(client, db_session, monkeypatch):
wire_operator_auth(monkeypatch, db_session, enabled=True)
# /portal/p/<bad> hits the portal's own gate (403/404), never the operator login.
r = client.get("/portal/p/nope", follow_redirects=False)
assert r.status_code in (403, 404)
def test_must_change_user_on_api_gets_403_json_not_redirect(client, db_session, monkeypatch):
u = _make_user(db_session, must_change_password=True)
wire_operator_auth(monkeypatch, db_session, enabled=True)
client.cookies.set(COOKIE_NAME, make_operator_cookie(u.id))
r = client.get("/api/status-snapshot", follow_redirects=False)
assert r.status_code == 403
assert r.json()["detail"] == "Password change required"
def test_options_preflight_passes_through_gate(client, db_session, monkeypatch):
wire_operator_auth(monkeypatch, db_session, enabled=True)
# CORS preflight has no cookie; the gate must not 303/401 it.
r = client.options("/api/status-snapshot", follow_redirects=False)
assert r.status_code not in (303, 401)
+98
View File
@@ -0,0 +1,98 @@
# tests/test_operator_login.py
import uuid
from tests.conftest import wire_operator_auth
from backend.operator_auth import (
create_operator, make_operator_cookie, COOKIE_NAME, MAX_LOGIN_FAILURES,
)
def test_login_page_renders(client, db_session, monkeypatch):
wire_operator_auth(monkeypatch, db_session, enabled=True)
r = client.get("/login")
assert r.status_code == 200
assert "password" in r.text.lower()
def test_login_success_sets_cookie_and_redirects(client, db_session, monkeypatch):
create_operator(db_session, "ok@x.com", "Ok", "admin", password="rightpw-9")
wire_operator_auth(monkeypatch, db_session, enabled=True)
r = client.post("/login", data={"email": "ok@x.com", "password": "rightpw-9"},
follow_redirects=False)
assert r.status_code == 303
assert r.headers["location"] == "/"
assert f"{COOKIE_NAME}=" in r.headers.get("set-cookie", "")
def test_login_honors_next(client, db_session, monkeypatch):
create_operator(db_session, "ok@x.com", "Ok", "admin", password="rightpw-9")
wire_operator_auth(monkeypatch, db_session, enabled=True)
r = client.post("/login?next=/settings", data={"email": "ok@x.com", "password": "rightpw-9"},
follow_redirects=False)
assert r.headers["location"] == "/settings"
def test_login_wrong_password_no_cookie_generic_error(client, db_session, monkeypatch):
create_operator(db_session, "ok@x.com", "Ok", "admin", password="rightpw-9")
wire_operator_auth(monkeypatch, db_session, enabled=True)
r = client.post("/login", data={"email": "ok@x.com", "password": "nope"},
follow_redirects=False)
assert r.status_code == 200
assert f"{COOKIE_NAME}=" not in r.headers.get("set-cookie", "")
assert "invalid" in r.text.lower()
def test_login_must_change_redirects_to_change_password(client, db_session, monkeypatch):
create_operator(db_session, "new@x.com", "New", "admin") # generated temp → must_change
from backend.models import OperatorUser
user = db_session.query(OperatorUser).filter_by(email="new@x.com").first()
from backend.auth_passwords import hash_password
user.password_hash = hash_password("temp-pw-1")
db_session.commit()
wire_operator_auth(monkeypatch, db_session, enabled=True)
r = client.post("/login", data={"email": "new@x.com", "password": "temp-pw-1"},
follow_redirects=False)
assert r.status_code == 303
assert r.headers["location"] == "/change-password"
def test_login_lockout_message_after_five(client, db_session, monkeypatch):
create_operator(db_session, "lk@x.com", "Lk", "admin", password="rightpw-9")
wire_operator_auth(monkeypatch, db_session, enabled=True)
for _ in range(MAX_LOGIN_FAILURES):
client.post("/login", data={"email": "lk@x.com", "password": "nope"}, follow_redirects=False)
r = client.post("/login", data={"email": "lk@x.com", "password": "rightpw-9"}, follow_redirects=False)
assert r.status_code == 200
assert "too many" in r.text.lower()
def test_logout_clears_cookie(client, db_session, monkeypatch):
wire_operator_auth(monkeypatch, db_session, enabled=True)
r = client.get("/logout", follow_redirects=False)
assert r.status_code == 303
assert r.headers["location"] == "/login"
set_cookie = r.headers.get("set-cookie", "").lower()
assert COOKIE_NAME.lower() in set_cookie
assert 'max-age=0' in set_cookie or 'expires=thu, 01 jan 1970' in set_cookie
def test_change_password_self_service(client, db_session, monkeypatch):
user, _ = create_operator(db_session, "c@x.com", "C", "admin", password="orig-pw-1")
wire_operator_auth(monkeypatch, db_session, enabled=True)
client.cookies.set(COOKIE_NAME, make_operator_cookie(user.id))
r = client.post("/change-password",
data={"current_password": "orig-pw-1", "new_password": "brand-new-2",
"confirm_password": "brand-new-2"}, follow_redirects=False)
assert r.status_code == 303
from backend.auth_passwords import verify_password
db_session.refresh(user)
assert verify_password("brand-new-2", user.password_hash)
assert user.must_change_password is False
def test_safe_next_blocks_open_redirect():
from backend.routers.operator_auth_routes import _safe_next
assert _safe_next("//evil.com") == "/"
assert _safe_next("/\\evil.com") == "/" # backslash authority bypass
assert _safe_next("https://evil.com") == "/"
assert _safe_next("") == "/"
assert _safe_next("/settings") == "/settings"
+42
View File
@@ -0,0 +1,42 @@
# tests/test_operator_machine_endpoints.py
from tests.conftest import wire_operator_auth
def test_machine_endpoints_not_blocked_by_gate(client, db_session, monkeypatch):
"""With the gate ON and no cookie, the LAN-only watcher endpoints must reach
their handlers (the gate must never silently break heartbeats). A handler may
return 422 for an empty body — that still proves the gate let it through.
Note: /emitters/report uses a minimal valid body to avoid triggering the
app's validation_exception_handler (which calls await request.body() — a
known deadlock in Starlette 0.27 TestClient when the body is already
consumed). The gate behaviour is identical regardless of body validity.
"""
wire_operator_auth(monkeypatch, db_session, enabled=True)
r = client.post("/api/series3/heartbeat", json={}, follow_redirects=False)
assert r.status_code != 401 # gate would 401 an unauth /api/* route
assert r.status_code != 303
r = client.post("/api/series4/heartbeat", json={}, follow_redirects=False)
assert r.status_code not in (401, 303)
# /emitters/report is a sync endpoint with required Pydantic fields; supply a
# valid body so the validation_exception_handler (which awaits request.body())
# is never triggered — that handler deadlocks the Starlette 0.27 TestClient.
valid_report = {
"unit": "TEST001",
"unit_type": "series3",
"timestamp": "2024-01-01T00:00:00Z",
"file": "test.evt",
"status": "OK",
}
r = client.post("/emitters/report", json=valid_report, follow_redirects=False)
assert r.status_code != 303 # gate would 303 an unauth HTML route
def test_static_assets_exempt(client, db_session, monkeypatch):
wire_operator_auth(monkeypatch, db_session, enabled=True)
# /sw.js and /manifest.json are PWA assets clients fetch pre-login.
assert client.get("/sw.js", follow_redirects=False).status_code in (200, 404)
assert client.get("/sw.js", follow_redirects=False).status_code != 303
+36
View File
@@ -0,0 +1,36 @@
# tests/test_operator_model.py
import uuid
from backend.models import OperatorUser
from backend.operator_auth import role_at_least, _ROLE_RANK
def test_operator_user_defaults(db_session):
u = OperatorUser(id=str(uuid.uuid4()), email="a@x.com", display_name="A",
password_hash="h", role="admin")
db_session.add(u)
db_session.commit()
got = db_session.query(OperatorUser).filter_by(email="a@x.com").first()
assert got.active is True
assert got.must_change_password is False
assert got.failed_login_count == 0
assert got.locked_until is None
assert got.sessions_valid_from is not None
assert got.sessions_valid_from.microsecond == 0 # truncated to whole seconds
def test_email_is_unique(db_session):
for i in range(2):
db_session.add(OperatorUser(id=str(uuid.uuid4()), email="dup@x.com",
display_name="d", password_hash="h", role="admin"))
import pytest
with pytest.raises(Exception):
db_session.commit()
def test_role_ladder():
assert _ROLE_RANK == {"operator": 10, "admin": 20, "superadmin": 30}
assert role_at_least("superadmin", "admin") is True
assert role_at_least("admin", "admin") is True
assert role_at_least("admin", "superadmin") is False
assert role_at_least("operator", "admin") is False
assert role_at_least("nonsense", "admin") is False
+63
View File
@@ -0,0 +1,63 @@
# tests/test_operator_session.py
import time
import uuid
from datetime import datetime, timedelta
from types import SimpleNamespace
from backend.models import OperatorUser
from backend.operator_auth import (
make_operator_cookie, current_operator, COOKIE_NAME,
)
def _make_user(db, **kw):
u = OperatorUser(id=str(uuid.uuid4()), email=kw.pop("email", "u@x.com"),
display_name="U", password_hash="h", role=kw.pop("role", "admin"), **kw)
db.add(u)
db.commit()
return u
def _req(cookie_value):
# current_operator only reads request.cookies — a stub is enough.
return SimpleNamespace(cookies={COOKIE_NAME: cookie_value} if cookie_value else {})
def test_valid_cookie_resolves_user(db_session):
u = _make_user(db_session)
cookie = make_operator_cookie(u.id)
assert current_operator(_req(cookie), db_session).id == u.id
def test_no_or_garbage_cookie_is_none(db_session):
assert current_operator(_req(None), db_session) is None
assert current_operator(_req("garbage"), db_session) is None
def test_inactive_user_is_none(db_session):
u = _make_user(db_session, active=False)
assert current_operator(_req(make_operator_cookie(u.id)), db_session) is None
def test_locked_user_is_none(db_session):
u = _make_user(db_session, locked_until=datetime.utcnow() + timedelta(minutes=5))
assert current_operator(_req(make_operator_cookie(u.id)), db_session) is None
def test_cookie_older_than_sessions_valid_from_is_none(db_session):
u = _make_user(db_session)
old_iat = int(time.time()) - 1000
cookie = make_operator_cookie(u.id, iat=old_iat)
u.sessions_valid_from = datetime.utcnow()
db_session.commit()
assert current_operator(_req(cookie), db_session) is None
def test_cookie_minted_with_matching_iat_after_bump_still_valid(db_session):
# Guards the change-password race: bump sessions_valid_from to the new cookie's
# exact iat → that fresh cookie must remain valid.
u = _make_user(db_session)
new_iat = int(time.time())
u.sessions_valid_from = datetime.utcfromtimestamp(new_iat)
db_session.commit()
assert current_operator(_req(make_operator_cookie(u.id, iat=new_iat)), db_session).id == u.id
+132
View File
@@ -0,0 +1,132 @@
# tests/test_operator_users.py
import uuid
from tests.conftest import wire_operator_auth
from backend.operator_auth import create_operator, make_operator_cookie, COOKIE_NAME
from backend.models import OperatorUser
def _login_as(client, user):
client.cookies.set(COOKIE_NAME, make_operator_cookie(user.id))
def test_admin_cannot_reach_user_management(client, db_session, monkeypatch):
admin, _ = create_operator(db_session, "admin@x.com", "Admin", "admin", password="pw-123456")
wire_operator_auth(monkeypatch, db_session, enabled=True)
_login_as(client, admin)
assert client.get("/admin/users", follow_redirects=False).status_code == 403
def test_superadmin_sees_user_management(client, db_session, monkeypatch):
su, _ = create_operator(db_session, "su@x.com", "Su", "superadmin", password="pw-123456")
wire_operator_auth(monkeypatch, db_session, enabled=True)
_login_as(client, su)
assert client.get("/admin/users", follow_redirects=False).status_code == 200
def test_superadmin_lists_users_json(client, db_session, monkeypatch):
su, _ = create_operator(db_session, "su@x.com", "Su", "superadmin", password="pw-123456")
wire_operator_auth(monkeypatch, db_session, enabled=True)
_login_as(client, su)
r = client.get("/api/admin/users")
assert r.status_code == 200
emails = [u["email"] for u in r.json()["users"]]
assert "su@x.com" in emails
assert all("password_hash" not in u for u in r.json()["users"]) # never leak hashes
def test_create_user_returns_temp_once(client, db_session, monkeypatch):
su, _ = create_operator(db_session, "su@x.com", "Su", "superadmin", password="pw-123456")
wire_operator_auth(monkeypatch, db_session, enabled=True)
_login_as(client, su)
r = client.post("/api/admin/users",
json={"email": "dad@x.com", "name": "Dad", "role": "admin"})
assert r.status_code == 200
assert len(r.json()["password"]) >= 12
made = db_session.query(OperatorUser).filter_by(email="dad@x.com").first()
assert made.must_change_password is True
def test_reset_password_returns_temp_once(client, db_session, monkeypatch):
su, _ = create_operator(db_session, "su@x.com", "Su", "superadmin", password="pw-123456")
target, _ = create_operator(db_session, "t@x.com", "T", "admin", password="pw-123456")
wire_operator_auth(monkeypatch, db_session, enabled=True)
_login_as(client, su)
r = client.post(f"/api/admin/users/{target.id}/reset-password")
assert r.status_code == 200 and len(r.json()["password"]) >= 12
db_session.refresh(target)
assert target.must_change_password is True
def test_disable_and_enable(client, db_session, monkeypatch):
su, _ = create_operator(db_session, "su@x.com", "Su", "superadmin", password="pw-123456")
target, _ = create_operator(db_session, "t@x.com", "T", "admin", password="pw-123456")
wire_operator_auth(monkeypatch, db_session, enabled=True)
_login_as(client, su)
assert client.post(f"/api/admin/users/{target.id}/disable").status_code == 200
db_session.refresh(target); assert target.active is False
assert client.post(f"/api/admin/users/{target.id}/enable").status_code == 200
db_session.refresh(target); assert target.active is True
def test_change_role(client, db_session, monkeypatch):
su, _ = create_operator(db_session, "su@x.com", "Su", "superadmin", password="pw-123456")
target, _ = create_operator(db_session, "t@x.com", "T", "admin", password="pw-123456")
wire_operator_auth(monkeypatch, db_session, enabled=True)
_login_as(client, su)
r = client.post(f"/api/admin/users/{target.id}/role", json={"role": "superadmin"})
assert r.status_code == 200
db_session.refresh(target); assert target.role == "superadmin"
def test_admin_cannot_reach_json_endpoints(client, db_session, monkeypatch):
admin, _ = create_operator(db_session, "a@x.com", "A", "admin", password="pw-123456")
target, _ = create_operator(db_session, "t@x.com", "T", "admin", password="pw-123456")
wire_operator_auth(monkeypatch, db_session, enabled=True)
_login_as(client, admin)
assert client.get("/api/admin/users").status_code == 403
assert client.post("/api/admin/users", json={"email": "x@x.com", "name": "X", "role": "admin"}).status_code == 403
assert client.post(f"/api/admin/users/{target.id}/reset-password").status_code == 403
assert client.post(f"/api/admin/users/{target.id}/disable").status_code == 403
assert client.post(f"/api/admin/users/{target.id}/enable").status_code == 403
assert client.post(f"/api/admin/users/{target.id}/role", json={"role": "superadmin"}).status_code == 403
def test_cannot_disable_own_account(client, db_session, monkeypatch):
su, _ = create_operator(db_session, "su@x.com", "Su", "superadmin", password="pw-123456")
wire_operator_auth(monkeypatch, db_session, enabled=True)
_login_as(client, su)
r = client.post(f"/api/admin/users/{su.id}/disable")
assert r.status_code == 400
db_session.refresh(su)
assert su.active is True
def test_cannot_change_own_role(client, db_session, monkeypatch):
su, _ = create_operator(db_session, "su@x.com", "Su", "superadmin", password="pw-123456")
wire_operator_auth(monkeypatch, db_session, enabled=True)
_login_as(client, su)
r = client.post(f"/api/admin/users/{su.id}/role", json={"role": "admin"})
assert r.status_code == 400
db_session.refresh(su)
assert su.role == "superadmin"
def test_deferred_operator_role_rejected_by_api(client, db_session, monkeypatch):
su, _ = create_operator(db_session, "su@x.com", "Su", "superadmin", password="pw-123456")
target, _ = create_operator(db_session, "t@x.com", "T", "admin", password="pw-123456")
wire_operator_auth(monkeypatch, db_session, enabled=True)
_login_as(client, su)
assert client.post("/api/admin/users", json={"email": "op@x.com", "name": "Op", "role": "operator"}).status_code == 400
assert client.post(f"/api/admin/users/{target.id}/role", json={"role": "operator"}).status_code == 400
def test_admin_surface_404s_when_flag_off(client, db_session, monkeypatch):
su, _ = create_operator(db_session, "su@x.com", "Su", "superadmin", password="pw-123456")
wire_operator_auth(monkeypatch, db_session, enabled=False)
_login_as(client, su)
# With operator auth OFF, the management surface must not exist (404), even
# though require_role passes through — otherwise it'd be world-open.
assert client.get("/admin/users").status_code == 404
assert client.get("/api/admin/users").status_code == 404
assert client.post("/api/admin/users",
json={"email": "x@x.com", "name": "X", "role": "admin"}).status_code == 404