"""
Metadata-backfill admin router.

Endpoints under /api/admin/metadata_backfill:

    GET  /scan             — run the scan; return clusters + suggestions
                             (JSON). Cached for 5 minutes so the wizard
                             doesn't re-scan on every page render; pass
                             ?force=true to bypass the cache.
    POST /apply            — apply a list of cluster_ids; the body says
                             which clusters to accept, with optional
                             per-cluster overrides.
    POST /skip             — mark cluster_ids as skipped (they won't
                             reappear in future scans).
    GET  /projects_search  — typeahead over existing projects, used by the
                             wizard's per-cluster override inputs.
    GET  /locations_search — typeahead over a single project's existing
                             vibration monitoring locations.
"""
from __future__ import annotations

import os
import time
from typing import Optional

from fastapi import APIRouter, Depends, HTTPException, Request
from fastapi.responses import JSONResponse
from sqlalchemy.orm import Session

from backend.database import get_db
from backend.models import Project, MonitoringLocation
from backend.services import metadata_backfill as svc

router = APIRouter(prefix="/api/admin/metadata_backfill", tags=["metadata-backfill"])

# Base URL handed to svc.scan_clusters_and_build_suggestions; overridable
# through the SFM_BASE_URL environment variable.
SFM_BASE_URL = os.getenv("SFM_BASE_URL", "http://localhost:8200")

# In-process scan cache. Trades memory for not re-hammering SFM on every
# wizard render. TTL: 5 minutes. Singleton per-process; fine for a
# single-worker uvicorn dev setup. For prod multi-worker we'd want to put
# this in the DB or Redis; deferred.
# Cache slots:
#   "at"         — time.monotonic() stamp taken when the cached scan started.
#   "result"     — the cached /scan payload, or None when nothing is cached.
#   "generation" — bumped on every invalidation, so a scan that was still
#                  awaiting SFM when an apply/skip ran does not repopulate
#                  the cache with pre-change state.
_SCAN_CACHE: dict = {"at": 0.0, "result": None, "generation": 0}
_SCAN_CACHE_TTL_SECONDS = 300.0

# Minimum fuzzy score for a typeahead candidate to be surfaced. This is a
# display threshold only, not the match threshold.
_SEARCH_SURFACE_THRESHOLD = 0.50


def _invalidate_scan_cache() -> None:
    """Drop the cached scan so the next /scan recomputes it."""
    _SCAN_CACHE["at"] = 0.0
    _SCAN_CACHE["result"] = None
    _SCAN_CACHE["generation"] += 1


async def _read_json_object(request: Request) -> dict:
    """Parse the request body as a JSON object.

    Raises:
        HTTPException(400): the body is not valid JSON, or is valid JSON
            but not an object (e.g. a list or null), which the handlers
            below cannot ``.get()`` from.
    """
    try:
        body = await request.json()
    except Exception:
        raise HTTPException(status_code=400, detail="Invalid JSON body") from None
    if not isinstance(body, dict):
        raise HTTPException(status_code=400, detail="JSON body must be an object")
    return body


def _parse_cluster_ids(raw: object, *, allow_empty: bool) -> list[str]:
    """Validate a ``cluster_ids`` field and return it de-duplicated.

    A missing/falsy value is treated as an empty list. Order of first
    occurrence is preserved; duplicates are dropped so a cluster listed
    twice is not applied (or skipped) twice.

    Raises:
        HTTPException(400): not a list, empty when ``allow_empty`` is
            False, or containing non-string entries.
    """
    cluster_ids = raw or []
    if not isinstance(cluster_ids, list) or (not allow_empty and not cluster_ids):
        detail = (
            "cluster_ids must be a list"
            if allow_empty
            else "cluster_ids must be a non-empty list"
        )
        raise HTTPException(status_code=400, detail=detail)
    if not all(isinstance(cid, str) for cid in cluster_ids):
        raise HTTPException(status_code=400, detail="cluster_ids must contain only strings")
    return list(dict.fromkeys(cluster_ids))


