diff --git a/backend/portal_auth.py b/backend/portal_auth.py index da869a2..ac52ae7 100644 --- a/backend/portal_auth.py +++ b/backend/portal_auth.py @@ -82,20 +82,24 @@ def _read_session_cookie(value: str): # -- the dependency every portal route uses --------------------------------- -def get_current_client(request: Request, db: Session = Depends(get_db)) -> Client: - """Resolve the authenticated client, or raise PortalAuthError. - - Re-validates the access token on every request so a revoked link / disabled - client drops the session immediately.""" - cookie = request.cookies.get(COOKIE_NAME) - token_id = _read_session_cookie(cookie) if cookie else None +def client_from_cookie(cookie_value, db: Session): + """Resolve a Client from a raw session-cookie value, or None. Re-validates the + access token against the DB each call, so a revoked link / disabled client + drops immediately. Shared by the HTTP dependency and the WebSocket handler + (which can't use Request-based Depends).""" + token_id = _read_session_cookie(cookie_value) if cookie_value else None if not token_id: - raise PortalAuthError() + return None tok = db.query(ClientAccessToken).filter_by(id=token_id, revoked_at=None).first() if not tok: - raise PortalAuthError() - client = db.query(Client).filter_by(id=tok.client_id, active=True).first() - if not client: + return None + return db.query(Client).filter_by(id=tok.client_id, active=True).first() + + +def get_current_client(request: Request, db: Session = Depends(get_db)) -> Client: + """Resolve the authenticated client, or raise PortalAuthError.""" + client = client_from_cookie(request.cookies.get(COOKIE_NAME), db) + if client is None: raise PortalAuthError() return client diff --git a/backend/routers/portal.py b/backend/routers/portal.py index 1382b45..abbea7b 100644 --- a/backend/routers/portal.py +++ b/backend/routers/portal.py @@ -7,20 +7,23 @@ live data sourced from SLMM's cache. Every data route re-checks ownership. """ import os +import json +import asyncio import logging from datetime import datetime import httpx -from fastapi import APIRouter, Request, Depends, HTTPException +import websockets +from fastapi import APIRouter, Request, Depends, HTTPException, WebSocket from fastapi.responses import RedirectResponse from sqlalchemy import or_ from sqlalchemy.orm import Session -from backend.database import get_db +from backend.database import get_db, SessionLocal from backend.models import Client, MonitoringLocation, Project, UnitAssignment from backend.templates_config import templates from backend.portal_auth import ( - get_current_client, make_session_cookie, resolve_token, + get_current_client, client_from_cookie, make_session_cookie, resolve_token, COOKIE_NAME, COOKIE_MAX_AGE, ) @@ -28,6 +31,7 @@ logger = logging.getLogger(__name__) router = APIRouter(prefix="/portal", tags=["portal"]) SLMM_BASE_URL = os.getenv("SLMM_BASE_URL", "http://localhost:8100") +SLMM_WS_BASE_URL = SLMM_BASE_URL.replace("http://", "ws://").replace("https://", "wss://") # Whitelist of fields the portal exposes to a client — sound metrics + run state # only. Internal device health (battery/power/SD/raw_payload) is NOT disclosed. @@ -185,3 +189,86 @@ async def portal_location_history(location_id: str, hours: float = 2.0, if r.status_code != 200: return {"status": "ok", "readings": []} return {"status": "ok", "readings": (r.json() or {}).get("readings", [])} + + +# -- live stream (fan-out feed, scoped + scrubbed) --------------------------- + +def _scrub_frame(raw: str) -> str: + """Project a monitor frame down to the portal whitelist. Drops internal fields + (unit_id, raw_payload, lmin) before it reaches a client; passes control fields + (feed_status, heartbeat) + timestamp through.""" + try: + d = json.loads(raw) + except Exception: + return raw + out = {k: d.get(k) for k in _PORTAL_LIVE_FIELDS if k in d} + if "timestamp" in d: + out["timestamp"] = d["timestamp"] + for ctrl in ("feed_status", "heartbeat"): + if ctrl in d: + out[ctrl] = d[ctrl] + return json.dumps(out) + + +@router.websocket("/api/location/{location_id}/stream") +async def portal_location_stream(websocket: WebSocket, location_id: str): + """Live ~1Hz feed for a location the client owns. Auths via the session cookie, + enforces ownership, then bridges the unit's shared SLMM /monitor fan-out feed + to the browser (scrubbed). A viewer is just one more subscriber to the one + device feed — no extra device connection.""" + await websocket.accept() + + # Auth + ownership on a short-lived session, then release it for the long bridge. + db = SessionLocal() + try: + client = client_from_cookie(websocket.cookies.get(COOKIE_NAME), db) + if client is None: + await websocket.close(code=1008) # policy violation (not authenticated) + return + try: + resolve_client_location(client, location_id, db) + except HTTPException: + await websocket.close(code=1008) + return + unit_id = active_unit_for_location(location_id, db) + finally: + db.close() + + if not unit_id: + try: + await websocket.send_json({"feed_status": "no_device"}) + finally: + await websocket.close(code=1000) + return + + target = f"{SLMM_WS_BASE_URL}/api/nl43/{unit_id}/monitor" + backend_ws = None + try: + backend_ws = await websockets.connect(target) + + async def forward_to_client(): + async for message in backend_ws: + await websocket.send_text(_scrub_frame(message)) + + async def watch_client(): + while True: + await websocket.receive_text() + + tasks = [asyncio.ensure_future(forward_to_client()), + asyncio.ensure_future(watch_client())] + done, pending = await asyncio.wait(tasks, return_when=asyncio.FIRST_COMPLETED) + for t in pending: + t.cancel() + for t in tasks: + try: + await t + except (asyncio.CancelledError, Exception): + pass + except Exception as e: + logger.warning(f"[PORTAL] stream {location_id}: {e}") + finally: + if backend_ws: + try: + await backend_ws.close() + except Exception: + pass diff --git a/templates/portal/location.html b/templates/portal/location.html index 4bb107b..1a3148c 100644 --- a/templates/portal/location.html +++ b/templates/portal/location.html @@ -39,8 +39,14 @@ -