d5a0163852
When a client drops a location from scope mid-project (e.g. the office
half of a museum+office monitoring job), operators couldn't previously
mark it as no-longer-active without either deleting it (which would
orphan historical events) or leaving it in the active list looking
deployable. Now there's a proper middle ground.
Data model
- MonitoringLocation gets two new nullable columns:
    - removed_at — NULL means active; set means soft-removed
    - removal_reason — optional operator note
- Migration: backend/migrate_add_location_removed.py (idempotent)
Endpoints
- POST /api/projects/{p}/locations/{l}/remove
    Body: { effective_date?: ISO-datetime, reason?: str }
    Side effects (cascade):
      1. Closes active UnitAssignment rows at this location
         (assigned_until = effective_date, status = "completed")
      2. Cancels pending ScheduledActions at this location
      3. Marks location.removed_at = effective_date
    Returns counts of assignments closed + actions cancelled.
- POST /api/projects/{p}/locations/{l}/restore
    Clears removed_at + removal_reason. Does NOT auto-reopen
    assignments — the operator creates new ones if resuming monitoring.
Active-surface filters
- locations-json defaults to active-only; pass include_removed=true
for historical / reporting views. Schedule modal dropdowns now
exclude removed locations automatically.
- Metadata-backfill fuzzy matcher excludes removed locations from
proposed targets (don't want backfill creating new assignments at
decommissioned locations).
- Vibration-summary per_location rollup includes removed locations
(so historical event totals stay accurate) but tags each with
removed_at so the UI can show a badge.
UI
- Project detail page's Monitoring Locations section now splits into:
    - Active locations (full card with Assign / Edit / Remove / Delete)
    - Removed locations (collapsed <details>, greyed cards, Restore button,
      shows removal date + reason)
- New per-card "Remove" button → opens confirmation modal explaining
the cascade, with optional effective-date (defaults to now,
backdateable) and reason fields.
- Unit detail's SFM Events attribution cell shows a small "removed"
badge next to historical attributions whose location is no longer
active. Same pattern in vibration_summary's top-locations list.
- Soft-removal indicator surfaced through the events_for_unit
attribution payload as location_removed_at.
Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>
398 lines
14 KiB
Python
398 lines
14 KiB
Python
"""
|
|
Metadata-backfill admin router.
|
|
|
|
Endpoints under /api/admin/metadata_backfill:
|
|
|
|
GET /scan — run the scan; return clusters + suggestions (JSON).
|
|
Cached 5 minutes so the wizard doesn't re-scan on
|
|
every page render.
|
|
POST /apply — apply a list of cluster_ids; body specifies which to
|
|
accept and optional per-cluster overrides.
|
|
POST /skip — mark cluster_ids as skipped (won't reappear).
|
|
"""
|
|
|
|
from __future__ import annotations
|
|
|
|
import os
|
|
import time
|
|
from typing import Optional
|
|
|
|
from fastapi import APIRouter, Depends, HTTPException, Request
|
|
from fastapi.responses import JSONResponse
|
|
from sqlalchemy.orm import Session
|
|
|
|
from backend.database import get_db
|
|
from backend.models import Project, MonitoringLocation
|
|
from backend.services import metadata_backfill as svc
|
|
|
|
router = APIRouter(
    prefix="/api/admin/metadata_backfill",
    tags=["metadata-backfill"],
)

# Base URL of the SFM service that /scan and /apply query when building
# suggestions; override with the SFM_BASE_URL environment variable.
SFM_BASE_URL = os.environ.get("SFM_BASE_URL", "http://localhost:8200")

# In-process scan cache, so the wizard does not re-hit SFM on every render.
#   "at"     -> time.time() at which "result" was stored
#   "result" -> the serialised /scan payload, or None when empty/invalidated
# This is a module-level singleton, so every worker process keeps its own
# copy. That is fine for a single-worker uvicorn dev setup; a multi-worker
# deployment would need a shared store (DB or Redis) — deferred.
_SCAN_CACHE: dict = dict(at=0.0, result=None)

