Files
terra-view/backend/routers/metadata_backfill.py
T
serversdown d5a0163852 feat(locations): soft-remove monitoring locations without destroying history
When a client drops a location from scope mid-project (e.g. the office
half of a museum+office monitoring job), operators couldn't previously
mark it as no-longer-active without either deleting it (which would
orphan historical events) or leaving it in the active list looking
deployable.  Now there's a proper middle ground.

Data model
- MonitoringLocation gets two new nullable columns:
  - removed_at      — NULL means active; set means soft-removed
  - removal_reason  — optional operator note
  Migration: backend/migrate_add_location_removed.py (idempotent)

Endpoints
- POST /api/projects/{p}/locations/{l}/remove
    Body: { effective_date?: ISO-datetime, reason?: str }
    Side effects (cascade):
      1. Closes active UnitAssignment rows at this location
         (assigned_until = effective_date, status = "completed")
      2. Cancels pending ScheduledActions at this location
      3. Marks location.removed_at = effective_date
    Returns counts of assignments closed + actions cancelled.
- POST /api/projects/{p}/locations/{l}/restore
    Clears removed_at + removal_reason.  Does NOT auto-reopen
    assignments — operator creates new ones if resuming monitoring.

Active-surface filters
- locations-json defaults to active-only; pass include_removed=true
  for historical / reporting views.  Schedule modal dropdowns now
  exclude removed locations automatically.
- Metadata-backfill fuzzy matcher excludes removed locations from
  proposed targets (don't want backfill creating new assignments at
  decommissioned locations).
- Vibration-summary per_location rollup includes removed locations
  (so historical event totals stay accurate) but tags each with
  removed_at so the UI can show a badge.

UI
- Project detail page's Monitoring Locations section now splits into:
    Active locations (full card with Assign / Edit / Remove / Delete)
    Removed locations (collapsed <details>, greyed cards, Restore button,
                       shows removal date + reason)
- New per-card "Remove" button → opens confirmation modal explaining
  the cascade, with optional effective-date (defaults to now,
  backdateable) and reason fields.
- Unit detail's SFM Events attribution cell shows a small "removed"
  badge next to historical attributions whose location is no longer
  active.  Same pattern in vibration_summary's top-locations list.
- Soft-removal indicator surfaced through the events_for_unit
  attribution payload as location_removed_at.

Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>
2026-05-14 22:22:40 +00:00

398 lines
14 KiB
Python

"""
Metadata-backfill admin router.

Endpoints under /api/admin/metadata_backfill:
  GET  /scan              — run the scan; return clusters + suggestions (JSON).
                            Cached 5 minutes so the wizard doesn't re-scan on
                            every page render (force=true bypasses the cache).
  POST /apply             — apply a list of cluster_ids; body specifies which to
                            accept and optional per-cluster overrides.
  POST /skip              — mark cluster_ids as skipped (won't reappear).
  GET  /projects_search   — typeahead over existing projects whose status is
                            not "deleted", for per-cluster project overrides.
  GET  /locations_search  — typeahead over a project's vibration locations that
                            have not been soft-removed, for per-cluster
                            location overrides.
"""
from __future__ import annotations
import os
import time
from typing import Optional
from fastapi import APIRouter, Depends, HTTPException, Request
from fastapi.responses import JSONResponse
from sqlalchemy.orm import Session
from backend.database import get_db
from backend.models import Project, MonitoringLocation
from backend.services import metadata_backfill as svc
router = APIRouter(
    prefix="/api/admin/metadata_backfill",
    tags=["metadata-backfill"],
)

# Base URL handed to the service-layer scan; overridable via the
# SFM_BASE_URL environment variable.
SFM_BASE_URL = os.getenv("SFM_BASE_URL", "http://localhost:8200")

