6ebbe28308
Operators sometimes bake location identifiers into the project string for email-readability — "Fay - Locks & Dam No3 - Loc 2 - 735 Bunola" where "Fay - Locks & Dam No3" is the actual project and "- Loc 2 - 735 Bunola" is location info that already lives in sensor_location. Without stripping, every "- Loc N" variant became a separate project, fragmenting what should be one project with several locations. Backend: - New _extract_project_root() helper. Regex matches " - Loc N" / "-Loc3" / " - Location #5" / etc. with case-insensitive multi-dash support; strips from that marker forward and cleans up dangling separators. Strings without a Loc-marker pass through unchanged. - Cluster dataclass adds project_root field alongside project_raw. project_raw stays the operator-typed string for display ("hover to see what was actually typed"). project_root is what gets normalised for matching and used as the suggested project name. - _ensure_project + _ensure_location now do normalisation-aware dedup before creating: a cluster of "SR81" and a cluster of "SR 81" (which normalise to the same string) collapse into one project on apply, even when applied in the same bulk operation. Avoids UNIQUE constraint collisions and duplicate-named-by-spacing projects. Frontend: - Wizard cluster cards show "↳ stripped trailing 'Loc N' suffix; operator typed: <raw>" when project_root differs from project_raw, so the operator can see at a glance what the parser did to the string. Real-data results: against the same 10,055 SFM events, confidence distribution improved from 37/14/8 (high/med/low) to 43/9/7. "Fay - Locks & Dam No3" now appears as ONE project across 6 cluster instances spanning 3 serials and 6 different locations — exactly the "one project, many locations" model the user described. Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>
228 lines
8.4 KiB
Python
228 lines
8.4 KiB
Python
"""
|
|
Metadata-backfill admin router.
|
|
|
|
Endpoints under /api/admin/metadata_backfill:
|
|
|
|
GET /scan — run the scan; return clusters + suggestions (JSON).
|
|
Cached 5 minutes so the wizard doesn't re-scan on
|
|
every page render.
|
|
POST /apply — apply a list of cluster_ids; body specifies which to
|
|
accept and optional per-cluster overrides.
|
|
POST /skip — mark cluster_ids as skipped (won't reappear).
|
|
"""
|
|
|
|
from __future__ import annotations
|
|
|
|
import os
|
|
import time
|
|
from typing import Optional
|
|
|
|
from fastapi import APIRouter, Depends, HTTPException, Request
|
|
from fastapi.responses import JSONResponse
|
|
from sqlalchemy.orm import Session
|
|
|
|
from backend.database import get_db
|
|
from backend.services import metadata_backfill as svc
|
|
|
|
# Every endpoint in this module is mounted under the admin backfill prefix.
router = APIRouter(
    prefix="/api/admin/metadata_backfill",
    tags=["metadata-backfill"],
)
|
|
|
|
# Base URL passed to the backfill service when scanning; overridable through
# the SFM_BASE_URL environment variable.
SFM_BASE_URL = os.environ.get("SFM_BASE_URL", "http://localhost:8200")

# How long a cached scan payload stays fresh: 5 minutes.
_SCAN_CACHE_TTL_SECONDS = 5 * 60.0

# Process-local scan cache, so the wizard does not re-hammer SFM on every
# render (memory traded for fewer scans). It is a per-process singleton,
# which is fine for a single-worker uvicorn dev setup; a multi-worker prod
# deployment would want this in the DB or Redis instead (deferred).
_SCAN_CACHE: dict = {
    "at": 0.0,  # time.time() stamp of the cached scan; 0.0 means "nothing cached"
    "result": None,  # the cached /scan payload, or None
}
|
|
|
|
|
|
def _serialise_suggestion(s: svc.Suggestion) -> dict:
    """Flatten one suggestion (and the cluster it wraps) into a plain dict.

    Cluster fields come first, then the project and location match fields,
    the proposed assignment window, and finally confidence / conflict data.
    The three required timestamps are rendered with ``isoformat()``;
    ``proposed_assigned_until`` becomes ``None`` when it is falsy.
    """
    cluster = s.cluster

    # Attributes copied through unchanged: attribute name == output key.
    cluster_passthrough = (
        "event_count",
        "sample_event_id",
        "project_raw",
        "project_root",
        "location_raw",
        "client_raw",
        "operator_raw",
        "is_blank_meta",
        "metadata_consistency",
    )
    match_passthrough = (
        "project_match",
        "project_existing_id",
        "project_existing_name",
        "project_match_score",
        "project_suggested_name",
        "location_match",
        "location_existing_id",
        "location_existing_name",
        "location_match_score",
        "location_suggested_name",
    )
    conflict_passthrough = (
        "existing_assignment_id",
        "other_location_id",
        "other_location_name",
        "other_project_id",
        "other_project_name",
    )

    out: dict = {
        "cluster_id": cluster.cluster_id,
        "serial": cluster.serial,
        "first_event_ts": cluster.first_event_ts.isoformat(),
        "last_event_ts": cluster.last_event_ts.isoformat(),
    }
    for field in cluster_passthrough:
        out[field] = getattr(cluster, field)
    for field in match_passthrough:
        out[field] = getattr(s, field)

    out["proposed_assigned_at"] = s.proposed_assigned_at.isoformat()
    if s.proposed_assigned_until:
        out["proposed_assigned_until"] = s.proposed_assigned_until.isoformat()
    else:
        out["proposed_assigned_until"] = None

    out["confidence"] = s.confidence
    out["blocking_conflict"] = s.blocking_conflict
    out["conflicts"] = [
        {field: getattr(conflict, field) for field in conflict_passthrough}
        for conflict in s.conflicts
    ]
    return out