# Lifetime of a cached scan before /scan recomputes it (5 minutes).
_SCAN_CACHE_TTL_SECONDS = 300.0
|
|
|
|
|
|
def _serialise_suggestion(s: svc.Suggestion) -> dict:
    """Flatten a backfill Suggestion (plus its cluster) into a JSON-ready dict.

    The output concatenates, in order:
      * the raw cluster metadata (IDs, event window, parsed raw fields),
      * the project match proposal,
      * the location match proposal,
      * the proposed assignment window, confidence, and conflict list.

    Datetimes are rendered with ``isoformat()``. ``proposed_assigned_until``
    is passed through as None when it is falsy.
    """
    cluster = s.cluster

    cluster_fields = {
        "cluster_id": cluster.cluster_id,
        "serial": cluster.serial,
        "first_event_ts": cluster.first_event_ts.isoformat(),
        "last_event_ts": cluster.last_event_ts.isoformat(),
        "event_count": cluster.event_count,
        "sample_event_id": cluster.sample_event_id,
        "project_raw": cluster.project_raw,
        "project_root": cluster.project_root,
        "location_raw": cluster.location_raw,
        "client_raw": cluster.client_raw,
        "operator_raw": cluster.operator_raw,
        "is_blank_meta": cluster.is_blank_meta,
        "metadata_consistency": cluster.metadata_consistency,
    }
    project_fields = {
        "project_match": s.project_match,
        "project_existing_id": s.project_existing_id,
        "project_existing_name": s.project_existing_name,
        "project_match_score": s.project_match_score,
        "project_suggested_name": s.project_suggested_name,
    }
    location_fields = {
        "location_match": s.location_match,
        "location_existing_id": s.location_existing_id,
        "location_existing_name": s.location_existing_name,
        "location_match_score": s.location_match_score,
        "location_suggested_name": s.location_suggested_name,
    }

    assigned_until = s.proposed_assigned_until
    return {
        **cluster_fields,
        **project_fields,
        **location_fields,
        "proposed_assigned_at": s.proposed_assigned_at.isoformat(),
        "proposed_assigned_until": assigned_until.isoformat() if assigned_until else None,
        "confidence": s.confidence,
        "blocking_conflict": s.blocking_conflict,
        "conflicts": [
            {
                "existing_assignment_id": conflict.existing_assignment_id,
                "other_location_id": conflict.other_location_id,
                "other_location_name": conflict.other_location_name,
                "other_project_id": conflict.other_project_id,
                "other_project_name": conflict.other_project_name,
            }
            for conflict in s.conflicts
        ],
    }
|
|
|
|
|
|
@router.get("/scan")
async def scan(
    force: bool = False,
    db: Session = Depends(get_db),
):
    """Run a scan and return clusters + suggestions.

    Set force=true to bypass the 5-minute cache.

    A cached payload is served while it is younger than
    ``_SCAN_CACHE_TTL_SECONDS``. Otherwise the scan is re-run against SFM,
    suggestions are serialised and bucketed by their ``confidence`` value
    ("high" / "medium" / "low"), and the new payload is stored in the cache
    before being returned.
    """
    started = time.time()

    if not force:
        cached_payload = _SCAN_CACHE["result"]
        cache_age = started - _SCAN_CACHE["at"]
        if cached_payload is not None and cache_age < _SCAN_CACHE_TTL_SECONDS:
            return cached_payload

    result = await svc.scan_clusters_and_build_suggestions(db, SFM_BASE_URL)

    # Bucket serialised suggestions for the wizard UI and count blockers in
    # the same pass. An unexpected confidence value raises KeyError here.
    buckets = {"high": [], "medium": [], "low": []}
    blockers = 0
    for suggestion in result.suggestions:
        buckets[suggestion.confidence].append(_serialise_suggestion(suggestion))
        if suggestion.blocking_conflict:
            blockers += 1

    payload = {
        "scanned_event_count": result.scanned_event_count,
        "cluster_count": result.cluster_count,
        "already_attributed": result.already_attributed,
        "skipped_orphans": result.skipped_orphans,
        "pending_count": len(result.suggestions),
        "blocking_conflict_count": blockers,
        "by_confidence": {level: buckets[level] for level in ("high", "medium", "low")},
        "scanned_at": started,
    }

    _SCAN_CACHE.update(result=payload, at=started)
    return payload
|
|
|
|
|
|
def _is_attachable_location(location, project_id) -> bool:
    """Return True when ``location`` may be used as an existing backfill target.

    The location must exist, must not be soft-removed (mirrors the
    ``removed_at IS NULL`` filter in ``locations_search``), and must belong to
    ``project_id``. A None ``project_id`` means the project is being created,
    so it cannot own any existing location.
    """
    return (
        location is not None
        and location.removed_at is None
        and project_id is not None
        and location.project_id == project_id
    )


