""" Client-portal auth — the swappable gate (see docs/CLIENT_PORTAL.md). M1-M3 ride on an interim signed "magic URL": an unguessable token in the link mints a signed session cookie. Every portal route depends on get_current_client(); M4 replaces the backing (magic-link / accounts) without touching routes/templates. The cookie carries the ACCESS-TOKEN id (not the client id) and is re-validated against the DB on every request, so revoking a link (revoked_at) kills its live sessions on the next request — not just future clicks. No new dependency: the cookie is signed with stdlib HMAC-SHA256 over a SECRET_KEY. """ import os import hmac import json import time import base64 import hashlib import logging from datetime import datetime from fastapi import Request, Depends from sqlalchemy.orm import Session from backend.database import get_db from backend.models import Client, ClientAccessToken logger = logging.getLogger(__name__) # Signing secret for portal session cookies. MUST be set to a real secret in prod # (env). The insecure default only exists so dev/test boots without config. SECRET_KEY = os.getenv("SECRET_KEY", "dev-insecure-change-me") if SECRET_KEY == "dev-insecure-change-me": logger.warning("[PORTAL] SECRET_KEY is the insecure default — set SECRET_KEY in prod.") COOKIE_NAME = "portal_session" COOKIE_MAX_AGE = 60 * 60 * 24 * 30 # 30 days class PortalAuthError(Exception): """Raised by get_current_client when there's no valid portal session. Handled centrally in main.py: HTML routes get the access-required page, /portal/api/* routes get a 401 JSON.""" # -- token + cookie primitives ---------------------------------------------- def hash_token(raw: str) -> str: """sha256 hex of a raw access-token secret (what we store + look up by).""" return hashlib.sha256(raw.encode()).hexdigest() def _sign(body: str) -> str: return hmac.new(SECRET_KEY.encode(), body.encode(), hashlib.sha256).hexdigest() def make_session_cookie(token_id: str) -> str: body = base64.urlsafe_b64encode( json.dumps({"tid": token_id, "iat": int(time.time())}).encode() ).decode() return f"{body}.{_sign(body)}" def _read_session_cookie(value: str): """Return the token id from a signed cookie, or None if missing/tampered.""" try: body, sig = value.rsplit(".", 1) except (ValueError, AttributeError): return None if not hmac.compare_digest(sig, _sign(body)): return None try: data = json.loads(base64.urlsafe_b64decode(body.encode())) except Exception: return None return data.get("tid") # -- the dependency every portal route uses --------------------------------- def get_current_client(request: Request, db: Session = Depends(get_db)) -> Client: """Resolve the authenticated client, or raise PortalAuthError. Re-validates the access token on every request so a revoked link / disabled client drops the session immediately.""" cookie = request.cookies.get(COOKIE_NAME) token_id = _read_session_cookie(cookie) if cookie else None if not token_id: raise PortalAuthError() tok = db.query(ClientAccessToken).filter_by(id=token_id, revoked_at=None).first() if not tok: raise PortalAuthError() client = db.query(Client).filter_by(id=tok.client_id, active=True).first() if not client: raise PortalAuthError() return client def resolve_token(raw_token: str, db: Session): """Validate a raw magic-URL token. Returns (ClientAccessToken, Client) on success, or (None, None). Also stamps last_used_at.""" tok = db.query(ClientAccessToken).filter_by( token_hash=hash_token(raw_token), revoked_at=None ).first() if not tok: return None, None client = db.query(Client).filter_by(id=tok.client_id, active=True).first() if not client: return None, None tok.last_used_at = datetime.utcnow() db.commit() return tok, client