|
|
|
|
|
|
@router.get("/scan")
async def scan(
    force: bool = False,
    db: Session = Depends(get_db),
):
    """Return pending clusters with their suggestions, grouped by confidence.

    A payload already held in the module-level cache is returned unchanged
    while it is younger than the TTL. Pass force=true to bypass the cache
    and run a fresh scan.
    """
    now = time.time()

    cached = _SCAN_CACHE["result"]
    cache_is_fresh = (now - _SCAN_CACHE["at"]) < _SCAN_CACHE_TTL_SECONDS
    if cached is not None and not force and cache_is_fresh:
        return cached

    result = await svc.scan_clusters_and_build_suggestions(db, SFM_BASE_URL)

    # Bucket the serialised suggestions for the wizard UI and count how many
    # of them carry a blocking conflict.
    buckets = {level: [] for level in ("high", "medium", "low")}
    blocking = 0
    for suggestion in result.suggestions:
        buckets[suggestion.confidence].append(_serialise_suggestion(suggestion))
        if suggestion.blocking_conflict:
            blocking += 1

    payload = {
        "scanned_event_count": result.scanned_event_count,
        "cluster_count": result.cluster_count,
        "already_attributed": result.already_attributed,
        "skipped_orphans": result.skipped_orphans,
        "pending_count": len(result.suggestions),
        "blocking_conflict_count": blocking,
        "by_confidence": buckets,
        "scanned_at": now,
    }

    # Remember this payload; the stamp is the time captured before the scan ran.
    _SCAN_CACHE.update(result=payload, at=now)
    return payload
|
|
|
|
|
|
def _override_name(override: dict, key: str) -> Optional[str]:
    """Return the stripped custom name stored under *key*, or None.

    None means "no usable override": the key is absent, or its value is
    empty / whitespace-only.

    Raises:
        HTTPException: 400 when the value is present but is not a string.
    """
    value = override.get(key)
    if not value:
        return None
    if not isinstance(value, str):
        raise HTTPException(status_code=400, detail=f"override {key} must be a string")
    return value.strip() or None