def _serialise_suggestion(s: svc.Suggestion) -> dict:
    """Flatten a suggestion and its cluster into a plain dict for the wizard.

    Timestamps are rendered with ``isoformat()``; ``proposed_assigned_until``
    may be None (open-ended) and is passed through as None.
    """
    c = s.cluster
    return {
        "cluster_id": c.cluster_id,
        "serial": c.serial,
        "first_event_ts": c.first_event_ts.isoformat(),
        "last_event_ts": c.last_event_ts.isoformat(),
        "event_count": c.event_count,
        "sample_event_id": c.sample_event_id,
        "project_raw": c.project_raw,
        "project_root": c.project_root,
        "location_raw": c.location_raw,
        "client_raw": c.client_raw,
        "operator_raw": c.operator_raw,
        "is_blank_meta": c.is_blank_meta,
        "metadata_consistency": c.metadata_consistency,
        "project_match": s.project_match,
        "project_existing_id": s.project_existing_id,
        "project_existing_name": s.project_existing_name,
        "project_match_score": s.project_match_score,
        "project_suggested_name": s.project_suggested_name,
        "location_match": s.location_match,
        "location_existing_id": s.location_existing_id,
        "location_existing_name": s.location_existing_name,
        "location_match_score": s.location_match_score,
        "location_suggested_name": s.location_suggested_name,
        "proposed_assigned_at": s.proposed_assigned_at.isoformat(),
        "proposed_assigned_until": (
            s.proposed_assigned_until.isoformat() if s.proposed_assigned_until else None
        ),
        "confidence": s.confidence,
        "blocking_conflict": s.blocking_conflict,
        "conflicts": [
            {
                "existing_assignment_id": cf.existing_assignment_id,
                "other_location_id": cf.other_location_id,
                "other_location_name": cf.other_location_name,
                "other_project_id": cf.other_project_id,
                "other_project_name": cf.other_project_name,
            }
            for cf in s.conflicts
        ],
    }


@router.get("/scan")
async def scan(
    force: bool = False,
    db: Session = Depends(get_db),
):
    """Run a scan and return clusters + suggestions grouped by confidence.

    The payload is cached in-process for ``_SCAN_CACHE_TTL_SECONDS``
    (5 minutes). Set ``force=true`` to bypass the cache.
    """
    cached = _SCAN_CACHE["result"]
    if (
        not force
        and cached is not None
        and (time.monotonic() - _SCAN_CACHE["at"]) < _SCAN_CACHE_TTL_SECONDS
    ):
        return cached

    # Wall-clock time is reported to the client; the TTL uses the monotonic
    # clock so system clock adjustments can't stretch or cut it.
    scanned_at = time.time()
    started = time.monotonic()
    generation = _SCAN_CACHE["generation"]

    result = await svc.scan_clusters_and_build_suggestions(db, SFM_BASE_URL)

    # Group suggestions for the wizard UI.
    by_confidence = {"high": [], "medium": [], "low": []}
    blocking_conflict_count = 0
    for s in result.suggestions:
        by_confidence[s.confidence].append(_serialise_suggestion(s))
        if s.blocking_conflict:
            blocking_conflict_count += 1

    payload = {
        "scanned_event_count": result.scanned_event_count,
        "cluster_count": result.cluster_count,
        "already_attributed": result.already_attributed,
        "skipped_orphans": result.skipped_orphans,
        "pending_count": len(result.suggestions),
        "blocking_conflict_count": blocking_conflict_count,
        "by_confidence": by_confidence,
        "scanned_at": scanned_at,
    }
    # If an apply/skip invalidated the cache while we were awaiting the scan,
    # this payload predates that change — return it, but don't cache it.
    if _SCAN_CACHE["generation"] == generation:
        _SCAN_CACHE["result"] = payload
        _SCAN_CACHE["at"] = started
    return payload


