Merge branch 'dev' into feat/ftp-report-pipeline
This commit is contained in:
@@ -0,0 +1,358 @@
|
||||
"""
|
||||
Client portal — read-only, scoped client view (see docs/CLIENT_PORTAL.md).
|
||||
|
||||
M1: a client opens a magic URL (/portal/enter/{token}) which mints a signed
|
||||
session cookie, then sees their locations (overview) and per-location read-only
|
||||
live data sourced from SLMM's cache. Every data route re-checks ownership.
|
||||
"""
|
||||
|
||||
import os
|
||||
import json
|
||||
import asyncio
|
||||
import logging
|
||||
from datetime import datetime
|
||||
|
||||
import httpx
|
||||
import websockets
|
||||
from fastapi import APIRouter, Request, Depends, HTTPException, WebSocket
|
||||
from fastapi.responses import RedirectResponse
|
||||
from sqlalchemy import or_
|
||||
from sqlalchemy.orm import Session
|
||||
|
||||
from backend.database import get_db, SessionLocal
|
||||
from backend.models import Client, MonitoringLocation, Project, UnitAssignment
|
||||
from backend.templates_config import templates
|
||||
from backend.portal_auth import (
|
||||
get_current_client, client_from_cookie, make_session_cookie, resolve_token,
|
||||
provision_preview_session, PORTAL_OPEN_LINKS,
|
||||
COOKIE_NAME, COOKIE_MAX_AGE,
|
||||
)
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
router = APIRouter(prefix="/portal", tags=["portal"])
|
||||
|
||||
SLMM_BASE_URL = os.getenv("SLMM_BASE_URL", "http://localhost:8100")
|
||||
SLMM_WS_BASE_URL = SLMM_BASE_URL.replace("http://", "ws://").replace("https://", "wss://")
|
||||
|
||||
# Whitelist of fields the portal exposes to a client — sound metrics + run state
|
||||
# only. Internal device health (battery/power/SD/raw_payload) is NOT disclosed.
|
||||
_PORTAL_LIVE_FIELDS = ("measurement_state", "last_seen", "measurement_start_time",
|
||||
"lp", "leq", "lmax", "lpeak", "ln1", "ln2")
|
||||
|
||||
|
||||
# -- scoping (every data route gates through these) --------------------------
|
||||
|
||||
def _client_project_ids(client: Client, db: Session) -> list:
|
||||
return [r[0] for r in db.query(Project.id).filter(
|
||||
Project.client_id == client.id, Project.status != "deleted").all()]
|
||||
|
||||
|
||||
def resolve_client_location(client: Client, location_id: str, db: Session) -> MonitoringLocation:
|
||||
"""Ownership gate: location must be a sound location in one of the client's
|
||||
active projects. Raises 404 (not 403) for both 'missing' and 'not yours' so
|
||||
we never leak whether a location exists."""
|
||||
loc = db.query(MonitoringLocation).filter_by(id=location_id, removed_at=None).first()
|
||||
if (not loc or loc.location_type != "sound"
|
||||
or loc.project_id not in _client_project_ids(client, db)):
|
||||
raise HTTPException(status_code=404, detail="Location not found")
|
||||
return loc
|
||||
|
||||
|
||||
def active_unit_for_location(location_id: str, db: Session):
|
||||
"""The SLM unit currently assigned to this location, or None."""
|
||||
now = datetime.utcnow()
|
||||
asg = (db.query(UnitAssignment)
|
||||
.filter(UnitAssignment.location_id == location_id,
|
||||
UnitAssignment.status == "active",
|
||||
UnitAssignment.device_type == "slm",
|
||||
or_(UnitAssignment.assigned_until.is_(None),
|
||||
UnitAssignment.assigned_until > now))
|
||||
.order_by(UnitAssignment.assigned_at.desc()).first())
|
||||
return asg.unit_id if asg else None
|
||||
|
||||
|
||||
def _client_locations(client: Client, db: Session) -> list:
|
||||
"""The client's active sound locations (for the overview tiles + map)."""
|
||||
pids = _client_project_ids(client, db)
|
||||
if not pids:
|
||||
return []
|
||||
projs = {p.id: p.name for p in
|
||||
db.query(Project.id, Project.name).filter(Project.id.in_(pids)).all()}
|
||||
locs = (db.query(MonitoringLocation)
|
||||
.filter(MonitoringLocation.project_id.in_(pids),
|
||||
MonitoringLocation.location_type == "sound",
|
||||
MonitoringLocation.removed_at.is_(None))
|
||||
.order_by(MonitoringLocation.sort_order, MonitoringLocation.name).all())
|
||||
return [{
|
||||
"id": loc.id, "name": loc.name,
|
||||
"address": loc.address, "coordinates": loc.coordinates,
|
||||
"project_name": projs.get(loc.project_id),
|
||||
"has_device": active_unit_for_location(loc.id, db) is not None,
|
||||
} for loc in locs]
|
||||
|
||||
|
||||
@router.get("/enter/{token}")
|
||||
def portal_enter(token: str, request: Request, db: Session = Depends(get_db)):
|
||||
"""Magic-URL entry: validate the token, mint a session cookie, land on /portal."""
|
||||
tok, client = resolve_token(token, db)
|
||||
if not client:
|
||||
return templates.TemplateResponse(
|
||||
"portal/access_required.html",
|
||||
{"request": request, "reason": "invalid"},
|
||||
status_code=403,
|
||||
)
|
||||
resp = RedirectResponse(url="/portal", status_code=303)
|
||||
resp.set_cookie(
|
||||
COOKIE_NAME, make_session_cookie(tok.id),
|
||||
max_age=COOKIE_MAX_AGE, httponly=True, samesite="lax",
|
||||
)
|
||||
logger.info(f"[PORTAL] {client.slug}: session opened via token {tok.id[:8]}")
|
||||
return resp
|
||||
|
||||
|
||||
@router.get("/open/{project_id}")
|
||||
def portal_open(project_id: str, request: Request, db: Session = Depends(get_db)):
|
||||
"""Dev-only plain shareable link: open a project's client portal with no token
|
||||
(gated by PORTAL_OPEN_LINKS). Lets anyone with the URL view it for feedback —
|
||||
sets the session cookie and lands on /portal. Lives under /portal so it works
|
||||
through a reverse proxy that exposes only /portal/*."""
|
||||
if not PORTAL_OPEN_LINKS:
|
||||
return templates.TemplateResponse(
|
||||
"portal/access_required.html", {"request": request, "reason": "required"},
|
||||
status_code=404)
|
||||
project = db.query(Project).filter_by(id=project_id).first()
|
||||
if not project:
|
||||
return templates.TemplateResponse(
|
||||
"portal/access_required.html", {"request": request, "reason": "invalid"},
|
||||
status_code=404)
|
||||
token_id = provision_preview_session(project, db)
|
||||
resp = RedirectResponse(url="/portal", status_code=303)
|
||||
resp.set_cookie(COOKIE_NAME, make_session_cookie(token_id),
|
||||
max_age=COOKIE_MAX_AGE, httponly=True, samesite="lax")
|
||||
return resp
|
||||
|
||||
|
||||
@router.get("/logout")
|
||||
def portal_logout():
|
||||
resp = RedirectResponse(url="/portal/access", status_code=303)
|
||||
resp.delete_cookie(COOKIE_NAME)
|
||||
return resp
|
||||
|
||||
|
||||
@router.get("/access")
|
||||
def portal_access(request: Request):
|
||||
"""Landing for an unauthenticated visitor (no valid link)."""
|
||||
return templates.TemplateResponse(
|
||||
"portal/access_required.html", {"request": request, "reason": "required"}
|
||||
)
|
||||
|
||||
|
||||
@router.get("")
|
||||
def portal_home(request: Request, client: Client = Depends(get_current_client),
|
||||
db: Session = Depends(get_db)):
|
||||
"""Client overview — their active sound locations with live tiles + a map."""
|
||||
return templates.TemplateResponse(
|
||||
"portal/overview.html",
|
||||
{"request": request, "client": client,
|
||||
"locations": _client_locations(client, db)},
|
||||
)
|
||||
|
||||
|
||||
@router.get("/location/{location_id}")
|
||||
def portal_location(location_id: str, request: Request,
|
||||
client: Client = Depends(get_current_client),
|
||||
db: Session = Depends(get_db)):
|
||||
"""Read-only live view for one of the client's locations (404 if not owned)."""
|
||||
loc = resolve_client_location(client, location_id, db)
|
||||
return templates.TemplateResponse("portal/location.html", {
|
||||
"request": request, "client": client, "location": loc,
|
||||
"has_device": active_unit_for_location(location_id, db) is not None,
|
||||
})
|
||||
|
||||
|
||||
# -- scoped data (cache reads only — never hits the device) ------------------
|
||||
|
||||
@router.get("/api/location/{location_id}/live")
|
||||
async def portal_location_live(location_id: str,
|
||||
client: Client = Depends(get_current_client),
|
||||
db: Session = Depends(get_db)):
|
||||
"""Scrubbed cached live reading for a location the client owns."""
|
||||
resolve_client_location(client, location_id, db)
|
||||
unit_id = active_unit_for_location(location_id, db)
|
||||
if not unit_id:
|
||||
return {"status": "ok", "data": None, "reason": "no_device"}
|
||||
try:
|
||||
async with httpx.AsyncClient(timeout=5.0) as hc:
|
||||
r = await hc.get(f"{SLMM_BASE_URL}/api/nl43/{unit_id}/status")
|
||||
except Exception:
|
||||
return {"status": "ok", "data": None, "reason": "unreachable"}
|
||||
if r.status_code != 200:
|
||||
return {"status": "ok", "data": None, "reason": "no_data"}
|
||||
full = (r.json() or {}).get("data", {}) or {}
|
||||
return {"status": "ok", "data": {k: full.get(k) for k in _PORTAL_LIVE_FIELDS}}
|
||||
|
||||
|
||||
@router.get("/api/location/{location_id}/history")
|
||||
async def portal_location_history(location_id: str, hours: float = 2.0,
|
||||
client: Client = Depends(get_current_client),
|
||||
db: Session = Depends(get_db)):
|
||||
"""Cached chart trail for a location the client owns. (Trail rows are already
|
||||
just timestamp + lp/leq/lmax/ln1/ln2 — safe to pass through.)"""
|
||||
resolve_client_location(client, location_id, db)
|
||||
unit_id = active_unit_for_location(location_id, db)
|
||||
if not unit_id:
|
||||
return {"status": "ok", "readings": []}
|
||||
hours = max(0.1, min(hours, 48.0))
|
||||
try:
|
||||
async with httpx.AsyncClient(timeout=5.0) as hc:
|
||||
r = await hc.get(f"{SLMM_BASE_URL}/api/nl43/{unit_id}/history",
|
||||
params={"hours": hours})
|
||||
except Exception:
|
||||
return {"status": "ok", "readings": []}
|
||||
if r.status_code != 200:
|
||||
return {"status": "ok", "readings": []}
|
||||
raw = (r.json() or {}).get("readings", [])
|
||||
fields = ("timestamp", "lp", "leq", "lmax", "ln1", "ln2") # whitelist, like the other endpoints
|
||||
return {"status": "ok", "readings": [{k: x.get(k) for k in fields} for x in raw]}
|
||||
|
||||
|
||||
# Whitelist of alert-event fields exposed to a client (no internal ids/ack-by).
|
||||
_PORTAL_EVENT_FIELDS = ("rule_name", "metric", "threshold_db", "onset_at",
|
||||
"onset_value", "peak_value", "clear_at", "status")
|
||||
|
||||
|
||||
@router.get("/api/location/{location_id}/events")
|
||||
async def portal_location_events(location_id: str, limit: int = 20,
|
||||
client: Client = Depends(get_current_client),
|
||||
db: Session = Depends(get_db)):
|
||||
"""Scrubbed breach history for a location the client owns (read-only)."""
|
||||
resolve_client_location(client, location_id, db)
|
||||
unit_id = active_unit_for_location(location_id, db)
|
||||
if not unit_id:
|
||||
return {"status": "ok", "events": []}
|
||||
limit = max(1, min(limit, 100))
|
||||
try:
|
||||
async with httpx.AsyncClient(timeout=5.0) as hc:
|
||||
r = await hc.get(f"{SLMM_BASE_URL}/api/nl43/{unit_id}/alerts/events",
|
||||
params={"limit": limit})
|
||||
except Exception:
|
||||
return {"status": "ok", "events": []}
|
||||
if r.status_code != 200:
|
||||
return {"status": "ok", "events": []}
|
||||
raw = (r.json() or {}).get("events", [])
|
||||
events = [{k: e.get(k) for k in _PORTAL_EVENT_FIELDS} for e in raw]
|
||||
return {"status": "ok", "events": events, "active": sum(1 for e in events if e.get("status") == "active")}
|
||||
|
||||
|
||||
# Whitelist of alert-rule fields shown to a client (the active limits, no cooldown/
|
||||
# hysteresis internals).
|
||||
_PORTAL_RULE_FIELDS = ("name", "metric", "comparison", "threshold_db", "duration_s",
|
||||
"schedule_start", "schedule_end", "schedule_days")
|
||||
|
||||
|
||||
@router.get("/api/location/{location_id}/thresholds")
|
||||
async def portal_location_thresholds(location_id: str,
|
||||
client: Client = Depends(get_current_client),
|
||||
db: Session = Depends(get_db)):
|
||||
"""The active alert limits for a location the client owns (enabled rules only),
|
||||
so the client can see what they're being alerted on. Read-only, scrubbed."""
|
||||
resolve_client_location(client, location_id, db)
|
||||
unit_id = active_unit_for_location(location_id, db)
|
||||
if not unit_id:
|
||||
return {"status": "ok", "rules": []}
|
||||
try:
|
||||
async with httpx.AsyncClient(timeout=5.0) as hc:
|
||||
r = await hc.get(f"{SLMM_BASE_URL}/api/nl43/{unit_id}/alerts/rules")
|
||||
except Exception:
|
||||
return {"status": "ok", "rules": []}
|
||||
if r.status_code != 200:
|
||||
return {"status": "ok", "rules": []}
|
||||
raw = (r.json() or {}).get("rules", [])
|
||||
rules = [{k: x.get(k) for k in _PORTAL_RULE_FIELDS} for x in raw if x.get("enabled")]
|
||||
return {"status": "ok", "rules": rules}
|
||||
|
||||
|
||||
# -- live stream (fan-out feed, scoped + scrubbed) ---------------------------
|
||||
|
||||
def _scrub_frame(raw: str):
|
||||
"""Project a monitor frame down to the portal whitelist. Drops internal fields
|
||||
(unit_id, raw_payload, lmin) before it reaches a client; passes control fields
|
||||
(feed_status, heartbeat) + timestamp through. Returns None for a non-JSON frame
|
||||
so the caller drops it rather than forwarding anything unscrubbed."""
|
||||
try:
|
||||
d = json.loads(raw)
|
||||
except Exception:
|
||||
return None
|
||||
out = {k: d.get(k) for k in _PORTAL_LIVE_FIELDS if k in d}
|
||||
if "timestamp" in d:
|
||||
out["timestamp"] = d["timestamp"]
|
||||
for ctrl in ("feed_status", "heartbeat"):
|
||||
if ctrl in d:
|
||||
out[ctrl] = d[ctrl]
|
||||
return json.dumps(out)
|
||||
|
||||
|
||||
@router.websocket("/api/location/{location_id}/stream")
|
||||
async def portal_location_stream(websocket: WebSocket, location_id: str):
|
||||
"""Live ~1Hz feed for a location the client owns. Auths via the session cookie,
|
||||
enforces ownership, then bridges the unit's shared SLMM /monitor fan-out feed
|
||||
to the browser (scrubbed). A viewer is just one more subscriber to the one
|
||||
device feed — no extra device connection."""
|
||||
await websocket.accept()
|
||||
|
||||
# Auth + ownership on a short-lived session, then release it for the long bridge.
|
||||
db = SessionLocal()
|
||||
try:
|
||||
client = client_from_cookie(websocket.cookies.get(COOKIE_NAME), db)
|
||||
if client is None:
|
||||
await websocket.close(code=1008) # policy violation (not authenticated)
|
||||
return
|
||||
try:
|
||||
resolve_client_location(client, location_id, db)
|
||||
except HTTPException:
|
||||
await websocket.close(code=1008)
|
||||
return
|
||||
unit_id = active_unit_for_location(location_id, db)
|
||||
finally:
|
||||
db.close()
|
||||
|
||||
if not unit_id:
|
||||
try:
|
||||
await websocket.send_json({"feed_status": "no_device"})
|
||||
finally:
|
||||
await websocket.close(code=1000)
|
||||
return
|
||||
|
||||
target = f"{SLMM_WS_BASE_URL}/api/nl43/{unit_id}/monitor"
|
||||
backend_ws = None
|
||||
try:
|
||||
backend_ws = await websockets.connect(target)
|
||||
|
||||
async def forward_to_client():
|
||||
async for message in backend_ws:
|
||||
frame = _scrub_frame(message)
|
||||
if frame is not None:
|
||||
await websocket.send_text(frame)
|
||||
|
||||
async def watch_client():
|
||||
while True:
|
||||
await websocket.receive_text()
|
||||
|
||||
tasks = [asyncio.ensure_future(forward_to_client()),
|
||||
asyncio.ensure_future(watch_client())]
|
||||
done, pending = await asyncio.wait(tasks, return_when=asyncio.FIRST_COMPLETED)
|
||||
for t in pending:
|
||||
t.cancel()
|
||||
for t in tasks:
|
||||
try:
|
||||
await t
|
||||
except (asyncio.CancelledError, Exception):
|
||||
pass
|
||||
except Exception as e:
|
||||
logger.warning(f"[PORTAL] stream {location_id}: {e}")
|
||||
finally:
|
||||
if backend_ws:
|
||||
try:
|
||||
await backend_ws.close()
|
||||
except Exception:
|
||||
pass
|
||||
Reference in New Issue
Block a user