Files
terra-view/backend/routers/metadata_backfill.py
serversdown d3b5a3fd26 feat(sfm): inline typeahead override of project + location on each cluster card
Operator no longer has to accept the parser's suggested project /
location verbatim.  Each cluster card now has editable typeahead inputs
that search existing projects (and existing locations within the chosen
project), with a "Create new: <typed>" fallback always available.

Solves the I-80-North-Fork case: of the 20+ cluster variants
("I-80-North Fork Bridges-I80 E. Abutment", "I-80- North Fork
Bridges-543 Plank Rd", etc.), operator types "I-80" in the Project
input, picks the existing project from the dropdown, and the cluster
attaches to it.  Repeat for the other variants.  No need to pre-create
the canonical project — though pre-creation still works fine if you'd
rather.

Backend (backend/routers/metadata_backfill.py):
- GET /api/admin/metadata_backfill/projects_search?q=&limit=
  Returns existing projects matching by case-insensitive substring OR
  rapidfuzz WRatio score >= 0.50.  Substring matches sort to the top
  (treated as exact for ordering).  Includes location_count and
  project_number/client_name in each result for disambiguation.  Always
  emits a "Create new: <q>" suggestion alongside the matches.

- GET /api/admin/metadata_backfill/locations_search?project_id=&q=&limit=
  Same shape, scoped to a single project's vibration locations.

- POST /api/admin/metadata_backfill/apply now accepts four override
  keys per cluster (was previously two):
    project_id       → attach to existing Project (operator picked from
                       typeahead)
    project_name     → create new with this name (operator typed a
                       custom name; existing project_name behaviour)
    location_id      → attach to existing MonitoringLocation; validated
                       against the chosen project_id so a stale location
                       FK can't sneak in
    location_name    → create new location with this name

Frontend (templates/admin/metadata_backfill.html):
- Each non-blank-meta cluster card now has two editable typeahead inputs
  (Project + Location) pre-populated with the parser's suggested
  values.  Old static "Project: + Create new: X" / "≈ Fuzzy match" pills
  replaced with compact hint lines under the inputs showing what the
  current value will do.
- Typeahead dropdown opens on focus, debounced 150ms on type.  Shows
  matched existing entities with score badges (exact / NN%) plus a
  "Create new: <typed>" option at the bottom.  Click-to-pick fills the
  text input and writes the entity id into a hidden field.
- Picking a new project clears the location id (forces re-pick under
  the new project, avoids cross-project location FKs).
- _gatherOverrides re-wired to emit the new project_id / location_id
  keys when the operator picked from the dropdown, falling back to
  *_name when they typed free-form.

Backward-compatible: blank-meta clusters keep their existing "project_name
/ location_name" plain inputs and the override path still honours them.

Verified end-to-end:
- /projects_search?q=I-80 returns the existing "I-80 - North Fork
  Bridge" project (score 1.0, has 4 locations) plus a "Create new"
  option.
- /locations_search requires project_id (400 without it).
- Wizard page renders with typeahead wiring confirmed in HTML.

Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>
2026-05-12 19:48:09 +00:00

395 lines
14 KiB
Python

"""
Metadata-backfill admin router.
Endpoints under /api/admin/metadata_backfill:
GET /scan — run the scan; return clusters + suggestions (JSON).
Cached 5 minutes so the wizard doesn't re-scan on
every page render.
POST /apply — apply a list of cluster_ids; body specifies which to
accept and optional per-cluster overrides.
POST /skip — mark cluster_ids as skipped (won't reappear).
"""
from __future__ import annotations
import os
import time
from typing import Optional
from fastapi import APIRouter, Depends, HTTPException, Request
from fastapi.responses import JSONResponse
from sqlalchemy.orm import Session
from backend.database import get_db
from backend.models import Project, MonitoringLocation
from backend.services import metadata_backfill as svc
router = APIRouter(prefix="/api/admin/metadata_backfill", tags=["metadata-backfill"])
SFM_BASE_URL = os.getenv("SFM_BASE_URL", "http://localhost:8200")
# In-process scan cache. Trades memory for not re-hammering SFM on every
# wizard render. TTL: 5 minutes. Singleton per-process; fine for a
# single-worker uvicorn dev setup. For prod multi-worker we'd want to put
# this in the DB or Redis; deferred.
_SCAN_CACHE: dict = {"at": 0.0, "result": None}
_SCAN_CACHE_TTL_SECONDS = 300.0
def _serialise_suggestion(s: svc.Suggestion) -> dict:
c = s.cluster
return {
"cluster_id": c.cluster_id,
"serial": c.serial,
"first_event_ts": c.first_event_ts.isoformat(),
"last_event_ts": c.last_event_ts.isoformat(),
"event_count": c.event_count,
"sample_event_id": c.sample_event_id,
"project_raw": c.project_raw,
"project_root": c.project_root,
"location_raw": c.location_raw,
"client_raw": c.client_raw,
"operator_raw": c.operator_raw,
"is_blank_meta": c.is_blank_meta,
"metadata_consistency": c.metadata_consistency,
"project_match": s.project_match,
"project_existing_id": s.project_existing_id,
"project_existing_name": s.project_existing_name,
"project_match_score": s.project_match_score,
"project_suggested_name": s.project_suggested_name,
"location_match": s.location_match,
"location_existing_id": s.location_existing_id,
"location_existing_name": s.location_existing_name,
"location_match_score": s.location_match_score,
"location_suggested_name": s.location_suggested_name,
"proposed_assigned_at": s.proposed_assigned_at.isoformat(),
"proposed_assigned_until": s.proposed_assigned_until.isoformat() if s.proposed_assigned_until else None,
"confidence": s.confidence,
"blocking_conflict": s.blocking_conflict,
"conflicts": [
{
"existing_assignment_id": cf.existing_assignment_id,
"other_location_id": cf.other_location_id,
"other_location_name": cf.other_location_name,
"other_project_id": cf.other_project_id,
"other_project_name": cf.other_project_name,
}
for cf in s.conflicts
],
}
@router.get("/scan")
async def scan(
force: bool = False,
db: Session = Depends(get_db),
):
"""Run a scan and return clusters + suggestions.
Set force=true to bypass the 5-minute cache.
"""
now = time.time()
if not force and _SCAN_CACHE["result"] is not None \
and (now - _SCAN_CACHE["at"]) < _SCAN_CACHE_TTL_SECONDS:
return _SCAN_CACHE["result"]
result = await svc.scan_clusters_and_build_suggestions(db, SFM_BASE_URL)
# Group suggestions for the wizard UI.
by_confidence = {"high": [], "medium": [], "low": []}
blocking_conflict_count = 0
for s in result.suggestions:
by_confidence[s.confidence].append(_serialise_suggestion(s))
if s.blocking_conflict:
blocking_conflict_count += 1
payload = {
"scanned_event_count": result.scanned_event_count,
"cluster_count": result.cluster_count,
"already_attributed": result.already_attributed,
"skipped_orphans": result.skipped_orphans,
"pending_count": len(result.suggestions),
"blocking_conflict_count": blocking_conflict_count,
"by_confidence": {
"high": by_confidence["high"],
"medium": by_confidence["medium"],
"low": by_confidence["low"],
},
"scanned_at": now,
}
_SCAN_CACHE["result"] = payload
_SCAN_CACHE["at"] = now
return payload
@router.post("/apply")
async def apply(
request: Request,
db: Session = Depends(get_db),
):
"""Apply a list of clusters.
Body:
{
"cluster_ids": ["abc...", "def..."],
"overrides": { "abc...": { "project_name": "...", "location_name": "..." } }
}
To accept ALL non-conflict suggestions in one shot, the UI sends every
pending cluster_id with no overrides.
"""
try:
body = await request.json()
except Exception:
raise HTTPException(status_code=400, detail="Invalid JSON body")
cluster_ids = body.get("cluster_ids") or []
overrides = body.get("overrides") or {}
if not isinstance(cluster_ids, list) or not cluster_ids:
raise HTTPException(status_code=400, detail="cluster_ids must be a non-empty list")
# Re-scan to get current suggestions. We don't trust the cached scan
# blindly — the operator might have manually created projects in
# between scan and apply.
scan_result = await svc.scan_clusters_and_build_suggestions(db, SFM_BASE_URL)
suggestions_by_id = {s.cluster.cluster_id: s for s in scan_result.suggestions}
selected: list[svc.Suggestion] = []
not_found: list[str] = []
for cid in cluster_ids:
s = suggestions_by_id.get(cid)
if s is None:
not_found.append(cid)
continue
# Apply overrides. Per-cluster overrides take precedence over the
# parser's suggested match. Four override fields supported:
# project_id — attach to an existing Project (operator picked
# from the typeahead)
# project_name — create new project with this name (operator
# typed a custom name not matching anything)
# location_id — attach to an existing MonitoringLocation
# location_name — create new location with this name
# project_id + location_id pairings: location_id is only honored
# if its project_id matches the chosen project (otherwise treated
# as a create-new).
ov = overrides.get(cid) or {}
if ov.get("project_id"):
target_id = ov["project_id"]
existing = db.query(svc.Project).filter_by(id=target_id).first()
if existing is not None:
s.project_existing_id = existing.id
s.project_existing_name = existing.name
s.project_suggested_name = existing.name
s.project_match = "exact"
else:
# Stale ID — treat as create_new with the cluster's typed name.
s.project_existing_id = None
s.project_match = "create_new"
elif "project_name" in ov:
new_name = (ov["project_name"] or "").strip()
if new_name:
s.project_suggested_name = new_name
s.project_existing_id = None
s.project_existing_name = None
s.project_match = "create_new"
if ov.get("location_id"):
target_id = ov["location_id"]
existing = db.query(svc.MonitoringLocation).filter_by(id=target_id).first()
# Only attach if the location belongs to the (now chosen) project.
chosen_project_id = s.project_existing_id
if existing is not None and (
chosen_project_id is None or existing.project_id == chosen_project_id
):
s.location_existing_id = existing.id
s.location_existing_name = existing.name
s.location_suggested_name = existing.name
s.location_match = "exact"
else:
s.location_existing_id = None
s.location_match = "create_new"
elif "location_name" in ov:
new_name = (ov["location_name"] or "").strip()
if new_name:
s.location_suggested_name = new_name
s.location_existing_id = None
s.location_existing_name = None
s.location_match = "create_new"
selected.append(s)
apply_result = svc.apply_suggestions(db, selected, decided_by="operator")
# Invalidate the scan cache so the next /scan picks up the new state.
_SCAN_CACHE["at"] = 0.0
_SCAN_CACHE["result"] = None
return {
"applied": apply_result.applied,
"failed": [{"cluster_id": cid, "reason": r} for cid, r in apply_result.failed],
"not_found": not_found,
"project_ids_created": apply_result.project_ids_created,
"location_ids_created": apply_result.location_ids_created,
"assignment_ids_created": apply_result.assignment_ids_created,
}
@router.post("/skip")
async def skip(
request: Request,
db: Session = Depends(get_db),
):
"""Mark cluster_ids as skipped — they won't reappear in future scans."""
try:
body = await request.json()
except Exception:
raise HTTPException(status_code=400, detail="Invalid JSON body")
cluster_ids = body.get("cluster_ids") or []
if not isinstance(cluster_ids, list):
raise HTTPException(status_code=400, detail="cluster_ids must be a list")
n = svc.skip_clusters(db, cluster_ids, decided_by="operator")
_SCAN_CACHE["at"] = 0.0
_SCAN_CACHE["result"] = None
return {"skipped": n}
@router.get("/projects_search")
def projects_search(
q: str = "",
limit: int = 10,
db: Session = Depends(get_db),
):
"""Typeahead search of existing projects for the wizard's per-cluster
override inputs. Combines case-insensitive substring match with
rapidfuzz scoring so partial typing and slight typos both surface
candidates. Always returns a 'Create new' option at the end so the
operator can confirm they want to create rather than match.
Returns:
{
"matches": [
{"id": "...", "name": "...", "score": 0.91, "location_count": 3},
...
],
"create_new": {"label": "Create new: \"<q>\""}
}
"""
q_clean = (q or "").strip()
q_norm = svc._normalise(q_clean)
projects = (
db.query(Project)
.filter(Project.status != "deleted")
.all()
)
scored: list[tuple[Project, float]] = []
for p in projects:
p_norm = svc._normalise(p.name)
if not q_norm:
# Empty query → return top projects by latest activity
# (cheap heuristic: keep them all and sort by name).
scored.append((p, 0.0))
continue
# Cheap substring boost: if the normalised query is a substring,
# treat that as 1.0 regardless of WRatio.
if q_norm in p_norm:
scored.append((p, 1.0))
continue
score = svc.similarity(q_norm, p_norm)
if score >= 0.50: # surfacing threshold; not the match threshold
scored.append((p, score))
# Sort: score desc, then name asc.
scored.sort(key=lambda t: (-t[1], t[0].name.lower()))
scored = scored[:limit]
# Compute location counts in one batch query.
loc_counts: dict[str, int] = {}
if scored:
from sqlalchemy import func
ids = [p.id for p, _ in scored]
rows = (
db.query(MonitoringLocation.project_id, func.count(MonitoringLocation.id))
.filter(MonitoringLocation.project_id.in_(ids))
.group_by(MonitoringLocation.project_id)
.all()
)
loc_counts = {pid: cnt for pid, cnt in rows}
return {
"matches": [
{
"id": p.id,
"name": p.name,
"project_number": p.project_number,
"client_name": p.client_name,
"score": round(score, 3),
"location_count": loc_counts.get(p.id, 0),
}
for p, score in scored
],
"create_new": {"label": f'Create new: "{q_clean}"' if q_clean else None},
}
@router.get("/locations_search")
def locations_search(
project_id: str,
q: str = "",
limit: int = 10,
db: Session = Depends(get_db),
):
"""Typeahead search of existing locations within a project."""
if not project_id:
raise HTTPException(status_code=400, detail="project_id required")
q_clean = (q or "").strip()
q_norm = svc._normalise(q_clean)
locations = (
db.query(MonitoringLocation)
.filter(MonitoringLocation.project_id == project_id)
.filter(MonitoringLocation.location_type == "vibration")
.all()
)
scored: list[tuple[MonitoringLocation, float]] = []
for l in locations:
l_norm = svc._normalise(l.name)
if not q_norm:
scored.append((l, 0.0))
continue
if q_norm in l_norm:
scored.append((l, 1.0))
continue
score = svc.similarity(q_norm, l_norm)
if score >= 0.50:
scored.append((l, score))
scored.sort(key=lambda t: (-t[1], t[0].name.lower()))
scored = scored[:limit]
return {
"matches": [
{
"id": l.id,
"name": l.name,
"address": l.address,
"score": round(score, 3),
}
for l, score in scored
],
"create_new": {"label": f'Create new: "{q_clean}"' if q_clean else None},
}