def _override_name(ov: dict, key: str) -> str:
    """Return the stripped string override ``ov[key]`` ("" for null).

    Raises:
        HTTPException(400): the value is neither null nor a string.
    """
    value = ov[key]
    if value is None:
        return ""
    if not isinstance(value, str):
        raise HTTPException(status_code=400, detail=f"{key} override must be a string")
    return value.strip()


def _mark_new_project(s: svc.Suggestion) -> None:
    """Point *s* at a to-be-created project (keeps project_suggested_name)."""
    s.project_existing_id = None
    s.project_existing_name = None
    s.project_match = "create_new"


def _mark_new_location(s: svc.Suggestion) -> None:
    """Point *s* at a to-be-created location (keeps location_suggested_name)."""
    s.location_existing_id = None
    s.location_existing_name = None
    s.location_match = "create_new"


def _location_fits_chosen_project(location: MonitoringLocation, s: svc.Suggestion) -> bool:
    """True when *location* belongs to the existing project *s* now targets.

    A suggestion targeting a not-yet-created project has no existing
    project id, so no existing location can belong to it.
    """
    return s.project_existing_id is not None and location.project_id == s.project_existing_id


def _apply_project_override(db: Session, s: svc.Suggestion, ov: dict) -> bool:
    """Apply a ``project_id`` / ``project_name`` override to *s* in place.

    ``project_id`` attaches to an existing Project. A stale id falls back
    to creating a project under *s*'s current suggested name.
    ``project_name`` (non-blank) creates a new project with that name.
    Returns True if an override was acted on.
    """
    if ov.get("project_id"):
        existing = db.query(Project).filter_by(id=ov["project_id"]).first()
        if existing is not None:
            s.project_existing_id = existing.id
            s.project_existing_name = existing.name
            s.project_suggested_name = existing.name
            s.project_match = "exact"
        else:
            _mark_new_project(s)
        return True
    if "project_name" in ov:
        new_name = _override_name(ov, "project_name")
        if new_name:
            s.project_suggested_name = new_name
            _mark_new_project(s)
            return True
    return False


def _apply_location_override(db: Session, s: svc.Suggestion, ov: dict) -> bool:
    """Apply a ``location_id`` / ``location_name`` override to *s* in place.

    ``location_id`` is honoured only if the location belongs to the
    existing project *s* now targets; otherwise (stale id, other
    project, or a project that is being created) it falls back to
    create-new. ``location_name`` (non-blank) creates a new location with
    that name. Returns True if an override was acted on.
    """
    if ov.get("location_id"):
        existing = db.query(MonitoringLocation).filter_by(id=ov["location_id"]).first()
        if existing is not None and _location_fits_chosen_project(existing, s):
            s.location_existing_id = existing.id
            s.location_existing_name = existing.name
            s.location_suggested_name = existing.name
            s.location_match = "exact"
        else:
            _mark_new_location(s)
        return True
    if "location_name" in ov:
        new_name = _override_name(ov, "location_name")
        if new_name:
            s.location_suggested_name = new_name
            _mark_new_location(s)
            return True
    return False


def _apply_overrides(db: Session, s: svc.Suggestion, ov: dict) -> None:
    """Apply one cluster's operator overrides to its suggestion in place.

    The project is resolved first because the location check depends on it.
    """
    project_overridden = _apply_project_override(db, s, ov)
    location_overridden = _apply_location_override(db, s, ov)
    if project_overridden and not location_overridden and s.location_existing_id is not None:
        # The scan's location match was presumably made against the scan's
        # project; once the operator re-points the project, an existing
        # location from another project must not be carried along.
        location = db.query(MonitoringLocation).filter_by(id=s.location_existing_id).first()
        if location is None or not _location_fits_chosen_project(location, s):
            _mark_new_location(s)