# Process-local scan cache: keeps the wizard from re-hammering SFM on every
# render, at the cost of holding the last payload in memory.  Each worker
# process gets its own copy, which is fine for a single-worker uvicorn dev
# setup; a multi-worker deployment would need a shared store (DB / Redis).
# "at" is the time.time() of the last scan, "result" the cached payload.
_SCAN_CACHE_TTL_SECONDS = 5 * 60.0
_SCAN_CACHE: dict = dict(at=0.0, result=None)
def _serialise_suggestion(s: svc.Suggestion) -> dict:
    """Convert one Suggestion into a JSON-serialisable dict for the wizard.

    The output merges the cluster facts (``s.cluster``) with the suggested
    project/location targets, the proposed assignment window, the
    confidence level and any conflicting assignments.  Timestamps are
    rendered with ``isoformat()``; a falsy ``proposed_assigned_until`` is
    emitted as None.
    """
    cl = s.cluster
    until = s.proposed_assigned_until

    conflict_rows = [
        {
            "existing_assignment_id": conflict.existing_assignment_id,
            "other_location_id": conflict.other_location_id,
            "other_location_name": conflict.other_location_name,
            "other_project_id": conflict.other_project_id,
            "other_project_name": conflict.other_project_name,
        }
        for conflict in s.conflicts
    ]

    # Cluster facts first ...
    record = {
        "cluster_id": cl.cluster_id,
        "serial": cl.serial,
        "first_event_ts": cl.first_event_ts.isoformat(),
        "last_event_ts": cl.last_event_ts.isoformat(),
        "event_count": cl.event_count,
        "sample_event_id": cl.sample_event_id,
        "project_raw": cl.project_raw,
        "project_root": cl.project_root,
        "location_raw": cl.location_raw,
        "client_raw": cl.client_raw,
        "operator_raw": cl.operator_raw,
        "is_blank_meta": cl.is_blank_meta,
        "metadata_consistency": cl.metadata_consistency,
    }
    # ... then the suggested targets (keyword order == output key order) ...
    record.update(
        project_match=s.project_match,
        project_existing_id=s.project_existing_id,
        project_existing_name=s.project_existing_name,
        project_match_score=s.project_match_score,
        project_suggested_name=s.project_suggested_name,
        location_match=s.location_match,
        location_existing_id=s.location_existing_id,
        location_existing_name=s.location_existing_name,
        location_match_score=s.location_match_score,
        location_suggested_name=s.location_suggested_name,
    )
    # ... then the assignment window, confidence and conflicts.
    record["proposed_assigned_at"] = s.proposed_assigned_at.isoformat()
    record["proposed_assigned_until"] = until.isoformat() if until else None
    record["confidence"] = s.confidence
    record["blocking_conflict"] = s.blocking_conflict
    record["conflicts"] = conflict_rows
    return record
@router.get("/scan")
async def scan(
    force: bool = False,
    db: Session = Depends(get_db),
):
    """Scan for event clusters and return suggestions grouped by confidence.

    A previous payload is served from ``_SCAN_CACHE`` while it is younger
    than ``_SCAN_CACHE_TTL_SECONDS`` (5 minutes); pass ``force=true`` to
    skip the cache and rescan.  Every fresh scan replaces the cached payload.
    """
    now = time.time()
    cached_payload = _SCAN_CACHE["result"]
    age_seconds = now - _SCAN_CACHE["at"]
    if not force and cached_payload is not None and age_seconds < _SCAN_CACHE_TTL_SECONDS:
        return cached_payload

    result = await svc.scan_clusters_and_build_suggestions(db, SFM_BASE_URL)

    # Bucket serialised suggestions by confidence for the wizard UI, and
    # count the ones carrying a blocking conflict in the same pass.
    buckets: dict[str, list[dict]] = {level: [] for level in ("high", "medium", "low")}
    n_blocking = 0
    for suggestion in result.suggestions:
        buckets[suggestion.confidence].append(_serialise_suggestion(suggestion))
        n_blocking += bool(suggestion.blocking_conflict)

    payload = dict(
        scanned_event_count=result.scanned_event_count,
        cluster_count=result.cluster_count,
        already_attributed=result.already_attributed,
        skipped_orphans=result.skipped_orphans,
        pending_count=len(result.suggestions),
        blocking_conflict_count=n_blocking,
        by_confidence={level: buckets[level] for level in ("high", "medium", "low")},
        scanned_at=now,
    )
    _SCAN_CACHE.update(result=payload, at=now)
    return payload
