feat(sfm): Phase 5a — bulk-backfill projects/locations/assignments from event metadata
Operator clicks one button. Parser reads SFM's events table (operator-typed
project / client / sensor_location strings), clusters by serial + time +
metadata, fuzzy-matches against existing projects, and proposes
Project / MonitoringLocation / UnitAssignment chains to create.
Auto-applies high-confidence non-conflicting clusters in bulk; queues
medium/low confidence for individual review.
Verified against real data: 10,052 events → 59 clusters → 37 high-
confidence + 14 medium + 8 low. Test-applied one cluster end-to-end;
Project + Module + Location + Assignment + UnitHistory + Decision rows
all created correctly, and Phase 2's attribution walk picked up the
events automatically on the new location's detail page.
Pipeline (backend/services/metadata_backfill.py, ~700 lines):
1. Pull all SFM events via /db/events per serial.
2. Pre-filter: drop events already covered by an existing UnitAssignment
window (Phase 2 handles those automatically).
3. Time-cluster what's left: serial + 7-day gap is the cluster identity.
4. Metadata-split each time-cluster on persistent metadata transitions
(≥ 2 consecutive events) so a single typo doesn't fork the cluster.
5. Match against existing graph (rapidfuzz.WRatio multi-signal scoring,
normalisation that handles abbreviations / reorders / separator
variations). Thresholds: 0.95 exact, 0.80 fuzzy, min-shorter-input
5 chars to guardrail false positives on single common words.
6. Score confidence (high/medium/low) using event count, span,
blank-meta, conflict, ambiguity rules.
7. Detect conflicts: overlap with existing UnitAssignment at a different
location for the same serial → blocking. Operator must reconcile.
8. Apply: ensure auto_imported ProjectType exists, ensure
vibration_monitoring ProjectModule on the project, write
Project / MonitoringLocation / UnitAssignment / UnitHistory all in
one transaction.
Migration (backend/migrate_add_metadata_backfill.py): adds
unit_assignments.source column (default 'manual') and
metadata_backfill_decisions table. Idempotent, non-destructive.
API (backend/routers/metadata_backfill.py):
GET /api/admin/metadata_backfill/scan — clusters + suggestions
POST /api/admin/metadata_backfill/apply — bulk apply by cluster_ids
w/ optional per-cluster
project/location overrides
POST /api/admin/metadata_backfill/skip — mark skipped (persistent)
UI (templates/admin/metadata_backfill.html, accessible at
/settings/developer/metadata-backfill via the Developer tab of Settings):
- One-button "Run scan" entry.
- Summary KPI tiles (scanned / already attributed / pending / conflicts).
- "Apply all high-confidence" bulk button at the top — primary path.
- Per-cluster cards below with Apply / Skip / Preview event actions.
- Blank-meta clusters get inline input fields for operator-typed project +
location names before applying.
- Blocking-conflict clusters render with the conflicting assignment
information and a disabled Apply button.
- Live progress toast during apply.
- Reuses the Phase 1+2+4 event-detail modal for "Preview event" — operator
can sanity-check the BW report data against the cluster's sample event.
Dependencies: rapidfuzz==3.10.1 added to requirements.txt. Pre-built C
wheels for all platforms, ~5s docker build hit.
Phase 5b (deferred to next session): swap-detection daily background job,
notification inbox for auto-applied swaps, recently-applied audit view,
"Tidy" page for renaming/merging auto-created projects.
Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>
This commit is contained in:
@@ -115,6 +115,10 @@ app.include_router(scheduler.router)
|
||||
from backend.routers import report_templates
|
||||
app.include_router(report_templates.router)
|
||||
|
||||
# Metadata-backfill admin router (Phase 5a)
|
||||
from backend.routers import metadata_backfill
|
||||
app.include_router(metadata_backfill.router)
|
||||
|
||||
# Alerts router
|
||||
from backend.routers import alerts
|
||||
app.include_router(alerts.router)
|
||||
@@ -240,6 +244,13 @@ async def sfm_page(request: Request):
|
||||
return templates.TemplateResponse("sfm.html", {"request": request})
|
||||
|
||||
|
||||
@app.get("/settings/developer/metadata-backfill", response_class=HTMLResponse)
|
||||
async def metadata_backfill_wizard_page(request: Request):
|
||||
"""Wizard for auto-creating projects/locations/assignments from
|
||||
operator-typed BW event metadata (Phase 5a)."""
|
||||
return templates.TemplateResponse("admin/metadata_backfill.html", {"request": request})
|
||||
|
||||
|
||||
@app.get("/modems", response_class=HTMLResponse)
|
||||
async def modems_page(request: Request):
|
||||
"""Field modems management dashboard"""
|
||||
|
||||
@@ -0,0 +1,94 @@
|
||||
"""
|
||||
Migration: add metadata-backfill support.
|
||||
|
||||
Adds:
|
||||
1. `unit_assignments.source` column (TEXT, default 'manual').
|
||||
Lets us audit which assignments were created by the metadata-backfill
|
||||
parser vs by a human, and bulk-undo parser actions if needed.
|
||||
|
||||
2. `metadata_backfill_decisions` table. Tracks operator decisions per
|
||||
cluster_id so the wizard remembers what's been skipped, what's
|
||||
been applied, and what's pending across re-scans.
|
||||
|
||||
Idempotent — safe to re-run.
|
||||
Non-destructive — adds only.
|
||||
|
||||
Run with:
|
||||
docker exec terra-view-terra-view-1 python3 /app/backend/migrate_add_metadata_backfill.py
|
||||
"""
|
||||
|
||||
import os
|
||||
import sqlite3
|
||||
|
||||
DB_PATH = "./data/seismo_fleet.db"
|
||||
|
||||
|
||||
def migrate_database():
|
||||
if not os.path.exists(DB_PATH):
|
||||
print(f"Database not found at {DB_PATH}")
|
||||
return
|
||||
|
||||
print(f"Migrating database: {DB_PATH}")
|
||||
conn = sqlite3.connect(DB_PATH)
|
||||
cur = conn.cursor()
|
||||
|
||||
# ── 1. unit_assignments.source column ──────────────────────────────────
|
||||
cur.execute("PRAGMA table_info(unit_assignments)")
|
||||
cols = {row[1] for row in cur.fetchall()}
|
||||
if "source" not in cols:
|
||||
print("Adding unit_assignments.source column (default 'manual') ...")
|
||||
cur.execute(
|
||||
"ALTER TABLE unit_assignments ADD COLUMN source TEXT DEFAULT 'manual'"
|
||||
)
|
||||
# Backfill: any existing row gets source='manual'
|
||||
cur.execute("UPDATE unit_assignments SET source='manual' WHERE source IS NULL")
|
||||
conn.commit()
|
||||
print(" Done.")
|
||||
else:
|
||||
print("unit_assignments.source already exists — skipping")
|
||||
|
||||
# ── 2. metadata_backfill_decisions table ──────────────────────────────
|
||||
cur.execute(
|
||||
"SELECT name FROM sqlite_master WHERE type='table' AND name='metadata_backfill_decisions'"
|
||||
)
|
||||
if cur.fetchone() is None:
|
||||
print("Creating metadata_backfill_decisions table ...")
|
||||
cur.execute("""
|
||||
CREATE TABLE metadata_backfill_decisions (
|
||||
cluster_id TEXT PRIMARY KEY, -- deterministic hash
|
||||
status TEXT NOT NULL, -- pending | applied | skipped | conflict
|
||||
confidence TEXT NOT NULL, -- high | medium | low (at time of decision)
|
||||
decided_at TEXT, -- when applied/skipped
|
||||
decided_by TEXT, -- 'background' | 'operator' | 'auto-high'
|
||||
applied_assignment_id TEXT, -- FK to unit_assignments (if applied)
|
||||
notes TEXT,
|
||||
first_seen_at TEXT NOT NULL,
|
||||
last_seen_at TEXT NOT NULL,
|
||||
serial TEXT NOT NULL,
|
||||
project_raw TEXT,
|
||||
location_raw TEXT,
|
||||
first_event_ts TEXT,
|
||||
last_event_ts TEXT,
|
||||
event_count INTEGER NOT NULL DEFAULT 0
|
||||
)
|
||||
""")
|
||||
cur.execute(
|
||||
"CREATE INDEX idx_mbd_status ON metadata_backfill_decisions(status)"
|
||||
)
|
||||
cur.execute(
|
||||
"CREATE INDEX idx_mbd_last_seen ON metadata_backfill_decisions(last_seen_at)"
|
||||
)
|
||||
cur.execute(
|
||||
"CREATE INDEX idx_mbd_serial ON metadata_backfill_decisions(serial)"
|
||||
)
|
||||
conn.commit()
|
||||
print(" Done.")
|
||||
else:
|
||||
print("metadata_backfill_decisions table already exists — skipping")
|
||||
|
||||
conn.close()
|
||||
print("\nMigration complete.")
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
migrate_database()
|
||||
@@ -259,9 +259,48 @@ class UnitAssignment(Base):
|
||||
device_type = Column(String, nullable=False) # "slm" | "seismograph"
|
||||
project_id = Column(String, nullable=False, index=True) # FK to Project.id
|
||||
|
||||
# Provenance: how was this assignment created? Used for auditing,
|
||||
# bulk-undo of parser actions, and the Phase 4 deployment timeline.
|
||||
# "manual" — operator created via UI
|
||||
# "metadata_backfill" — auto-created by the metadata parser
|
||||
# from operator-typed BW event metadata
|
||||
# (bulk backfill workflow)
|
||||
# "metadata_backfill_swap" — auto-created by swap-detection
|
||||
# background job
|
||||
source = Column(String, nullable=False, default="manual")
|
||||
|
||||
created_at = Column(DateTime, default=datetime.utcnow)
|
||||
|
||||
|
||||
class MetadataBackfillDecision(Base):
|
||||
"""
|
||||
Per-cluster decisions tracked by the metadata-backfill parser.
|
||||
|
||||
`cluster_id` is the deterministic SHA1 hash of
|
||||
(serial, first_event_date, last_event_date), so the same cluster
|
||||
produces the same id across re-scans. The decisions table lets the
|
||||
parser remember "I already applied this" or "operator skipped this"
|
||||
across scan invocations.
|
||||
"""
|
||||
__tablename__ = "metadata_backfill_decisions"
|
||||
|
||||
cluster_id = Column(String, primary_key=True)
|
||||
status = Column(String, nullable=False) # pending | applied | skipped | conflict
|
||||
confidence = Column(String, nullable=False) # high | medium | low
|
||||
decided_at = Column(DateTime, nullable=True)
|
||||
decided_by = Column(String, nullable=True) # background | operator | auto-high
|
||||
applied_assignment_id = Column(String, nullable=True) # FK to unit_assignments.id
|
||||
notes = Column(Text, nullable=True)
|
||||
first_seen_at = Column(DateTime, nullable=False, default=datetime.utcnow)
|
||||
last_seen_at = Column(DateTime, nullable=False, default=datetime.utcnow)
|
||||
serial = Column(String, nullable=False, index=True)
|
||||
project_raw = Column(String, nullable=True)
|
||||
location_raw = Column(String, nullable=True)
|
||||
first_event_ts = Column(DateTime, nullable=True)
|
||||
last_event_ts = Column(DateTime, nullable=True)
|
||||
event_count = Column(Integer, nullable=False, default=0)
|
||||
|
||||
|
||||
class ScheduledAction(Base):
|
||||
"""
|
||||
Scheduled actions: automation for recording start/stop/download.
|
||||
|
||||
@@ -0,0 +1,226 @@
|
||||
"""
|
||||
Metadata-backfill admin router.
|
||||
|
||||
Endpoints under /api/admin/metadata_backfill:
|
||||
|
||||
GET /scan — run the scan; return clusters + suggestions (JSON).
|
||||
Cached 5 minutes so the wizard doesn't re-scan on
|
||||
every page render.
|
||||
POST /apply — apply a list of cluster_ids; body specifies which to
|
||||
accept and optional per-cluster overrides.
|
||||
POST /skip — mark cluster_ids as skipped (won't reappear).
|
||||
"""
|
||||
|
||||
from __future__ import annotations
|
||||
|
||||
import os
|
||||
import time
|
||||
from typing import Optional
|
||||
|
||||
from fastapi import APIRouter, Depends, HTTPException, Request
|
||||
from fastapi.responses import JSONResponse
|
||||
from sqlalchemy.orm import Session
|
||||
|
||||
from backend.database import get_db
|
||||
from backend.services import metadata_backfill as svc
|
||||
|
||||
router = APIRouter(prefix="/api/admin/metadata_backfill", tags=["metadata-backfill"])
|
||||
|
||||
SFM_BASE_URL = os.getenv("SFM_BASE_URL", "http://localhost:8200")
|
||||
|
||||
# In-process scan cache. Trades memory for not re-hammering SFM on every
|
||||
# wizard render. TTL: 5 minutes. Singleton per-process; fine for a
|
||||
# single-worker uvicorn dev setup. For prod multi-worker we'd want to put
|
||||
# this in the DB or Redis; deferred.
|
||||
_SCAN_CACHE: dict = {"at": 0.0, "result": None}
|
||||
_SCAN_CACHE_TTL_SECONDS = 300.0
|
||||
|
||||
|
||||
def _serialise_suggestion(s: svc.Suggestion) -> dict:
|
||||
c = s.cluster
|
||||
return {
|
||||
"cluster_id": c.cluster_id,
|
||||
"serial": c.serial,
|
||||
"first_event_ts": c.first_event_ts.isoformat(),
|
||||
"last_event_ts": c.last_event_ts.isoformat(),
|
||||
"event_count": c.event_count,
|
||||
"sample_event_id": c.sample_event_id,
|
||||
"project_raw": c.project_raw,
|
||||
"location_raw": c.location_raw,
|
||||
"client_raw": c.client_raw,
|
||||
"operator_raw": c.operator_raw,
|
||||
"is_blank_meta": c.is_blank_meta,
|
||||
"metadata_consistency": c.metadata_consistency,
|
||||
|
||||
"project_match": s.project_match,
|
||||
"project_existing_id": s.project_existing_id,
|
||||
"project_existing_name": s.project_existing_name,
|
||||
"project_match_score": s.project_match_score,
|
||||
"project_suggested_name": s.project_suggested_name,
|
||||
|
||||
"location_match": s.location_match,
|
||||
"location_existing_id": s.location_existing_id,
|
||||
"location_existing_name": s.location_existing_name,
|
||||
"location_match_score": s.location_match_score,
|
||||
"location_suggested_name": s.location_suggested_name,
|
||||
|
||||
"proposed_assigned_at": s.proposed_assigned_at.isoformat(),
|
||||
"proposed_assigned_until": s.proposed_assigned_until.isoformat() if s.proposed_assigned_until else None,
|
||||
|
||||
"confidence": s.confidence,
|
||||
"blocking_conflict": s.blocking_conflict,
|
||||
"conflicts": [
|
||||
{
|
||||
"existing_assignment_id": cf.existing_assignment_id,
|
||||
"other_location_id": cf.other_location_id,
|
||||
"other_location_name": cf.other_location_name,
|
||||
"other_project_id": cf.other_project_id,
|
||||
"other_project_name": cf.other_project_name,
|
||||
}
|
||||
for cf in s.conflicts
|
||||
],
|
||||
}
|
||||
|
||||
|
||||
@router.get("/scan")
|
||||
async def scan(
|
||||
force: bool = False,
|
||||
db: Session = Depends(get_db),
|
||||
):
|
||||
"""Run a scan and return clusters + suggestions.
|
||||
|
||||
Set force=true to bypass the 5-minute cache.
|
||||
"""
|
||||
now = time.time()
|
||||
if not force and _SCAN_CACHE["result"] is not None \
|
||||
and (now - _SCAN_CACHE["at"]) < _SCAN_CACHE_TTL_SECONDS:
|
||||
return _SCAN_CACHE["result"]
|
||||
|
||||
result = await svc.scan_clusters_and_build_suggestions(db, SFM_BASE_URL)
|
||||
|
||||
# Group suggestions for the wizard UI.
|
||||
by_confidence = {"high": [], "medium": [], "low": []}
|
||||
blocking_conflict_count = 0
|
||||
for s in result.suggestions:
|
||||
by_confidence[s.confidence].append(_serialise_suggestion(s))
|
||||
if s.blocking_conflict:
|
||||
blocking_conflict_count += 1
|
||||
|
||||
payload = {
|
||||
"scanned_event_count": result.scanned_event_count,
|
||||
"cluster_count": result.cluster_count,
|
||||
"already_attributed": result.already_attributed,
|
||||
"skipped_orphans": result.skipped_orphans,
|
||||
"pending_count": len(result.suggestions),
|
||||
"blocking_conflict_count": blocking_conflict_count,
|
||||
"by_confidence": {
|
||||
"high": by_confidence["high"],
|
||||
"medium": by_confidence["medium"],
|
||||
"low": by_confidence["low"],
|
||||
},
|
||||
"scanned_at": now,
|
||||
}
|
||||
_SCAN_CACHE["result"] = payload
|
||||
_SCAN_CACHE["at"] = now
|
||||
return payload
|
||||
|
||||
|
||||
@router.post("/apply")
|
||||
async def apply(
|
||||
request: Request,
|
||||
db: Session = Depends(get_db),
|
||||
):
|
||||
"""Apply a list of clusters.
|
||||
|
||||
Body:
|
||||
{
|
||||
"cluster_ids": ["abc...", "def..."],
|
||||
"overrides": { "abc...": { "project_name": "...", "location_name": "..." } }
|
||||
}
|
||||
|
||||
To accept ALL non-conflict suggestions in one shot, the UI sends every
|
||||
pending cluster_id with no overrides.
|
||||
"""
|
||||
try:
|
||||
body = await request.json()
|
||||
except Exception:
|
||||
raise HTTPException(status_code=400, detail="Invalid JSON body")
|
||||
|
||||
cluster_ids = body.get("cluster_ids") or []
|
||||
overrides = body.get("overrides") or {}
|
||||
if not isinstance(cluster_ids, list) or not cluster_ids:
|
||||
raise HTTPException(status_code=400, detail="cluster_ids must be a non-empty list")
|
||||
|
||||
# Re-scan to get current suggestions. We don't trust the cached scan
|
||||
# blindly — the operator might have manually created projects in
|
||||
# between scan and apply.
|
||||
scan_result = await svc.scan_clusters_and_build_suggestions(db, SFM_BASE_URL)
|
||||
suggestions_by_id = {s.cluster.cluster_id: s for s in scan_result.suggestions}
|
||||
|
||||
selected: list[svc.Suggestion] = []
|
||||
not_found: list[str] = []
|
||||
for cid in cluster_ids:
|
||||
s = suggestions_by_id.get(cid)
|
||||
if s is None:
|
||||
not_found.append(cid)
|
||||
continue
|
||||
# Apply overrides.
|
||||
ov = overrides.get(cid) or {}
|
||||
if "project_name" in ov:
|
||||
s.project_suggested_name = (ov["project_name"] or "").strip() or s.project_suggested_name
|
||||
# Override implies operator wants to create new (or rename).
|
||||
# If they wanted an exact match, they'd not have overridden.
|
||||
if s.project_match in ("create_new",):
|
||||
pass # keep create_new
|
||||
else:
|
||||
# Operator typed a custom name — force create-new behaviour
|
||||
# so we don't accidentally attach to a different existing
|
||||
# project by exact-match.
|
||||
s.project_existing_id = None
|
||||
s.project_match = "create_new"
|
||||
if "location_name" in ov:
|
||||
s.location_suggested_name = (ov["location_name"] or "").strip() or s.location_suggested_name
|
||||
if s.location_match in ("create_new",):
|
||||
pass
|
||||
else:
|
||||
s.location_existing_id = None
|
||||
s.location_match = "create_new"
|
||||
selected.append(s)
|
||||
|
||||
apply_result = svc.apply_suggestions(db, selected, decided_by="operator")
|
||||
|
||||
# Invalidate the scan cache so the next /scan picks up the new state.
|
||||
_SCAN_CACHE["at"] = 0.0
|
||||
_SCAN_CACHE["result"] = None
|
||||
|
||||
return {
|
||||
"applied": apply_result.applied,
|
||||
"failed": [{"cluster_id": cid, "reason": r} for cid, r in apply_result.failed],
|
||||
"not_found": not_found,
|
||||
"project_ids_created": apply_result.project_ids_created,
|
||||
"location_ids_created": apply_result.location_ids_created,
|
||||
"assignment_ids_created": apply_result.assignment_ids_created,
|
||||
}
|
||||
|
||||
|
||||
@router.post("/skip")
|
||||
async def skip(
|
||||
request: Request,
|
||||
db: Session = Depends(get_db),
|
||||
):
|
||||
"""Mark cluster_ids as skipped — they won't reappear in future scans."""
|
||||
try:
|
||||
body = await request.json()
|
||||
except Exception:
|
||||
raise HTTPException(status_code=400, detail="Invalid JSON body")
|
||||
|
||||
cluster_ids = body.get("cluster_ids") or []
|
||||
if not isinstance(cluster_ids, list):
|
||||
raise HTTPException(status_code=400, detail="cluster_ids must be a list")
|
||||
|
||||
n = svc.skip_clusters(db, cluster_ids, decided_by="operator")
|
||||
|
||||
_SCAN_CACHE["at"] = 0.0
|
||||
_SCAN_CACHE["result"] = None
|
||||
|
||||
return {"skipped": n}
|
||||
File diff suppressed because it is too large
Load Diff
Reference in New Issue
Block a user