@router.post("/apply")
async def apply(
    request: Request,
    db: Session = Depends(get_db),
):
    """Apply a list of clusters.

    Body:
        {
          "cluster_ids": ["abc...", "def..."],
          "overrides": {
            "abc...": {"project_id": "...", "location_id": "..."},
            "def...": {"project_name": "...", "location_name": "..."}
          }
        }

    Override fields (all optional, per cluster):
        project_id    — attach to an existing Project (picked from typeahead)
        project_name  — create a new project with this name
        location_id   — attach to an existing MonitoringLocation; honoured
                        only if it belongs to the chosen existing project,
                        otherwise treated as create-new
        location_name — create a new location with this name
    ``project_id`` wins over ``project_name``, and ``location_id`` over
    ``location_name``.

    Duplicate cluster_ids are applied once. To accept ALL non-conflict
    suggestions in one shot, the UI sends every pending cluster_id with no
    overrides.

    Returns the counts/ids from ``svc.apply_suggestions`` plus the
    requested cluster_ids that no longer have a pending suggestion
    (``not_found``).

    Raises:
        HTTPException(400): malformed body, cluster_ids, or overrides.
    """
    body = await _read_json_object(request)
    cluster_ids = _parse_cluster_ids(body.get("cluster_ids"), allow_empty=False)
    overrides = body.get("overrides") or {}
    if not isinstance(overrides, dict):
        raise HTTPException(status_code=400, detail="overrides must be an object")
    for cid, ov in overrides.items():
        if ov and not isinstance(ov, dict):
            raise HTTPException(
                status_code=400, detail=f"overrides[{cid!r}] must be an object"
            )

    # Re-scan to get current suggestions. We don't trust the cached scan
    # blindly — the operator might have manually created projects in
    # between scan and apply.
    scan_result = await svc.scan_clusters_and_build_suggestions(db, SFM_BASE_URL)
    suggestions_by_id = {s.cluster.cluster_id: s for s in scan_result.suggestions}

    selected: list[svc.Suggestion] = []
    not_found: list[str] = []
    for cid in cluster_ids:
        suggestion = suggestions_by_id.get(cid)
        if suggestion is None:
            not_found.append(cid)
            continue
        _apply_overrides(db, suggestion, overrides.get(cid) or {})
        selected.append(suggestion)

    try:
        apply_result = svc.apply_suggestions(db, selected, decided_by="operator")
    finally:
        # Invalidate even if the apply raised: it may have changed DB state
        # before failing, and the cached scan would no longer reflect it.
        _invalidate_scan_cache()

    return {
        "applied": apply_result.applied,
        "failed": [{"cluster_id": cid, "reason": r} for cid, r in apply_result.failed],
        "not_found": not_found,
        "project_ids_created": apply_result.project_ids_created,
        "location_ids_created": apply_result.location_ids_created,
        "assignment_ids_created": apply_result.assignment_ids_created,
    }


@router.post("/skip")
async def skip(
    request: Request,
    db: Session = Depends(get_db),
):
    """Mark cluster_ids as skipped — they won't reappear in future scans.

    Body: ``{"cluster_ids": ["abc...", ...]}`` (duplicates are skipped once).
    Returns ``{"skipped": <count from svc.skip_clusters>}``.

    Raises:
        HTTPException(400): malformed body or cluster_ids.
    """
    body = await _read_json_object(request)
    cluster_ids = _parse_cluster_ids(body.get("cluster_ids"), allow_empty=True)
    try:
        n = svc.skip_clusters(db, cluster_ids, decided_by="operator")
    finally:
        _invalidate_scan_cache()
    return {"skipped": n}