def _location_is_attachable(location, project_id) -> bool:
    """Return True if an existing ``location`` may be used as a backfill target.

    The location must exist, must not be soft-removed (``removed_at`` set),
    and must belong to the existing project ``project_id``.  A project that
    is still to be created (``project_id is None``) cannot own an existing
    location, so nothing is attachable in that case.
    """
    return (
        location is not None
        and project_id is not None
        and location.project_id == project_id
        and location.removed_at is None
    )


def _set_location_create_new(s: svc.Suggestion) -> None:
    """Point ``s`` at a location to be created (keeps location_suggested_name)."""
    s.location_existing_id = None
    s.location_existing_name = None
    s.location_match = "create_new"


def _apply_cluster_override(db: Session, s: svc.Suggestion, ov: dict) -> None:
    """Mutate suggestion ``s`` in place according to one cluster's override.

    Per-cluster overrides take precedence over the parser's suggested match.
    Project keys are handled first so the location checks see the final
    project choice:
      project_id    — attach to an existing Project whose status is not
                      "deleted".  Unknown or deleted IDs fall back to
                      create-new with the parser's suggested name.
      project_name  — create a new project with this (non-blank) name.
      location_id   — attach to an existing location if
                      ``_location_is_attachable`` accepts it for the chosen
                      project; otherwise fall back to create-new with the
                      parser's suggested location name.
      location_name — create a new location with this (non-blank) name.
    If the project was overridden but the location was not, the parser's
    existing-location match (made against the parser's project) is
    re-checked against the new project and dropped to create-new if it no
    longer fits.
    """
    project_changed = False
    location_set = False

    if ov.get("project_id"):
        project_changed = True
        project = db.query(Project).filter_by(id=ov["project_id"]).first()
        if project is not None and project.status != "deleted":
            s.project_existing_id = project.id
            s.project_existing_name = project.name
            s.project_suggested_name = project.name
            s.project_match = "exact"
        else:
            # Stale or deleted ID — create new under the parser's suggested name.
            s.project_existing_id = None
            s.project_existing_name = None
            s.project_match = "create_new"
    elif "project_name" in ov:
        raw = ov["project_name"]
        new_name = raw.strip() if isinstance(raw, str) else ""
        if new_name:
            project_changed = True
            s.project_suggested_name = new_name
            s.project_existing_id = None
            s.project_existing_name = None
            s.project_match = "create_new"

    if ov.get("location_id"):
        location_set = True
        location = db.query(MonitoringLocation).filter_by(id=ov["location_id"]).first()
        if _location_is_attachable(location, s.project_existing_id):
            s.location_existing_id = location.id
            s.location_existing_name = location.name
            s.location_suggested_name = location.name
            s.location_match = "exact"
        else:
            _set_location_create_new(s)
    elif "location_name" in ov:
        raw = ov["location_name"]
        new_name = raw.strip() if isinstance(raw, str) else ""
        if new_name:
            location_set = True
            s.location_suggested_name = new_name
            _set_location_create_new(s)

    if project_changed and not location_set and s.location_existing_id is not None:
        location = db.query(MonitoringLocation).filter_by(id=s.location_existing_id).first()
        if not _location_is_attachable(location, s.project_existing_id):
            _set_location_create_new(s)