@router.post("/apply")
async def apply(
    request: Request,
    db: Session = Depends(get_db),
):
    """Apply a list of clusters.

    Body:
      {
        "cluster_ids": ["abc...", "def..."],
        "overrides": { "abc...": { "project_name": "...", "location_name": "..." } }
      }

    To accept ALL non-conflict suggestions in one shot, the UI sends every
    pending cluster_id with no overrides.

    A non-blank override name replaces the suggested name and forces
    create-new behaviour for that project/location. A blank or
    whitespace-only override is treated as "no override" and leaves the
    suggestion untouched. A cluster_id listed more than once is applied once.

    Returns a dict with the keys applied, failed, not_found,
    project_ids_created, location_ids_created and assignment_ids_created.

    Raises:
        HTTPException: 400 when the body is not valid JSON, is not a JSON
            object, cluster_ids is not a non-empty list of strings, or an
            override is malformed.
    """
    try:
        body = await request.json()
    except Exception as exc:
        raise HTTPException(status_code=400, detail="Invalid JSON body") from exc
    # Valid JSON can still be a list / string / number; .get() needs a dict.
    if not isinstance(body, dict):
        raise HTTPException(status_code=400, detail="JSON body must be an object")

    cluster_ids = body.get("cluster_ids") or []
    overrides = body.get("overrides") or {}
    if not isinstance(cluster_ids, list) or not cluster_ids:
        raise HTTPException(status_code=400, detail="cluster_ids must be a non-empty list")
    if not all(isinstance(cid, str) for cid in cluster_ids):
        raise HTTPException(status_code=400, detail="cluster_ids must contain only strings")
    if not isinstance(overrides, dict):
        raise HTTPException(status_code=400, detail="overrides must be an object")

    # Drop repeated ids (keeping first-seen order) so the same cluster is
    # never applied twice within one request.
    cluster_ids = list(dict.fromkeys(cluster_ids))

    # Validate and normalise the overrides up front, so a malformed request
    # is rejected before the scan runs.
    custom_names: dict = {}
    for cid in cluster_ids:
        ov = overrides.get(cid) or {}
        if not isinstance(ov, dict):
            raise HTTPException(status_code=400, detail=f"override for {cid} must be an object")
        custom_names[cid] = (
            _override_name(ov, "project_name"),
            _override_name(ov, "location_name"),
        )

    # Re-scan to get current suggestions. We don't trust the cached scan
    # blindly — the operator might have manually created projects in
    # between scan and apply.
    scan_result = await svc.scan_clusters_and_build_suggestions(db, SFM_BASE_URL)
    suggestions_by_id = {s.cluster.cluster_id: s for s in scan_result.suggestions}

    selected: list[svc.Suggestion] = []
    not_found: list[str] = []
    for cid in cluster_ids:
        s = suggestions_by_id.get(cid)
        if s is None:
            not_found.append(cid)
            continue

        project_name, location_name = custom_names[cid]
        if project_name is not None:
            s.project_suggested_name = project_name
            # Operator typed a custom name — force create-new behaviour so
            # we don't accidentally attach to a different existing project
            # by exact-match. Suggestions already marked create_new are
            # left as they are.
            if s.project_match != "create_new":
                s.project_existing_id = None
                s.project_match = "create_new"
        if location_name is not None:
            s.location_suggested_name = location_name
            if s.location_match != "create_new":
                s.location_existing_id = None
                s.location_match = "create_new"
        selected.append(s)

    apply_result = svc.apply_suggestions(db, selected, decided_by="operator")

    # Invalidate the scan cache so the next /scan picks up the new state.
    _SCAN_CACHE["at"] = 0.0
    _SCAN_CACHE["result"] = None

    return {
        "applied": apply_result.applied,
        "failed": [{"cluster_id": cid, "reason": r} for cid, r in apply_result.failed],
        "not_found": not_found,
        "project_ids_created": apply_result.project_ids_created,
        "location_ids_created": apply_result.location_ids_created,
        "assignment_ids_created": apply_result.assignment_ids_created,
    }
|
|
|
|
|
|
@router.post("/skip")
async def skip(
    request: Request,
    db: Session = Depends(get_db),
):
    """Mark cluster_ids as skipped — they won't reappear in future scans.

    Body:
      { "cluster_ids": ["abc...", "def..."] }

    Returns {"skipped": n}, where n is whatever the service's skip_clusters
    call returns. The scan cache is invalidated afterwards so the next /scan
    reflects the change.

    Raises:
        HTTPException: 400 when the body is not valid JSON, is not a JSON
            object, or cluster_ids is not a list of strings.
    """
    try:
        body = await request.json()
    except Exception as exc:
        raise HTTPException(status_code=400, detail="Invalid JSON body") from exc
    # Valid JSON can still be a list / string / number; .get() needs a dict.
    if not isinstance(body, dict):
        raise HTTPException(status_code=400, detail="JSON body must be an object")

    cluster_ids = body.get("cluster_ids") or []
    if not isinstance(cluster_ids, list):
        raise HTTPException(status_code=400, detail="cluster_ids must be a list")
    # Reject malformed ids here rather than handing them to the service.
    if not all(isinstance(cid, str) for cid in cluster_ids):
        raise HTTPException(status_code=400, detail="cluster_ids must contain only strings")

    n = svc.skip_clusters(db, cluster_ids, decided_by="operator")

    # Invalidate the scan cache so skipped clusters vanish from the next /scan.
    _SCAN_CACHE["at"] = 0.0
    _SCAN_CACHE["result"] = None

    return {"skipped": n}
|