def _rank_by_name(items: list, q_norm: str, limit: int) -> list[tuple[object, float]]:
    """Score *items* by ``.name`` against a normalised query; keep the best.

    Scoring: an empty query gives every item 0.0. A query that is a
    substring of the normalised name scores 1.0. Otherwise the score is
    ``svc.similarity``, and items below ``_SEARCH_SURFACE_THRESHOLD`` are
    dropped. Results are sorted by score desc, then name asc
    (case-insensitive), and truncated to *limit*. A negative limit is
    treated as 0; slicing with it would otherwise drop items from the end.
    """
    scored: list[tuple[object, float]] = []
    for item in items:
        if not q_norm:
            scored.append((item, 0.0))
            continue
        item_norm = svc._normalise(item.name or "")
        if q_norm in item_norm:
            scored.append((item, 1.0))
            continue
        score = svc.similarity(q_norm, item_norm)
        if score >= _SEARCH_SURFACE_THRESHOLD:
            scored.append((item, score))
    scored.sort(key=lambda pair: (-pair[1], (pair[0].name or "").lower()))
    return scored[: max(limit, 0)]


@router.get("/projects_search")
def projects_search(
    q: str = "",
    limit: int = 10,
    db: Session = Depends(get_db),
):
    """Typeahead search of existing projects for the wizard's per-cluster
    override inputs.

    Combines case-insensitive substring match with fuzzy scoring
    (``svc.similarity``) so partial typing and slight typos both surface
    candidates. With an empty query every non-deleted project scores 0.0,
    so the result is simply the first ``limit`` projects by name.

    Returns:
        {
          "matches": [
            {"id": "...", "name": "...", "project_number": ...,
             "client_name": ..., "score": 0.91, "location_count": 3},
            ...
          ],
          "create_new": {"label": "Create new: \"<q>\""}   # label is None for an empty q
        }
    """
    from sqlalchemy import func, or_

    q_clean = (q or "").strip()
    q_norm = svc._normalise(q_clean)

    projects = (
        db.query(Project)
        # A bare `status != "deleted"` would also drop rows whose status is
        # NULL (SQL three-valued logic), hiding them from the typeahead.
        .filter(or_(Project.status.is_(None), Project.status != "deleted"))
        .all()
    )
    scored = _rank_by_name(projects, q_norm, limit)

    # Location counts for the surfaced projects in one batch query.
    # NOTE: counts every MonitoringLocation of the project, whereas
    # /locations_search only lists location_type == "vibration".
    loc_counts: dict[str, int] = {}
    if scored:
        ids = [p.id for p, _ in scored]
        rows = (
            db.query(MonitoringLocation.project_id, func.count(MonitoringLocation.id))
            .filter(MonitoringLocation.project_id.in_(ids))
            .group_by(MonitoringLocation.project_id)
            .all()
        )
        loc_counts = {pid: cnt for pid, cnt in rows}

    return {
        "matches": [
            {
                "id": p.id,
                "name": p.name,
                "project_number": p.project_number,
                "client_name": p.client_name,
                "score": round(score, 3),
                "location_count": loc_counts.get(p.id, 0),
            }
            for p, score in scored
        ],
        "create_new": {"label": f'Create new: "{q_clean}"' if q_clean else None},
    }


@router.get("/locations_search")
def locations_search(
    project_id: str,
    q: str = "",
    limit: int = 10,
    db: Session = Depends(get_db),
):
    """Typeahead search of existing vibration locations within a project.

    Same scoring and ordering as /projects_search. Returns
    ``{"matches": [{"id", "name", "address", "score"}, ...],
    "create_new": {"label": ...}}`` (label is None for an empty q).

    Raises:
        HTTPException(400): ``project_id`` is empty.
    """
    if not project_id:
        raise HTTPException(status_code=400, detail="project_id required")
    q_clean = (q or "").strip()
    q_norm = svc._normalise(q_clean)

    locations = (
        db.query(MonitoringLocation)
        .filter(MonitoringLocation.project_id == project_id)
        .filter(MonitoringLocation.location_type == "vibration")
        .all()
    )
    ranked = _rank_by_name(locations, q_norm, limit)

    return {
        "matches": [
            {
                "id": loc.id,
                "name": loc.name,
                "address": loc.address,
                "score": round(score, 3),
            }
            for loc, score in ranked
        ],
        "create_new": {"label": f'Create new: "{q_clean}"' if q_clean else None},
    }