def _override_project(db: Session, s: svc.Suggestion, ov: dict) -> None:
    """Apply an operator's ``project_id`` / ``project_name`` override to ``s`` in place."""
    if ov.get("project_id"):
        existing = db.query(Project).filter_by(id=ov["project_id"]).first()
        # Deleted projects are hidden from projects_search, so an ID pointing
        # at one is treated the same as a stale ID.
        if existing is not None and existing.status != "deleted":
            s.project_existing_id = existing.id
            s.project_existing_name = existing.name
            s.project_suggested_name = existing.name
            s.project_match = "exact"
        else:
            # Stale ID — treat as create_new with the cluster's typed name.
            s.project_existing_id = None
            s.project_existing_name = None
            s.project_match = "create_new"
        return

    new_name = (ov.get("project_name") or "").strip()
    if new_name:
        s.project_suggested_name = new_name
        s.project_existing_id = None
        s.project_existing_name = None
        s.project_match = "create_new"


def _mark_location_create_new(s: svc.Suggestion) -> None:
    """Drop any existing-location match on ``s`` so apply creates a new location."""
    s.location_existing_id = None
    s.location_existing_name = None
    s.location_match = "create_new"


def _override_location(db: Session, s: svc.Suggestion, ov: dict, project_changed: bool) -> None:
    """Apply an operator's ``location_id`` / ``location_name`` override to ``s`` in place.

    When no effective location override is given but the project override
    changed the target project, a parser-matched location that does not
    belong to the new project is dropped in favour of create_new.
    """
    if ov.get("location_id"):
        existing = db.query(MonitoringLocation).filter_by(id=ov["location_id"]).first()
        if _is_attachable_location(existing, s.project_existing_id):
            s.location_existing_id = existing.id
            s.location_existing_name = existing.name
            s.location_suggested_name = existing.name
            s.location_match = "exact"
        else:
            _mark_location_create_new(s)
        return

    new_name = (ov.get("location_name") or "").strip()
    if new_name:
        s.location_suggested_name = new_name
        _mark_location_create_new(s)
        return

    if project_changed and s.location_existing_id is not None:
        existing = db.query(MonitoringLocation).filter_by(id=s.location_existing_id).first()
        if not _is_attachable_location(existing, s.project_existing_id):
            _mark_location_create_new(s)


@router.post("/apply")
async def apply(
    request: Request,
    db: Session = Depends(get_db),
):
    """Apply a list of clusters.

    Body:
        {
            "cluster_ids": ["abc...", "def..."],
            "overrides": { "abc...": { "project_name": "...", "location_name": "..." } }
        }

    To accept ALL non-conflict suggestions in one shot, the UI sends every
    pending cluster_id with no overrides.

    Supported per-cluster override fields (they take precedence over the
    parser's suggested match):
        project_id    — attach to an existing, non-deleted Project
        project_name  — create a new project with this name
        location_id   — attach to an existing, non-removed MonitoringLocation,
                        honored only if it belongs to the chosen existing
                        project (otherwise treated as create_new)
        location_name — create a new location with this name
    Duplicate cluster_ids are applied once. The scan cache is invalidated
    afterwards.

    Returns applied/failed/not_found counts plus the IDs of created projects,
    locations and assignments, as reported by ``svc.apply_suggestions``.

    Raises:
        HTTPException(400): invalid JSON, a non-object body, empty or
            non-string cluster_ids, or non-object overrides.
    """
    try:
        body = await request.json()
    except Exception:
        raise HTTPException(status_code=400, detail="Invalid JSON body") from None
    if not isinstance(body, dict):
        raise HTTPException(status_code=400, detail="JSON body must be an object")

    cluster_ids = body.get("cluster_ids") or []
    overrides = body.get("overrides") or {}
    if not isinstance(cluster_ids, list) or not cluster_ids:
        raise HTTPException(status_code=400, detail="cluster_ids must be a non-empty list")
    if not all(isinstance(cid, str) for cid in cluster_ids):
        raise HTTPException(status_code=400, detail="cluster_ids must contain only strings")
    if not isinstance(overrides, dict):
        raise HTTPException(status_code=400, detail="overrides must be an object")

    # Collapse duplicates (first-seen order kept) so one request never applies
    # the same cluster twice.
    unique_ids = list(dict.fromkeys(cluster_ids))
    for cid in unique_ids:
        ov = overrides.get(cid)
        if ov and not isinstance(ov, dict):
            raise HTTPException(
                status_code=400,
                detail=f"overrides for cluster {cid!r} must be an object",
            )

    # Re-scan to get current suggestions. We don't trust the cached scan
    # blindly — the operator might have manually created projects in
    # between scan and apply.
    scan_result = await svc.scan_clusters_and_build_suggestions(db, SFM_BASE_URL)
    suggestions_by_id = {s.cluster.cluster_id: s for s in scan_result.suggestions}

    selected: list[svc.Suggestion] = []
    not_found: list[str] = []
    for cid in unique_ids:
        s = suggestions_by_id.get(cid)
        if s is None:
            not_found.append(cid)
            continue

        ov = overrides.get(cid) or {}
        project_before = s.project_existing_id
        _override_project(db, s, ov)
        _override_location(
            db, s, ov, project_changed=s.project_existing_id != project_before
        )
        selected.append(s)

    apply_result = svc.apply_suggestions(db, selected, decided_by="operator")

    # Invalidate the scan cache so the next /scan picks up the new state.
    _SCAN_CACHE["at"] = 0.0
    _SCAN_CACHE["result"] = None

    return {
        "applied": apply_result.applied,
        "failed": [{"cluster_id": cid, "reason": r} for cid, r in apply_result.failed],
        "not_found": not_found,
        "project_ids_created": apply_result.project_ids_created,
        "location_ids_created": apply_result.location_ids_created,
        "assignment_ids_created": apply_result.assignment_ids_created,
    }
