feat(portal): live ~1Hz WS stream with auto-close (visibility + idle cap)
The portal location view is now genuinely live, not a 15s poll. Scoped WS endpoint
/portal/api/location/{id}/stream: authenticates via the session cookie, enforces
ownership (resolve_client_location), then bridges the unit's shared SLMM /monitor
fan-out feed to the browser — a viewer is just one more subscriber, no extra
device connection. Frames are scrubbed to the portal whitelist (drops unit_id,
raw_payload, counter, lmin) before reaching the client.
location.html: cache prefill for instant first paint, then upgrades to the live
socket (cards tick ~1Hz, chart scrolls). Auto-close so an abandoned tab can't pin
the device at 1Hz polling (~8x cellular data):
- closes when the tab is hidden, reopens when visible (Page Visibility) — the main
guard;
- hard 15-min cap -> "Live paused — click to resume" overlay.
Refactor: client_from_cookie() extracted from get_current_client so the WS handler
(no Request-based Depends) can auth the same way.
Verified: scrub drops internal fields / keeps metrics + heartbeat (7/7), auth
refactor (3/3), portal compiles, location.html JS balances + parses.
Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
This commit is contained in:
+15
-11
@@ -82,20 +82,24 @@ def _read_session_cookie(value: str):
|
||||
|
||||
# -- the dependency every portal route uses ---------------------------------
|
||||
|
||||
def get_current_client(request: Request, db: Session = Depends(get_db)) -> Client:
|
||||
"""Resolve the authenticated client, or raise PortalAuthError.
|
||||
|
||||
Re-validates the access token on every request so a revoked link / disabled
|
||||
client drops the session immediately."""
|
||||
cookie = request.cookies.get(COOKIE_NAME)
|
||||
token_id = _read_session_cookie(cookie) if cookie else None
|
||||
def client_from_cookie(cookie_value, db: Session):
|
||||
"""Resolve a Client from a raw session-cookie value, or None. Re-validates the
|
||||
access token against the DB each call, so a revoked link / disabled client
|
||||
drops immediately. Shared by the HTTP dependency and the WebSocket handler
|
||||
(which can't use Request-based Depends)."""
|
||||
token_id = _read_session_cookie(cookie_value) if cookie_value else None
|
||||
if not token_id:
|
||||
raise PortalAuthError()
|
||||
return None
|
||||
tok = db.query(ClientAccessToken).filter_by(id=token_id, revoked_at=None).first()
|
||||
if not tok:
|
||||
raise PortalAuthError()
|
||||
client = db.query(Client).filter_by(id=tok.client_id, active=True).first()
|
||||
if not client:
|
||||
return None
|
||||
return db.query(Client).filter_by(id=tok.client_id, active=True).first()
|
||||
|
||||
|
||||
def get_current_client(request: Request, db: Session = Depends(get_db)) -> Client:
|
||||
"""Resolve the authenticated client, or raise PortalAuthError."""
|
||||
client = client_from_cookie(request.cookies.get(COOKIE_NAME), db)
|
||||
if client is None:
|
||||
raise PortalAuthError()
|
||||
return client
|
||||
|
||||
|
||||
@@ -7,20 +7,23 @@ live data sourced from SLMM's cache. Every data route re-checks ownership.
|
||||
"""
|
||||
|
||||
import os
|
||||
import json
|
||||
import asyncio
|
||||
import logging
|
||||
from datetime import datetime
|
||||
|
||||
import httpx
|
||||
from fastapi import APIRouter, Request, Depends, HTTPException
|
||||
import websockets
|
||||
from fastapi import APIRouter, Request, Depends, HTTPException, WebSocket
|
||||
from fastapi.responses import RedirectResponse
|
||||
from sqlalchemy import or_
|
||||
from sqlalchemy.orm import Session
|
||||
|
||||
from backend.database import get_db
|
||||
from backend.database import get_db, SessionLocal
|
||||
from backend.models import Client, MonitoringLocation, Project, UnitAssignment
|
||||
from backend.templates_config import templates
|
||||
from backend.portal_auth import (
|
||||
get_current_client, make_session_cookie, resolve_token,
|
||||
get_current_client, client_from_cookie, make_session_cookie, resolve_token,
|
||||
COOKIE_NAME, COOKIE_MAX_AGE,
|
||||
)
|
||||
|
||||
@@ -28,6 +31,7 @@ logger = logging.getLogger(__name__)
|
||||
router = APIRouter(prefix="/portal", tags=["portal"])
|
||||
|
||||
SLMM_BASE_URL = os.getenv("SLMM_BASE_URL", "http://localhost:8100")
|
||||
SLMM_WS_BASE_URL = SLMM_BASE_URL.replace("http://", "ws://").replace("https://", "wss://")
|
||||
|
||||
# Whitelist of fields the portal exposes to a client — sound metrics + run state
|
||||
# only. Internal device health (battery/power/SD/raw_payload) is NOT disclosed.
|
||||
@@ -185,3 +189,86 @@ async def portal_location_history(location_id: str, hours: float = 2.0,
|
||||
if r.status_code != 200:
|
||||
return {"status": "ok", "readings": []}
|
||||
return {"status": "ok", "readings": (r.json() or {}).get("readings", [])}
|
||||
|
||||
|
||||
# -- live stream (fan-out feed, scoped + scrubbed) ---------------------------
|
||||
|
||||
def _scrub_frame(raw: str) -> str:
|
||||
"""Project a monitor frame down to the portal whitelist. Drops internal fields
|
||||
(unit_id, raw_payload, lmin) before it reaches a client; passes control fields
|
||||
(feed_status, heartbeat) + timestamp through."""
|
||||
try:
|
||||
d = json.loads(raw)
|
||||
except Exception:
|
||||
return raw
|
||||
out = {k: d.get(k) for k in _PORTAL_LIVE_FIELDS if k in d}
|
||||
if "timestamp" in d:
|
||||
out["timestamp"] = d["timestamp"]
|
||||
for ctrl in ("feed_status", "heartbeat"):
|
||||
if ctrl in d:
|
||||
out[ctrl] = d[ctrl]
|
||||
return json.dumps(out)
|
||||
|
||||
|
||||
@router.websocket("/api/location/{location_id}/stream")
|
||||
async def portal_location_stream(websocket: WebSocket, location_id: str):
|
||||
"""Live ~1Hz feed for a location the client owns. Auths via the session cookie,
|
||||
enforces ownership, then bridges the unit's shared SLMM /monitor fan-out feed
|
||||
to the browser (scrubbed). A viewer is just one more subscriber to the one
|
||||
device feed — no extra device connection."""
|
||||
await websocket.accept()
|
||||
|
||||
# Auth + ownership on a short-lived session, then release it for the long bridge.
|
||||
db = SessionLocal()
|
||||
try:
|
||||
client = client_from_cookie(websocket.cookies.get(COOKIE_NAME), db)
|
||||
if client is None:
|
||||
await websocket.close(code=1008) # policy violation (not authenticated)
|
||||
return
|
||||
try:
|
||||
resolve_client_location(client, location_id, db)
|
||||
except HTTPException:
|
||||
await websocket.close(code=1008)
|
||||
return
|
||||
unit_id = active_unit_for_location(location_id, db)
|
||||
finally:
|
||||
db.close()
|
||||
|
||||
if not unit_id:
|
||||
try:
|
||||
await websocket.send_json({"feed_status": "no_device"})
|
||||
finally:
|
||||
await websocket.close(code=1000)
|
||||
return
|
||||
|
||||
target = f"{SLMM_WS_BASE_URL}/api/nl43/{unit_id}/monitor"
|
||||
backend_ws = None
|
||||
try:
|
||||
backend_ws = await websockets.connect(target)
|
||||
|
||||
async def forward_to_client():
|
||||
async for message in backend_ws:
|
||||
await websocket.send_text(_scrub_frame(message))
|
||||
|
||||
async def watch_client():
|
||||
while True:
|
||||
await websocket.receive_text()
|
||||
|
||||
tasks = [asyncio.ensure_future(forward_to_client()),
|
||||
asyncio.ensure_future(watch_client())]
|
||||
done, pending = await asyncio.wait(tasks, return_when=asyncio.FIRST_COMPLETED)
|
||||
for t in pending:
|
||||
t.cancel()
|
||||
for t in tasks:
|
||||
try:
|
||||
await t
|
||||
except (asyncio.CancelledError, Exception):
|
||||
pass
|
||||
except Exception as e:
|
||||
logger.warning(f"[PORTAL] stream {location_id}: {e}")
|
||||
finally:
|
||||
if backend_ws:
|
||||
try:
|
||||
await backend_ws.close()
|
||||
except Exception:
|
||||
pass
|
||||
|
||||
@@ -39,8 +39,14 @@
|
||||
</div>
|
||||
</div>
|
||||
|
||||
<div class="rounded-xl border border-slate-700 bg-slate-800/50 p-4" style="min-height: 360px;">
|
||||
<div class="relative rounded-xl border border-slate-700 bg-slate-800/50 p-4" style="min-height: 360px;">
|
||||
<canvas id="p-chart"></canvas>
|
||||
<div id="p-paused" class="hidden absolute inset-0 flex items-center justify-center bg-slate-900/70 rounded-xl">
|
||||
<button onclick="resumeStream()"
|
||||
class="px-4 py-2 rounded-lg bg-seismo-orange/20 text-seismo-orange border border-seismo-orange/40 hover:bg-seismo-orange/30 text-sm font-medium">
|
||||
⏸ Live paused — click to resume
|
||||
</button>
|
||||
</div>
|
||||
</div>
|
||||
{% endif %}
|
||||
{% endblock %}
|
||||
@@ -129,10 +135,78 @@ async function backfill() {
|
||||
} catch (e) { /* leave chart empty */ }
|
||||
}
|
||||
|
||||
// ---- live stream (upgrades the cache prefill to a real ~1Hz feed) --------
|
||||
let ws = null, hardCap = null, paused = false;
|
||||
const IDLE_CAP_MS = 15 * 60 * 1000; // auto-close after 15 min so an abandoned
|
||||
// tab doesn't pin the device at 1Hz polling
|
||||
|
||||
function pushPoint(d) {
|
||||
cd.t.push(new Date().toLocaleTimeString());
|
||||
cd.lp.push(numOrNull(d.lp)); cd.leq.push(numOrNull(d.leq));
|
||||
cd.ln1.push(numOrNull(d.ln1)); cd.ln2.push(numOrNull(d.ln2));
|
||||
if (cd.t.length > 600) { cd.t.shift(); cd.lp.shift(); cd.leq.shift(); cd.ln1.shift(); cd.ln2.shift(); }
|
||||
chart.data.labels = cd.t;
|
||||
chart.data.datasets[0].data = cd.lp; chart.data.datasets[1].data = cd.leq;
|
||||
chart.data.datasets[2].data = cd.ln1; chart.data.datasets[3].data = cd.ln2;
|
||||
chart.update('none');
|
||||
}
|
||||
|
||||
function openStream() {
|
||||
if (paused || ws) return;
|
||||
const proto = location.protocol === 'https:' ? 'wss:' : 'ws:';
|
||||
ws = new WebSocket(`${proto}//${location.host}/portal/api/location/${encodeURIComponent(LOC_ID)}/stream`);
|
||||
ws.onmessage = (e) => {
|
||||
let d; try { d = JSON.parse(e.data); } catch (_) { return; }
|
||||
if (d.feed_status === 'no_device') {
|
||||
setBadge(null, null);
|
||||
document.getElementById('p-fresh').textContent = 'No device assigned';
|
||||
return;
|
||||
}
|
||||
if (d.heartbeat) return;
|
||||
if (d.feed_status === 'unreachable') {
|
||||
document.getElementById('p-fresh').innerHTML = '<span class="text-amber-400">device unreachable</span>';
|
||||
return;
|
||||
}
|
||||
setCard('p-lp', d.lp); setCard('p-leq', d.leq); setCard('p-lmax', d.lmax);
|
||||
setCard('p-ln1', d.ln1); setCard('p-ln2', d.ln2);
|
||||
const measuring = d.measurement_state === 'Start' || d.measurement_state === 'Measure';
|
||||
setBadge(measuring, d.timestamp || new Date().toISOString());
|
||||
pushPoint(d);
|
||||
};
|
||||
ws.onclose = () => { ws = null; };
|
||||
ws.onerror = () => {};
|
||||
clearTimeout(hardCap);
|
||||
hardCap = setTimeout(() => { paused = true; closeStream(); showPaused(true); }, IDLE_CAP_MS);
|
||||
}
|
||||
|
||||
function closeStream() {
|
||||
clearTimeout(hardCap);
|
||||
if (ws) { try { ws.close(); } catch (_) {} ws = null; }
|
||||
}
|
||||
|
||||
function showPaused(on) {
|
||||
const el = document.getElementById('p-paused');
|
||||
if (el) el.classList.toggle('hidden', !on);
|
||||
}
|
||||
function resumeStream() {
|
||||
paused = false; showPaused(false);
|
||||
prefill(); // refresh cards instantly on resume
|
||||
openStream();
|
||||
}
|
||||
|
||||
// Stop streaming when the tab is hidden (client switched away / locked phone) and
|
||||
// resume when it's visible again — the main cost guard, so the device relaxes back
|
||||
// to its idle poll rate the moment nobody is actually looking.
|
||||
document.addEventListener('visibilitychange', () => {
|
||||
if (document.hidden) closeStream();
|
||||
else if (!paused) openStream();
|
||||
});
|
||||
window.addEventListener('beforeunload', closeStream);
|
||||
|
||||
initChart();
|
||||
prefill();
|
||||
backfill();
|
||||
setInterval(prefill, 15000); // cache poll — read-only, no device contention
|
||||
prefill(); // instant first paint from cache
|
||||
backfill(); // seed the chart trail
|
||||
openStream(); // then upgrade to the live feed
|
||||
</script>
|
||||
{% endif %}
|
||||
{% endblock %}
|
||||
|
||||
Reference in New Issue
Block a user