@router.post("/apply")
async def apply(
    request: Request,
    db: Session = Depends(get_db),
):
    """Apply a list of clusters.

    Body:
        {
          "cluster_ids": ["abc...", "def..."],
          "overrides": { "abc...": { "project_name": "...", "location_name": "..." } }
        }
    Supported override keys: project_id, project_name, location_id,
    location_name (see ``_apply_cluster_override``).

    To accept ALL non-conflict suggestions in one shot, the UI sends every
    pending cluster_id with no overrides.  Repeated IDs are applied once;
    IDs with no current suggestion are reported in ``not_found``.

    Raises:
        HTTPException(400): the body is not valid JSON or not an object;
            cluster_ids is missing, empty, or contains non-strings; or
            overrides (or any non-empty per-cluster entry) is not an object.
    """
    try:
        body = await request.json()
    except Exception:
        raise HTTPException(status_code=400, detail="Invalid JSON body") from None
    if not isinstance(body, dict):
        raise HTTPException(status_code=400, detail="JSON body must be an object")

    cluster_ids = body.get("cluster_ids") or []
    overrides = body.get("overrides") or {}
    if not isinstance(cluster_ids, list) or not cluster_ids:
        raise HTTPException(status_code=400, detail="cluster_ids must be a non-empty list")
    if not all(isinstance(cid, str) for cid in cluster_ids):
        raise HTTPException(status_code=400, detail="cluster_ids must contain only strings")
    if not isinstance(overrides, dict) or any(
        ov and not isinstance(ov, dict) for ov in overrides.values()
    ):
        raise HTTPException(
            status_code=400, detail="overrides must map cluster_id to an object"
        )

    # Re-scan to get current suggestions. We don't trust the cached scan
    # blindly — the operator might have manually created projects in
    # between scan and apply.
    scan_result = await svc.scan_clusters_and_build_suggestions(db, SFM_BASE_URL)
    suggestions_by_id = {s.cluster.cluster_id: s for s in scan_result.suggestions}

    selected: list[svc.Suggestion] = []
    not_found: list[str] = []
    # dict.fromkeys de-duplicates while keeping request order, so a repeated
    # ID isn't handed to apply_suggestions twice.
    for cid in dict.fromkeys(cluster_ids):
        s = suggestions_by_id.get(cid)
        if s is None:
            not_found.append(cid)
            continue
        _apply_cluster_override(db, s, overrides.get(cid) or {})
        selected.append(s)

    apply_result = svc.apply_suggestions(db, selected, decided_by="operator")

    # Invalidate the scan cache so the next /scan picks up the new state.
    _SCAN_CACHE["at"] = 0.0
    _SCAN_CACHE["result"] = None
    return {
        "applied": apply_result.applied,
        "failed": [{"cluster_id": cid, "reason": r} for cid, r in apply_result.failed],
        "not_found": not_found,
        "project_ids_created": apply_result.project_ids_created,
        "location_ids_created": apply_result.location_ids_created,
        "assignment_ids_created": apply_result.assignment_ids_created,
    }
@router.post("/skip")
async def skip(
    request: Request,
    db: Session = Depends(get_db),
):
    """Mark cluster_ids as skipped — they won't reappear in future scans.

    Body: {"cluster_ids": ["abc...", ...]}.  A missing or empty list is
    accepted and passed through as empty.  Repeated IDs are passed to the
    service once.

    Returns {"skipped": n}, where n is whatever svc.skip_clusters reports.

    Raises:
        HTTPException(400): the body is not valid JSON or not an object, or
            cluster_ids is not a list of strings.
    """
    try:
        body = await request.json()
    except Exception:
        raise HTTPException(status_code=400, detail="Invalid JSON body") from None
    if not isinstance(body, dict):
        raise HTTPException(status_code=400, detail="JSON body must be an object")

    cluster_ids = body.get("cluster_ids") or []
    if not isinstance(cluster_ids, list):
        raise HTTPException(status_code=400, detail="cluster_ids must be a list")
    if not all(isinstance(cid, str) for cid in cluster_ids):
        raise HTTPException(status_code=400, detail="cluster_ids must contain only strings")

    # Order-preserving de-duplication so one request can't record the same
    # skip decision twice.
    n = svc.skip_clusters(db, list(dict.fromkeys(cluster_ids)), decided_by="operator")

    # Invalidate the scan cache so skipped clusters drop out of the next /scan.
    _SCAN_CACHE["at"] = 0.0
    _SCAN_CACHE["result"] = None
    return {"skipped": n}