|
|
|
|
|
|
@router.post("/skip")
async def skip(
    request: Request,
    db: Session = Depends(get_db),
):
    """Mark cluster_ids as skipped — they won't reappear in future scans.

    Body: ``{"cluster_ids": ["abc...", ...]}``. An empty or missing list is
    accepted. Duplicate IDs are collapsed (first-seen order kept) before being
    passed to ``svc.skip_clusters``. The scan cache is invalidated afterwards.

    Returns ``{"skipped": n}``, where ``n`` is the value returned by
    ``svc.skip_clusters``.

    Raises:
        HTTPException(400): invalid JSON, a non-object body, or cluster_ids
            that is not a list of strings.
    """
    try:
        body = await request.json()
    except Exception:
        raise HTTPException(status_code=400, detail="Invalid JSON body") from None
    # A JSON array/string/number body previously crashed on .get() with a 500.
    if not isinstance(body, dict):
        raise HTTPException(status_code=400, detail="JSON body must be an object")

    cluster_ids = body.get("cluster_ids") or []
    if not isinstance(cluster_ids, list):
        raise HTTPException(status_code=400, detail="cluster_ids must be a list")
    if not all(isinstance(cid, str) for cid in cluster_ids):
        raise HTTPException(status_code=400, detail="cluster_ids must contain only strings")

    n = svc.skip_clusters(db, list(dict.fromkeys(cluster_ids)), decided_by="operator")

    _SCAN_CACHE["at"] = 0.0
    _SCAN_CACHE["result"] = None

    return {"skipped": n}