@router.get("/projects_search")
def projects_search(
    q: str = "",
    limit: int = 10,
    db: Session = Depends(get_db),
):
    """Typeahead search of existing projects for the wizard's per-cluster
    override inputs.

    Combines a substring match on normalised names (``svc._normalise``)
    with fuzzy scoring (``svc.similarity``) so partial typing and slight
    typos both surface candidates.  A substring hit scores 1.0; other
    candidates need a similarity of at least 0.50 to be listed.  An empty
    query returns projects ordered by name with score 0.0.  Projects with
    status "deleted" are never returned.  At most ``limit`` matches are
    returned; a negative ``limit`` is treated as 0.

    The ``create_new`` label lets the operator confirm they want to create
    rather than match; it is None when ``q`` is blank.

    Returns:
        {
          "matches": [
            {"id": "...", "name": "...", "project_number": ..., "client_name": ...,
             "score": 0.91, "location_count": 3},
            ...
          ],
          "create_new": {"label": "Create new: \"<q>\""}
        }
    """
    # Without the clamp a negative limit would slice from the END of the
    # list (scored[:-n]) instead of returning nothing.
    limit = max(limit, 0)
    q_clean = (q or "").strip()
    q_norm = svc._normalise(q_clean)
    projects = (
        db.query(Project)
        .filter(Project.status != "deleted")
        .all()
    )

    scored: list[tuple[Project, float]] = []
    for p in projects:
        if not q_norm:
            # Empty query: keep every project with a neutral score; the sort
            # below then orders them alphabetically by name.
            scored.append((p, 0.0))
            continue
        p_norm = svc._normalise(p.name)
        # Substring hit on the normalised name counts as a perfect match,
        # regardless of the fuzzy score.
        if q_norm in p_norm:
            scored.append((p, 1.0))
            continue
        score = svc.similarity(q_norm, p_norm)
        if score >= 0.50:  # surfacing threshold; not the match threshold
            scored.append((p, score))

    # Sort: score desc, then name asc.
    scored.sort(key=lambda t: (-t[1], t[0].name.lower()))
    scored = scored[:limit]

    # Compute location counts for the surviving projects in one batch query.
    loc_counts: dict[str, int] = {}
    if scored:
        from sqlalchemy import func

        ids = [p.id for p, _ in scored]
        rows = (
            db.query(MonitoringLocation.project_id, func.count(MonitoringLocation.id))
            .filter(MonitoringLocation.project_id.in_(ids))
            .group_by(MonitoringLocation.project_id)
            .all()
        )
        loc_counts = {pid: cnt for pid, cnt in rows}

    return {
        "matches": [
            {
                "id": p.id,
                "name": p.name,
                "project_number": p.project_number,
                "client_name": p.client_name,
                "score": round(score, 3),
                "location_count": loc_counts.get(p.id, 0),
            }
            for p, score in scored
        ],
        "create_new": {"label": f'Create new: "{q_clean}"' if q_clean else None},
    }
@router.get("/locations_search")
def locations_search(
    project_id: str,
    q: str = "",
    limit: int = 10,
    db: Session = Depends(get_db),
):
    """Typeahead search of existing locations within a project.

    Candidates are the project's locations with location_type "vibration"
    that have not been soft-removed (``removed_at`` is NULL).  Scoring
    mirrors projects_search: a normalised substring hit scores 1.0, other
    candidates need ``svc.similarity`` >= 0.50, and an empty query lists
    every candidate by name with score 0.0.  At most ``limit`` matches are
    returned; a negative ``limit`` is treated as 0.

    Raises:
        HTTPException(400): project_id is empty.
    """
    if not project_id:
        raise HTTPException(status_code=400, detail="project_id required")
    # Without the clamp a negative limit would slice from the END of the
    # list (scored[:-n]) instead of returning nothing.
    limit = max(limit, 0)
    q_clean = (q or "").strip()
    q_norm = svc._normalise(q_clean)
    locations = (
        db.query(MonitoringLocation)
        .filter(MonitoringLocation.project_id == project_id)
        .filter(MonitoringLocation.location_type == "vibration")
        # Don't propose creating assignments at removed locations — they
        # were intentionally decommissioned and shouldn't be backfill targets.
        .filter(MonitoringLocation.removed_at.is_(None))
        .all()
    )

    scored: list[tuple[MonitoringLocation, float]] = []
    for loc in locations:
        if not q_norm:
            scored.append((loc, 0.0))
            continue
        loc_norm = svc._normalise(loc.name)
        if q_norm in loc_norm:
            scored.append((loc, 1.0))
            continue
        score = svc.similarity(q_norm, loc_norm)
        if score >= 0.50:  # surfacing threshold; not the match threshold
            scored.append((loc, score))

    # Sort: score desc, then name asc.
    scored.sort(key=lambda t: (-t[1], t[0].name.lower()))
    scored = scored[:limit]

    return {
        "matches": [
            {
                "id": loc.id,
                "name": loc.name,
                "address": loc.address,
                "score": round(score, 3),
            }
            for loc, score in scored
        ],
        "create_new": {"label": f'Create new: "{q_clean}"' if q_clean else None},
    }