|
|
|
|
|
|
@router.get("/projects_search")
def projects_search(
    q: str = "",
    limit: int = 10,
    db: Session = Depends(get_db),
):
    """Typeahead search of existing projects for the wizard's per-cluster
    override inputs.

    Scoring works on ``svc._normalise``d names:
      * a substring hit scores 1.0;
      * otherwise ``svc.similarity`` is used, and candidates below 0.50 are
        dropped (surfacing threshold, not the match threshold).
    Projects with status "deleted" are excluded. An empty query returns every
    remaining project with score 0.0, i.e. in alphabetical order. Results are
    sorted by score descending, then name, and truncated to ``limit``
    (negative values are treated as 0). An empty ``create_new.label`` (None)
    means the query was empty.

    Returns:
        {
            "matches": [
                {"id": "...", "name": "...", "project_number": ...,
                 "client_name": ..., "score": 0.91, "location_count": 3},
                ...
            ],
            "create_new": {"label": "Create new: \"<q>\""}
        }
    """
    q_clean = (q or "").strip()
    q_norm = svc._normalise(q_clean)
    # A negative limit would slice from the end (xs[:-3]) and silently drop
    # the best matches instead of capping the list.
    limit = max(limit, 0)

    # NOTE(review): in SQL, `status != 'deleted'` also excludes rows whose
    # status is NULL — confirm status is non-nullable or that this is intended.
    projects = (
        db.query(Project)
        .filter(Project.status != "deleted")
        .all()
    )

    scored: list[tuple[Project, float]] = []
    for p in projects:
        if not q_norm:
            # Empty query: keep every project with a neutral score so the
            # sort below falls back to alphabetical order by name.
            scored.append((p, 0.0))
            continue
        p_norm = svc._normalise(p.name or "")
        # Cheap substring boost: if the normalised query is a substring,
        # treat that as 1.0 regardless of the fuzzy score.
        if q_norm in p_norm:
            scored.append((p, 1.0))
            continue
        score = svc.similarity(q_norm, p_norm)
        if score >= 0.50:  # surfacing threshold; not the match threshold
            scored.append((p, score))

    # Sort: score desc, then name asc (None-safe).
    scored.sort(key=lambda t: (-t[1], (t[0].name or "").lower()))
    scored = scored[:limit]

    # Compute location counts in one batch query. This counts every
    # MonitoringLocation row for the project, soft-removed ones included.
    loc_counts: dict[str, int] = {}
    if scored:
        from sqlalchemy import func
        ids = [p.id for p, _ in scored]
        rows = (
            db.query(MonitoringLocation.project_id, func.count(MonitoringLocation.id))
            .filter(MonitoringLocation.project_id.in_(ids))
            .group_by(MonitoringLocation.project_id)
            .all()
        )
        loc_counts = {pid: cnt for pid, cnt in rows}

    return {
        "matches": [
            {
                "id": p.id,
                "name": p.name,
                "project_number": p.project_number,
                "client_name": p.client_name,
                "score": round(score, 3),
                "location_count": loc_counts.get(p.id, 0),
            }
            for p, score in scored
        ],
        "create_new": {"label": f'Create new: "{q_clean}"' if q_clean else None},
    }
|
|
|
|
|
|
@router.get("/locations_search")
def locations_search(
    project_id: str,
    q: str = "",
    limit: int = 10,
    db: Session = Depends(get_db),
):
    """Typeahead search of existing locations within a project.

    Only vibration-type, non-removed locations of ``project_id`` are
    candidates. Scoring mirrors ``projects_search``: a substring hit on the
    ``svc._normalise``d name scores 1.0, otherwise ``svc.similarity`` is used
    with a 0.50 surfacing threshold. An empty query returns every candidate
    alphabetically. Results are sorted by score descending, then name, and
    truncated to ``limit`` (negative values are treated as 0).

    Returns:
        {
            "matches": [{"id": "...", "name": "...", "address": ..., "score": 0.91}, ...],
            "create_new": {"label": "Create new: \"<q>\""}   # label None for an empty query
        }

    Raises:
        HTTPException(400): ``project_id`` is empty.
    """
    if not project_id:
        raise HTTPException(status_code=400, detail="project_id required")

    q_clean = (q or "").strip()
    q_norm = svc._normalise(q_clean)
    # A negative limit would slice from the end and drop the best matches.
    limit = max(limit, 0)

    locations = (
        db.query(MonitoringLocation)
        .filter(MonitoringLocation.project_id == project_id)
        .filter(MonitoringLocation.location_type == "vibration")
        # Don't propose creating assignments at removed locations — they
        # were intentionally decommissioned and shouldn't be backfill targets.
        .filter(MonitoringLocation.removed_at == None)  # noqa: E711
        .all()
    )

    scored: list[tuple[MonitoringLocation, float]] = []
    for loc in locations:
        if not q_norm:
            scored.append((loc, 0.0))
            continue
        loc_norm = svc._normalise(loc.name or "")
        if q_norm in loc_norm:
            scored.append((loc, 1.0))
            continue
        score = svc.similarity(q_norm, loc_norm)
        if score >= 0.50:  # surfacing threshold; not the match threshold
            scored.append((loc, score))

    scored.sort(key=lambda t: (-t[1], (t[0].name or "").lower()))
    scored = scored[:limit]

    return {
        "matches": [
            {
                "id": loc.id,
                "name": loc.name,
                "address": loc.address,
                "score": round(score, 3),
            }
            for loc, score in scored
        ],
        "create_new": {"label": f'Create new: "{q_clean}"' if q_clean else None},
    }
|