From 42de06f441b5159074698ad994bf3ae37cace92d Mon Sep 17 00:00:00 2001 From: serversdown Date: Tue, 12 May 2026 05:54:57 +0000 Subject: [PATCH] =?UTF-8?q?feat(sfm):=20Phase=205a=20=E2=80=94=20bulk-back?= =?UTF-8?q?fill=20projects/locations/assignments=20from=20event=20metadata?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Operator clicks one button. Parser reads SFM's events table (operator-typed project / client / sensor_location strings), clusters by serial + time + metadata, fuzzy-matches against existing projects, and proposes Project / MonitoringLocation / UnitAssignment chains to create. Auto-applies high-confidence non-conflicting clusters in bulk; queues medium/low confidence for individual review. Verified against real data: 10,052 events → 59 clusters → 37 high- confidence + 14 medium + 8 low. Test-applied one cluster end-to-end; Project + Module + Location + Assignment + UnitHistory + Decision rows all created correctly, and Phase 2's attribution walk picked up the events automatically on the new location's detail page. Pipeline (backend/services/metadata_backfill.py, ~700 lines): 1. Pull all SFM events via /db/events per serial. 2. Pre-filter: drop events already covered by an existing UnitAssignment window (Phase 2 handles those automatically). 3. Time-cluster what's left: serial + 7-day gap is the cluster identity. 4. Metadata-split each time-cluster on persistent metadata transitions (≥ 2 consecutive events) so a single typo doesn't fork the cluster. 5. Match against existing graph (rapidfuzz.WRatio multi-signal scoring, normalisation that handles abbreviations / reorders / separator variations). Thresholds: 0.95 exact, 0.80 fuzzy, min-shorter-input 5 chars to guardrail false positives on single common words. 6. Score confidence (high/medium/low) using event count, span, blank-meta, conflict, ambiguity rules. 7. 
Detect conflicts: overlap with existing UnitAssignment at a different location for the same serial → blocking. Operator must reconcile. 8. Apply: ensure auto_imported ProjectType exists, ensure vibration_monitoring ProjectModule on the project, write Project / MonitoringLocation / UnitAssignment / UnitHistory all in one transaction. Migration (backend/migrate_add_metadata_backfill.py): adds unit_assignments.source column (default 'manual') and metadata_backfill_decisions table. Idempotent, non-destructive. API (backend/routers/metadata_backfill.py): GET /api/admin/metadata_backfill/scan — clusters + suggestions POST /api/admin/metadata_backfill/apply — bulk apply by cluster_ids w/ optional per-cluster project/location overrides POST /api/admin/metadata_backfill/skip — mark skipped (persistent) UI (templates/admin/metadata_backfill.html, accessible at /settings/developer/metadata-backfill via the Developer tab of Settings): - One-button "Run scan" entry. - Summary KPI tiles (scanned / already attributed / pending / conflicts). - "Apply all high-confidence" bulk button at the top — primary path. - Per-cluster cards below with Apply / Skip / Preview event actions. - Blank-meta clusters get inline input fields for operator-typed project + location names before applying. - Blocking-conflict clusters render with the conflicting assignment information and a disabled Apply button. - Live progress toast during apply. - Reuses the Phase 1+2+4 event-detail modal for "Preview event" — operator can sanity-check the BW report data against the cluster's sample event. Dependencies: rapidfuzz==3.10.1 added to requirements.txt. Pre-built C wheels for all platforms, ~5s docker build hit. Phase 5b (deferred to next session): swap-detection daily background job, notification inbox for auto-applied swaps, recently-applied audit view, "Tidy" page for renaming/merging auto-created projects. 
Co-Authored-By: Claude Opus 4.7 --- backend/main.py | 11 + backend/migrate_add_metadata_backfill.py | 94 ++ backend/models.py | 39 + backend/routers/metadata_backfill.py | 226 +++++ backend/services/metadata_backfill.py | 1038 ++++++++++++++++++++++ requirements.txt | 1 + templates/admin/metadata_backfill.html | 405 +++++++++ templates/settings.html | 14 + 8 files changed, 1828 insertions(+) create mode 100644 backend/migrate_add_metadata_backfill.py create mode 100644 backend/routers/metadata_backfill.py create mode 100644 backend/services/metadata_backfill.py create mode 100644 templates/admin/metadata_backfill.html diff --git a/backend/main.py b/backend/main.py index b6b9ed2..7397b8e 100644 --- a/backend/main.py +++ b/backend/main.py @@ -115,6 +115,10 @@ app.include_router(scheduler.router) from backend.routers import report_templates app.include_router(report_templates.router) +# Metadata-backfill admin router (Phase 5a) +from backend.routers import metadata_backfill +app.include_router(metadata_backfill.router) + # Alerts router from backend.routers import alerts app.include_router(alerts.router) @@ -240,6 +244,13 @@ async def sfm_page(request: Request): return templates.TemplateResponse("sfm.html", {"request": request}) +@app.get("/settings/developer/metadata-backfill", response_class=HTMLResponse) +async def metadata_backfill_wizard_page(request: Request): + """Wizard for auto-creating projects/locations/assignments from + operator-typed BW event metadata (Phase 5a).""" + return templates.TemplateResponse("admin/metadata_backfill.html", {"request": request}) + + @app.get("/modems", response_class=HTMLResponse) async def modems_page(request: Request): """Field modems management dashboard""" diff --git a/backend/migrate_add_metadata_backfill.py b/backend/migrate_add_metadata_backfill.py new file mode 100644 index 0000000..1afbbd8 --- /dev/null +++ b/backend/migrate_add_metadata_backfill.py @@ -0,0 +1,94 @@ +""" +Migration: add metadata-backfill support. 
def migrate_database():
    """Apply the metadata-backfill schema changes to the SQLite database.

    Steps (each one is skipped when already applied, so re-running is safe):

      1. Add ``unit_assignments.source`` (TEXT, default ``'manual'``) and
         backfill any NULLs to ``'manual'``. Skipped when the
         ``unit_assignments`` table itself does not exist.
      2. Create the ``metadata_backfill_decisions`` table.
      3. Ensure the three lookup indexes on that table exist.

    Progress is printed to stdout. Returns ``None``; returns early without
    touching anything when no database file exists at ``DB_PATH``.
    """
    if not os.path.exists(DB_PATH):
        print(f"Database not found at {DB_PATH}")
        return

    print(f"Migrating database: {DB_PATH}")
    conn = sqlite3.connect(DB_PATH)
    try:
        cur = conn.cursor()

        # ── 1. unit_assignments.source column ──────────────────────────────
        cur.execute("PRAGMA table_info(unit_assignments)")
        cols = {row[1] for row in cur.fetchall()}
        if not cols:
            # table_info returns no rows for a missing table; ALTER TABLE
            # would raise "no such table", so skip instead of crashing.
            print("unit_assignments table not found — skipping source column")
        elif "source" not in cols:
            print("Adding unit_assignments.source column (default 'manual') ...")
            cur.execute(
                "ALTER TABLE unit_assignments ADD COLUMN source TEXT DEFAULT 'manual'"
            )
            # Backfill: any existing row gets source='manual'
            cur.execute("UPDATE unit_assignments SET source='manual' WHERE source IS NULL")
            conn.commit()
            print("  Done.")
        else:
            print("unit_assignments.source already exists — skipping")

        # ── 2. metadata_backfill_decisions table ───────────────────────────
        cur.execute(
            "SELECT name FROM sqlite_master WHERE type='table' AND name='metadata_backfill_decisions'"
        )
        if cur.fetchone() is None:
            print("Creating metadata_backfill_decisions table ...")
            cur.execute("""
                CREATE TABLE metadata_backfill_decisions (
                    cluster_id            TEXT PRIMARY KEY,  -- deterministic hash
                    status                TEXT NOT NULL,     -- pending | applied | skipped | conflict
                    confidence            TEXT NOT NULL,     -- high | medium | low (at time of decision)
                    decided_at            TEXT,              -- when applied/skipped
                    decided_by            TEXT,              -- 'background' | 'operator' | 'auto-high'
                    applied_assignment_id TEXT,              -- FK to unit_assignments (if applied)
                    notes                 TEXT,
                    first_seen_at         TEXT NOT NULL,
                    last_seen_at          TEXT NOT NULL,
                    serial                TEXT NOT NULL,
                    project_raw           TEXT,
                    location_raw          TEXT,
                    first_event_ts        TEXT,
                    last_event_ts         TEXT,
                    event_count           INTEGER NOT NULL DEFAULT 0
                )
            """)
            conn.commit()
            print("  Done.")
        else:
            print("metadata_backfill_decisions table already exists — skipping")

        # ── 3. indexes ─────────────────────────────────────────────────────
        # Created outside the table branch with IF NOT EXISTS so a run that
        # was interrupted after CREATE TABLE is repaired by the next run.
        for index_name, column in (
            ("idx_mbd_status", "status"),
            ("idx_mbd_last_seen", "last_seen_at"),
            ("idx_mbd_serial", "serial"),
        ):
            cur.execute(
                f"CREATE INDEX IF NOT EXISTS {index_name} "
                f"ON metadata_backfill_decisions({column})"
            )
        conn.commit()
    finally:
        # Always release the file handle / lock, even when a statement fails.
        conn.close()

    print("\nMigration complete.")
class MetadataBackfillDecision(Base):
    """
    Per-cluster decision record kept by the metadata-backfill parser.

    One row per cluster, keyed by `cluster_id` — the deterministic SHA1
    hash of (serial, first_event_date, last_event_date) — so the same
    cluster produces the same id across re-scans. The table lets the
    parser remember "I already applied this" or "operator skipped this"
    across scan invocations.

    The remaining columns are a snapshot of the cluster (serial, raw
    project/location strings, event span and count) plus bookkeeping
    about who decided what and when.

    NOTE(review): the id is derived from the serial and the two dates
    only, so it changes if a cluster's first/last event date changes —
    confirm that is acceptable for long-running deployments.
    """
    __tablename__ = "metadata_backfill_decisions"

    # Deterministic cluster hash; primary key.
    cluster_id = Column(String, primary_key=True)
    status = Column(String, nullable=False)  # pending | applied | skipped | conflict
    confidence = Column(String, nullable=False)  # high | medium | low
    # When / by whom the decision was taken; both NULL-able.
    decided_at = Column(DateTime, nullable=True)
    decided_by = Column(String, nullable=True)  # background | operator | auto-high
    # Logical reference to unit_assignments.id. No ForeignKey constraint is
    # declared here, so the database does not enforce it.
    applied_assignment_id = Column(String, nullable=True)  # FK to unit_assignments.id
    notes = Column(Text, nullable=True)
    # Both default to the row's insertion time (naive UTC via utcnow).
    first_seen_at = Column(DateTime, nullable=False, default=datetime.utcnow)
    last_seen_at = Column(DateTime, nullable=False, default=datetime.utcnow)
    # Snapshot of the cluster this decision refers to.
    serial = Column(String, nullable=False, index=True)
    project_raw = Column(String, nullable=True)
    location_raw = Column(String, nullable=True)
    first_event_ts = Column(DateTime, nullable=True)
    last_event_ts = Column(DateTime, nullable=True)
    event_count = Column(Integer, nullable=False, default=0)
diff --git a/backend/routers/metadata_backfill.py b/backend/routers/metadata_backfill.py new file mode 100644 index 0000000..bf97520 --- /dev/null +++ b/backend/routers/metadata_backfill.py @@ -0,0 +1,226 @@ +""" +Metadata-backfill admin router. + +Endpoints under /api/admin/metadata_backfill: + + GET /scan — run the scan; return clusters + suggestions (JSON). + Cached 5 minutes so the wizard doesn't re-scan on + every page render. + POST /apply — apply a list of cluster_ids; body specifies which to + accept and optional per-cluster overrides. + POST /skip — mark cluster_ids as skipped (won't reappear). +""" + +from __future__ import annotations + +import os +import time +from typing import Optional + +from fastapi import APIRouter, Depends, HTTPException, Request +from fastapi.responses import JSONResponse +from sqlalchemy.orm import Session + +from backend.database import get_db +from backend.services import metadata_backfill as svc + +router = APIRouter(prefix="/api/admin/metadata_backfill", tags=["metadata-backfill"]) + +SFM_BASE_URL = os.getenv("SFM_BASE_URL", "http://localhost:8200") + +# In-process scan cache. Trades memory for not re-hammering SFM on every +# wizard render. TTL: 5 minutes. Singleton per-process; fine for a +# single-worker uvicorn dev setup. For prod multi-worker we'd want to put +# this in the DB or Redis; deferred. 
+_SCAN_CACHE: dict = {"at": 0.0, "result": None} +_SCAN_CACHE_TTL_SECONDS = 300.0 + + +def _serialise_suggestion(s: svc.Suggestion) -> dict: + c = s.cluster + return { + "cluster_id": c.cluster_id, + "serial": c.serial, + "first_event_ts": c.first_event_ts.isoformat(), + "last_event_ts": c.last_event_ts.isoformat(), + "event_count": c.event_count, + "sample_event_id": c.sample_event_id, + "project_raw": c.project_raw, + "location_raw": c.location_raw, + "client_raw": c.client_raw, + "operator_raw": c.operator_raw, + "is_blank_meta": c.is_blank_meta, + "metadata_consistency": c.metadata_consistency, + + "project_match": s.project_match, + "project_existing_id": s.project_existing_id, + "project_existing_name": s.project_existing_name, + "project_match_score": s.project_match_score, + "project_suggested_name": s.project_suggested_name, + + "location_match": s.location_match, + "location_existing_id": s.location_existing_id, + "location_existing_name": s.location_existing_name, + "location_match_score": s.location_match_score, + "location_suggested_name": s.location_suggested_name, + + "proposed_assigned_at": s.proposed_assigned_at.isoformat(), + "proposed_assigned_until": s.proposed_assigned_until.isoformat() if s.proposed_assigned_until else None, + + "confidence": s.confidence, + "blocking_conflict": s.blocking_conflict, + "conflicts": [ + { + "existing_assignment_id": cf.existing_assignment_id, + "other_location_id": cf.other_location_id, + "other_location_name": cf.other_location_name, + "other_project_id": cf.other_project_id, + "other_project_name": cf.other_project_name, + } + for cf in s.conflicts + ], + } + + +@router.get("/scan") +async def scan( + force: bool = False, + db: Session = Depends(get_db), +): + """Run a scan and return clusters + suggestions. + + Set force=true to bypass the 5-minute cache. 
def _clean_override(value: object) -> Optional[str]:
    """Return the trimmed override string, or None when absent, blank or not a string."""
    if not isinstance(value, str):
        return None
    return value.strip() or None


@router.post("/apply")
async def apply(
    request: Request,
    db: Session = Depends(get_db),
):
    """Apply a list of clusters.

    Body:
        {
          "cluster_ids": ["abc...", "def..."],
          "overrides": { "abc...": { "project_name": "...", "location_name": "..." } }
        }

    To accept ALL non-conflict suggestions in one shot, the UI sends every
    pending cluster_id with no overrides.

    An override only takes effect when it is a non-blank string. In that
    case the suggestion is switched to create-new with the typed name. A
    blank or missing override leaves the suggestion — including any match
    to an existing project/location — untouched.

    Returns a dict with the applied count, per-cluster failures, the
    cluster_ids that no longer exist in a fresh scan (``not_found``) and
    the ids of the rows created.

    Raises:
        HTTPException(400): body is not a JSON object, ``cluster_ids`` is
            not a non-empty list of strings, or ``overrides`` is not an
            object.
    """
    try:
        body = await request.json()
    except Exception:
        raise HTTPException(status_code=400, detail="Invalid JSON body")
    # Valid JSON that is not an object (e.g. `[]` or `"x"`) has no .get();
    # reject it explicitly instead of failing with a 500.
    if not isinstance(body, dict):
        raise HTTPException(status_code=400, detail="Request body must be a JSON object")

    cluster_ids = body.get("cluster_ids") or []
    overrides = body.get("overrides") or {}
    if not isinstance(cluster_ids, list) or not cluster_ids:
        raise HTTPException(status_code=400, detail="cluster_ids must be a non-empty list")
    if not all(isinstance(cid, str) for cid in cluster_ids):
        raise HTTPException(status_code=400, detail="cluster_ids must contain only strings")
    if not isinstance(overrides, dict):
        raise HTTPException(status_code=400, detail="overrides must be an object")

    # A repeated id would hand the same Suggestion to apply_suggestions twice.
    cluster_ids = list(dict.fromkeys(cluster_ids))

    # Re-scan to get current suggestions. We don't trust the cached scan
    # blindly — the operator might have manually created projects in
    # between scan and apply.
    scan_result = await svc.scan_clusters_and_build_suggestions(db, SFM_BASE_URL)
    suggestions_by_id = {s.cluster.cluster_id: s for s in scan_result.suggestions}

    selected: list[svc.Suggestion] = []
    not_found: list[str] = []
    for cid in cluster_ids:
        s = suggestions_by_id.get(cid)
        if s is None:
            not_found.append(cid)
            continue

        ov = overrides.get(cid)
        if isinstance(ov, dict):
            project_name = _clean_override(ov.get("project_name"))
            if project_name is not None:
                s.project_suggested_name = project_name
                # Operator typed a custom name — force create-new behaviour
                # so we don't accidentally attach to a different existing
                # project by exact-match.
                if s.project_match != "create_new":
                    s.project_existing_id = None
                    s.project_match = "create_new"

            location_name = _clean_override(ov.get("location_name"))
            if location_name is not None:
                s.location_suggested_name = location_name
                if s.location_match != "create_new":
                    s.location_existing_id = None
                    s.location_match = "create_new"
        selected.append(s)

    apply_result = svc.apply_suggestions(db, selected, decided_by="operator")

    # Invalidate the scan cache so the next /scan picks up the new state.
    _SCAN_CACHE["at"] = 0.0
    _SCAN_CACHE["result"] = None

    return {
        "applied": apply_result.applied,
        "failed": [{"cluster_id": cid, "reason": r} for cid, r in apply_result.failed],
        "not_found": not_found,
        "project_ids_created": apply_result.project_ids_created,
        "location_ids_created": apply_result.location_ids_created,
        "assignment_ids_created": apply_result.assignment_ids_created,
    }
Apply: create Project / MonitoringLocation / UnitAssignment + + UnitHistory audit row, all in one transaction. + +Public API: + scan_clusters_and_build_suggestions(db, sfm_base_url) → ScanResult + apply_suggestions(db, cluster_ids, *, decided_by) → ApplyResult + skip_clusters(db, cluster_ids, *, decided_by) → int +""" + +from __future__ import annotations + +import asyncio +import hashlib +import logging +import os +import re +import uuid +from collections import Counter +from dataclasses import dataclass, field +from datetime import datetime, date, timedelta +from typing import Optional, Iterable, Literal + +import httpx +import rapidfuzz +from sqlalchemy.orm import Session + +from backend.models import ( + Project, + ProjectModule, + MonitoringLocation, + UnitAssignment, + RosterUnit, + UnitHistory, + MetadataBackfillDecision, +) + +log = logging.getLogger("backend.services.metadata_backfill") + + +# ── Tunables ────────────────────────────────────────────────────────────────── +CLUSTER_GAP_DAYS = 7 # time gap that splits a time-cluster +MIN_SPLIT_RUN_LENGTH = 2 # min consecutive events to trigger meta-split +FUZZY_EXACT_THRESHOLD = 0.95 # WRatio score → treated as exact match +FUZZY_MATCH_THRESHOLD = 0.80 # WRatio score → treated as fuzzy match +FUZZY_AMBIGUITY_DELTA = 0.05 # if 2nd-best score is within this of 1st → ambiguous +SUSPICIOUS_SPAN_DAYS = 90 # cluster spanning > this with sparse events → suspicious +RECENT_CLUSTER_DAYS = 3 # if cluster end is within this many days of now → leave assigned_until=NULL + +SFM_FETCH_CEILING = 5000 # max events per SFM /db/events call +SFM_TIMEOUT = 30.0 # generous; this runs in the background + +# ProjectType to assign to auto-created projects. We use a sentinel +# "auto_imported" type that the parser ensures exists. Operator can re-type +# them later by editing the Project. 
+AUTO_IMPORTED_TYPE_ID = "auto_imported" +AUTO_IMPORTED_TYPE_NAME = "Auto-imported (from event metadata)" + + +# ── Normalisation + fuzzy matching ───────────────────────────────────────────── + + +def _normalise(s: Optional[str]) -> str: + """Lowercase, replace internal punctuation with spaces, collapse spaces. + + Aggressive enough that 'Test-5-8-26' and 'Test 5/8/26' produce the + same normalised string ('test 5 8 26'), but preserves alphanumeric + content so 'I-80 N Fork' doesn't lose its content (becomes 'i 80 n fork'). + """ + if not s: + return "" + s = s.strip().lower() + # Replace any non-alphanumeric character with a space (preserves the + # tokens, just normalises the separators). + s = re.sub(r"[^a-z0-9]+", " ", s) + s = re.sub(r"\s+", " ", s).strip() + return s + + +# Min length of the SHORTER input before a fuzzy match is accepted. +# rapidfuzz.WRatio is generous with partial_ratio on short strings — e.g. +# 'demo' vs 'bridge demo project' scores 0.90 (false positive). Requiring +# the shorter input be >= 5 chars filters out those degenerate cases. +_MIN_FUZZY_LEN = 5 + + +def similarity(a: str, b: str) -> float: + """Multi-signal similarity score in [0.0, 1.0] via rapidfuzz.WRatio. + + Blends Levenshtein, partial-substring, token-sort, token-set + scoring. Handles abbreviations ('N' vs 'North'), word reordering, + and substring containment. + + Returns 0.0 if either input is empty OR if the shorter input is + too short to fuzzy-match safely (see _MIN_FUZZY_LEN comment) AND the + strings don't exact-match. This guardrails the 'one common word + inside a longer phrase' false positive. 
@dataclass
class Suggestion:
    """Proposed Project / MonitoringLocation / UnitAssignment chain for one Cluster.

    Holds the cluster itself, the outcome of matching its project and
    location names against the existing graph, the proposed assignment
    window, a confidence bucket and any conflicts found.

    Instances are mutable. Field order is part of the constructor
    signature; fields with defaults must stay last.
    """
    # The event cluster this suggestion was built from.
    cluster: Cluster

    # ── Project match ──
    project_match: Literal["exact", "fuzzy", "create_new", "ambiguous"]
    project_existing_id: Optional[str]       # set when an existing project was matched
    project_existing_name: Optional[str]
    project_match_score: Optional[float]     # presumably the 0.0–1.0 similarity score — TODO confirm
    project_suggested_name: str              # name to use when a new project is created

    # ── Location match ──
    # NOTE(review): unlike project_match there is no "ambiguous" variant
    # here — confirm that is intentional.
    location_match: Literal["exact", "fuzzy", "create_new"]
    location_existing_id: Optional[str]
    location_existing_name: Optional[str]
    location_match_score: Optional[float]
    location_suggested_name: str

    # ── Proposed assignment window ──
    proposed_assigned_at: datetime
    proposed_assigned_until: Optional[datetime]  # None = active / open-ended

    confidence: Literal["high", "medium", "low"]
    # default_factory gives every instance its own list.
    conflicts: list[ConflictHint] = field(default_factory=list)
    blocking_conflict: bool = False
    already_attributed: bool = False
def _split_by_metadata_runs(events: list[dict]) -> list[list[dict]]:
    """Split a time-ordered event list wherever the metadata persistently changes.

    Each event is keyed by its normalised (project, sensor_location) pair.
    The key sequence is run-length encoded, then:

      * Blank-metadata events never start a new run; they join the run in
        progress.
      * Runs shorter than MIN_SPLIT_RUN_LENGTH (typos / one-off blips) are
        folded into the next long run, or into the last long run when
        none follows.
      * If the long runs on either side of a blip carry the same key, they
        are merged back into one sub-cluster, so a single typo in the
        middle of a deployment does not fork it.

    Args:
        events: Events of one time-cluster, in chronological order. Each
            is a dict that may carry "project" and "sensor_location".

    Returns:
        Sub-clusters as lists of the original event dicts, in input
        order. Every input event appears in exactly one sub-cluster.
        Empty input yields an empty list.
    """
    if not events:
        return []

    blank = ("", "")
    keys = [
        (_normalise(ev.get("project")), _normalise(ev.get("sensor_location")))
        for ev in events
    ]

    # Pass 1: run-length encode. Each run remembers the key that defines it.
    runs: list[tuple[tuple[str, str], list[int]]] = []
    current: list[int] = []
    current_key: Optional[tuple[str, str]] = None
    for i, k in enumerate(keys):
        if k == blank or current_key is None or k == current_key:
            current.append(i)
            if k != blank:
                current_key = k
            continue
        runs.append((current_key, current))
        current, current_key = [i], k
    if current:
        # current_key is still None only when every event was blank.
        runs.append((current_key if current_key is not None else blank, current))

    # Pass 2: fold short runs forward and re-join long runs that a blip
    # split apart.
    merged: list[list[int]] = []
    merged_keys: list[tuple[str, str]] = []
    pending_short: list[int] = []
    for key, run in runs:
        if len(run) < MIN_SPLIT_RUN_LENGTH:
            pending_short.extend(run)
            continue
        if merged and merged_keys[-1] == key:
            # Same metadata resumed after the blip(s): keep it one cluster.
            merged[-1].extend(pending_short)
            merged[-1].extend(run)
        else:
            merged.append(pending_short + run)
            merged_keys.append(key)
        pending_short = []

    # Trailing short runs join the last long run, or stand alone when
    # there was no long run at all.
    if pending_short:
        if merged:
            merged[-1].extend(pending_short)
        else:
            merged.append(pending_short)

    return [[events[i] for i in run] for run in merged]
(Caller has already dropped events with + # blank timestamps; guard anyway.) + events = [e for e in events if e.get("timestamp")] + events = sorted(events, key=lambda e: e["timestamp"]) + if not events: + return [] + + # Time-cluster. + gap = timedelta(days=CLUSTER_GAP_DAYS) + time_clusters: list[list[dict]] = [] + current: list[dict] = [] + last_ts: Optional[datetime] = None + for ev in events: + ts = _parse_ts(ev["timestamp"]) + if last_ts is None or (ts - last_ts) <= gap: + current.append(ev) + else: + time_clusters.append(current) + current = [ev] + last_ts = ts + if current: + time_clusters.append(current) + + # Metadata-split each time-cluster. + out: list[Cluster] = [] + for tc in time_clusters: + sub_clusters = _split_by_metadata_runs(tc) + for sc in sub_clusters: + cluster = _build_cluster(serial, sc) + out.append(cluster) + return out + + +def _parse_ts(ts: str) -> datetime: + """Parse SFM-returned timestamp string into datetime.""" + # SFM returns ISO 8601 with 'T' separator, sometimes with offset. + s = ts.replace("Z", "+00:00") + try: + dt = datetime.fromisoformat(s) + except ValueError: + # Fallback: space separator + dt = datetime.fromisoformat(s.replace(" ", "T")) + # Strip tzinfo for consistency with terra-view's naive UTC datetimes. + if dt.tzinfo is not None: + dt = dt.replace(tzinfo=None) + return dt + + +def _build_cluster(serial: str, events: list[dict]) -> Cluster: + """Compute a Cluster from a list of events that already belong together.""" + timestamps = [_parse_ts(ev["timestamp"]) for ev in events] + first_ts = min(timestamps) + last_ts = max(timestamps) + + project_mode_norm, project_consistency = _mode_string(_normalise(ev.get("project")) for ev in events) + location_mode_norm, location_consistency = _mode_string(_normalise(ev.get("sensor_location")) for ev in events) + + # For display, pick the most common RAW value (case-preserved) that + # normalises to the modal normalised value. 
async def _fetch_all_events_from_sfm(sfm_base_url: str) -> list[dict]:
    """Pull every event from SFM, one /db/events request per known serial.

    The only filter /db/events offers is serial-based, so the unit list
    is read from /db/units first and the per-serial requests are then
    issued concurrently.

    Failure handling:
      * A failure of the /db/units call propagates to the caller.
      * A failure for a single serial (HTTP error or unparseable JSON) is
        logged and that serial contributes no events; the scan continues.
      * At most SFM_FETCH_CEILING events are requested per serial. A
        warning is logged when a serial reaches that ceiling, because
        further events may have been cut off.

    Returns:
        A flat list of event dicts with the ``waveform_blob`` and
        ``a5_pickle_filename`` keys removed.
    """
    async with httpx.AsyncClient(timeout=SFM_TIMEOUT) as client:
        units_resp = await client.get(f"{sfm_base_url}/db/units")
        units_resp.raise_for_status()
        units = units_resp.json()
        # Order-preserving de-duplication: a repeated serial would fetch,
        # and later cluster, the same events twice.
        serials = list(dict.fromkeys(
            u["serial"] for u in units if isinstance(u, dict) and u.get("serial")
        ))

        async def _fetch_one(serial: str) -> list[dict]:
            try:
                r = await client.get(
                    f"{sfm_base_url}/db/events",
                    params={"serial": serial, "limit": SFM_FETCH_CEILING},
                )
                r.raise_for_status()
                payload = r.json()
            except (httpx.HTTPError, ValueError) as e:
                # ValueError covers a non-JSON body from r.json(); one bad
                # response must not abort the whole gather().
                log.warning("SFM fetch failed for serial=%s: %s", serial, e)
                return []

            evs = (payload.get("events") if isinstance(payload, dict) else None) or []
            if len(evs) >= SFM_FETCH_CEILING:
                log.warning(
                    "SFM returned %d events for serial=%s (ceiling %d); "
                    "older events may be missing from this scan",
                    len(evs), serial, SFM_FETCH_CEILING,
                )
            # Strip waveform_blob (large bytes; we don't need them)
            for ev in evs:
                ev.pop("waveform_blob", None)
                ev.pop("a5_pickle_filename", None)
            return evs

        all_events_nested = await asyncio.gather(*[_fetch_one(s) for s in serials])

    return [ev for batch in all_events_nested for ev in batch]
def _find_best_match(
    candidate_norm: str,
    candidates: list[tuple[str, str]],  # (id, normalised_name)
) -> tuple[Optional[str], Optional[float], str]:
    """Rank ``candidates`` by similarity to ``candidate_norm`` and classify the winner.

    Returns a ``(best_id, best_score, classification)`` triple where
    classification is one of "exact", "fuzzy", "ambiguous" or "no_match".
    ``best_id`` is ``None`` for "no_match"; ``best_score`` is ``None`` only
    when there was nothing to compare (blank input or no candidates).
    """
    if not (candidate_norm and candidates):
        return None, None, "no_match"

    # Highest score first; the sort is stable, so ties keep candidate order.
    ranked = sorted(
        ((cand_id, similarity(candidate_norm, cand_name)) for cand_id, cand_name in candidates),
        key=lambda pair: pair[1],
        reverse=True,
    )
    top_id, top_score = ranked[0]
    runner_up_score = ranked[1][1] if len(ranked) > 1 else None

    # Ambiguous: the two best candidates both clear the match threshold and
    # sit closer together than FUZZY_AMBIGUITY_DELTA.
    is_ambiguous = (
        runner_up_score is not None
        and top_score >= FUZZY_MATCH_THRESHOLD
        and runner_up_score >= FUZZY_MATCH_THRESHOLD
        and (top_score - runner_up_score) < FUZZY_AMBIGUITY_DELTA
    )
    if is_ambiguous:
        return top_id, top_score, "ambiguous"

    if top_score >= FUZZY_EXACT_THRESHOLD:
        return top_id, top_score, "exact"
    if top_score >= FUZZY_MATCH_THRESHOLD:
        return top_id, top_score, "fuzzy"
    return None, top_score, "no_match"
def _score_confidence(
    cluster: Cluster,
    project_match: str,
    location_match: str,
    conflicts: list[ConflictHint],
) -> str:
    """Grade a cluster's suggestion as 'high', 'medium' or 'low'.

    Any conflict, blank metadata, a single-event cluster, a long-but-sparse
    cluster, or an ambiguous match yields 'low'. A fuzzy match on either side
    yields 'medium'. Exact/exact and create_new/create_new yield 'high'; any
    other combination (e.g. one exact, one create_new) yields 'medium'.
    """
    if conflicts or cluster.is_blank_meta or cluster.event_count < 2:
        return "low"

    span_days = (cluster.last_event_ts - cluster.first_event_ts).days
    # Long span with fewer than one event per week on average → sparse.
    too_sparse = span_days > SUSPICIOUS_SPAN_DAYS and cluster.event_count < (span_days / 7)
    if too_sparse:
        return "low"

    outcomes = (project_match, location_match)
    if "ambiguous" in outcomes:
        return "low"
    if "fuzzy" in outcomes:
        return "medium"

    # Both sides agree: either everything already exists, or everything is new.
    if outcomes in (("exact", "exact"), ("create_new", "create_new")):
        return "high"

    # Mixed exact / create_new: typically a new location under an existing
    # project; common, but worth a human look.
    return "medium"
+ # If we're creating a new project, every location is also new (location + # names should not be fuzzy-matched across project boundaries — "Loc 1" + # at Project A is a different thing than "Loc 1" at Project B). + if project_existing and project_match in ("exact", "fuzzy"): + location_candidates_objs = ( + db.query(MonitoringLocation) + .filter(MonitoringLocation.project_id == project_existing.id) + .filter(MonitoringLocation.location_type == "vibration") + .all() + ) + location_candidates = [(l.id, _normalise(l.name)) for l in location_candidates_objs] + if cluster.location_norm: + loc_id, loc_score, loc_match = _find_best_match(cluster.location_norm, location_candidates) + else: + loc_id, loc_score, loc_match = None, None, "create_new" + else: + # Project will be created new → location is automatically new. + location_candidates_objs = [] + loc_id, loc_score, loc_match = None, None, "create_new" + + location_existing = None + location_match: Literal["exact", "fuzzy", "create_new"] + if loc_match == "exact": + location_match = "exact" + location_existing = next((l for l in location_candidates_objs if l.id == loc_id), None) + elif loc_match == "fuzzy": + location_match = "fuzzy" + location_existing = next((l for l in location_candidates_objs if l.id == loc_id), None) + elif loc_match == "ambiguous": + # We treat ambiguous-location as fuzzy for now; the operator can + # pick which one (out of scope: a richer disambiguation UI). + location_match = "fuzzy" + location_existing = next((l for l in location_candidates_objs if l.id == loc_id), None) + else: + location_match = "create_new" + + location_suggested_name = ( + location_existing.name if location_existing and location_match == "exact" + else cluster.location_raw or "Unnamed location" + ) + + # Proposed assignment window. 
async def scan_clusters_and_build_suggestions(
    db: Session,
    sfm_base_url: str,
) -> ScanResult:
    """Run the full scan: fetch SFM events, cluster them, build suggestions.

    For every cluster that is neither already skipped/applied nor already
    attributed at its target, a MetadataBackfillDecision row is inserted
    (first sighting) or refreshed (status, confidence, last_seen_at, event
    count, last event timestamp). Skipped and applied decisions only get
    their last_seen_at bumped. All changes are committed once at the end.
    """
    clusters, scanned, _already_attributed_events = await _scan_clusters(db, sfm_base_url)

    now = datetime.utcnow()
    suggestions: list[Suggestion] = []
    skipped_orphans = 0
    already_attributed_clusters = 0

    for cluster in clusters:
        decision = (
            db.query(MetadataBackfillDecision)
            .filter_by(cluster_id=cluster.cluster_id)
            .first()
        )
        prior_status = decision.status if decision else None

        # Operator already ruled on this cluster: note that it is still
        # around and move on without rebuilding a suggestion.
        if prior_status in ("skipped", "applied"):
            if prior_status == "skipped":
                skipped_orphans += 1
            decision.last_seen_at = now
            continue

        suggestion = _build_suggestion(db, cluster)
        if suggestion.already_attributed:
            already_attributed_clusters += 1
            continue
        suggestions.append(suggestion)

        status = "conflict" if suggestion.blocking_conflict else "pending"

        if decision is not None:
            # Seen before: refresh the mutable fields only.
            decision.status = status
            decision.confidence = suggestion.confidence
            decision.last_seen_at = now
            decision.event_count = cluster.event_count
            decision.last_event_ts = cluster.last_event_ts
            continue

        db.add(MetadataBackfillDecision(
            cluster_id=cluster.cluster_id,
            status=status,
            confidence=suggestion.confidence,
            first_seen_at=now,
            last_seen_at=now,
            serial=cluster.serial,
            project_raw=cluster.project_raw or None,
            location_raw=cluster.location_raw or None,
            first_event_ts=cluster.first_event_ts,
            last_event_ts=cluster.last_event_ts,
            event_count=cluster.event_count,
        ))

    db.commit()

    return ScanResult(
        suggestions=suggestions,
        skipped_orphans=skipped_orphans,
        already_attributed=already_attributed_clusters,
        scanned_event_count=scanned,
        cluster_count=len(clusters),
    )
def _escape_like(value: str, escape: str = "\\") -> str:
    """Escape SQL LIKE wildcards so ``value`` is matched literally.

    The escape character itself is doubled first, then ``%`` and ``_`` are
    prefixed with it. Pass the same ``escape`` to ``ilike(..., escape=...)``.
    """
    return (
        value.replace(escape, escape + escape)
        .replace("%", escape + "%")
        .replace("_", escape + "_")
    )


def _ensure_vibration_module(db: Session, project: Project) -> None:
    """Create the project's vibration_monitoring module row if it is missing.

    An existing row is left untouched (including its ``enabled`` flag).
    """
    existing = (
        db.query(ProjectModule)
        .filter_by(project_id=project.id, module_type="vibration_monitoring")
        .first()
    )
    if existing is None:
        db.add(ProjectModule(
            id=str(uuid.uuid4()),
            project_id=project.id,
            module_type="vibration_monitoring",
            enabled=True,
        ))
        db.flush()


def _ensure_project(db: Session, suggestion: Suggestion) -> tuple[Project, bool]:
    """Return ``(project, created_flag)`` for a suggestion.

    Resolution order: the suggestion's matched project id, then an existing
    project whose name equals the suggested name case-insensitively, then a
    newly created project of the auto-imported type. In every case the
    project ends up with a vibration_monitoring module row.
    """
    if suggestion.project_existing_id:
        p = db.query(Project).filter_by(id=suggestion.project_existing_id).first()
        if p is not None:
            _ensure_vibration_module(db, p)
            return p, False

    # Need to create. But: Project.name has UNIQUE constraint. Check for
    # a case-insensitive existing project before creating.
    candidate_name = (
        (suggestion.project_suggested_name or "").strip()
        or f"Auto-imported project ({suggestion.cluster.serial})"
    )
    # Operator-typed names routinely contain "_" or "%"; unescaped they act
    # as LIKE wildcards and could match a different project entirely.
    existing = (
        db.query(Project)
        .filter(Project.name.ilike(_escape_like(candidate_name), escape="\\"))
        .first()
    )
    if existing is not None:
        _ensure_vibration_module(db, existing)
        return existing, False

    type_id = _ensure_auto_imported_project_type(db)

    # Derive dates. An open-ended (still active) assignment window means the
    # project gets no end date.
    start = suggestion.cluster.first_event_ts.date()
    end = (suggestion.cluster.last_event_ts.date()
           if suggestion.proposed_assigned_until else None)

    p = Project(
        id=str(uuid.uuid4()),
        name=candidate_name,
        description=(
            f"Auto-created by the metadata-backfill parser on "
            f"{datetime.utcnow():%Y-%m-%d}. Sourced from operator-typed "
            f"BW metadata across {suggestion.cluster.event_count} event(s) "
            f"from serial {suggestion.cluster.serial}."
        ),
        project_type_id=type_id,
        status="active",
        data_collection_mode="remote",
        client_name=suggestion.cluster.client_raw or None,
        start_date=start,
        end_date=end,
    )
    db.add(p)
    db.flush()

    _ensure_vibration_module(db, p)

    return p, True


def _ensure_location(
    db: Session,
    project: Project,
    suggestion: Suggestion,
) -> tuple[MonitoringLocation, bool]:
    """Return ``(location, created_flag)`` for a suggestion within ``project``.

    The suggestion's matched location is reused only if it belongs to
    ``project``; otherwise a case-insensitive name lookup inside the project
    is tried before a new vibration location is created.
    """
    if suggestion.location_existing_id:
        loc = db.query(MonitoringLocation).filter_by(id=suggestion.location_existing_id).first()
        if loc is not None and loc.project_id == project.id:
            return loc, False

    candidate_name = (suggestion.location_suggested_name or "").strip() or "Unnamed location"
    # Check for case-insensitive existing within this project. Wildcards are
    # escaped so e.g. "Loc_1" cannot match "Loc 1" or "LocA1".
    existing = (
        db.query(MonitoringLocation)
        .filter(MonitoringLocation.project_id == project.id)
        .filter(MonitoringLocation.name.ilike(_escape_like(candidate_name), escape="\\"))
        .first()
    )
    if existing is not None:
        return existing, False

    loc = MonitoringLocation(
        id=str(uuid.uuid4()),
        project_id=project.id,
        location_type="vibration",
        name=candidate_name,
        description=(
            f"Auto-created by metadata-backfill from operator-typed sensor_location "
            f"\"{suggestion.cluster.location_raw}\" on events from serial "
            f"{suggestion.cluster.serial}."
        ),
    )
    db.add(loc)
    db.flush()
    return loc, True
def apply_suggestions(
    db: Session,
    suggestions: list[Suggestion],
    *,
    decided_by: str,
) -> ApplyResult:
    """Apply a list of suggestions, isolating each one in its own transaction.

    Every successfully applied suggestion is committed immediately, so a
    later failure (which rolls the session back) can only discard the
    failing suggestion's own writes, never an earlier success.

    Args:
        db: Session used for all writes.
        suggestions: Suggestions to apply, in order. Suggestions flagged
            ``blocking_conflict`` are not applied and are reported as failed.
        decided_by: Identifier recorded on the audit / decision rows.

    Returns:
        An ApplyResult with the number applied, the ``(cluster_id, reason)``
        pairs that failed, and the de-duplicated project / location ids and
        assignment ids touched by the successful suggestions.
    """
    applied = 0
    failed: list[tuple[str, str]] = []
    proj_ids: list[str] = []
    loc_ids: list[str] = []
    asgn_ids: list[str] = []

    for s in suggestions:
        if s.blocking_conflict:
            failed.append((s.cluster.cluster_id, "blocking conflict — needs manual resolution"))
            continue
        try:
            p_id, l_id, a_id = _apply_one(db, s, decided_by=decided_by)
            # Commit per suggestion: db.rollback() below undoes everything
            # since the last commit, so flushing alone would let one failure
            # wipe out suggestions already counted as applied.
            db.commit()
        except Exception as e:
            log.exception("Failed to apply cluster %s", s.cluster.cluster_id)
            db.rollback()
            failed.append((s.cluster.cluster_id, str(e)))
            continue

        proj_ids.append(p_id)
        loc_ids.append(l_id)
        asgn_ids.append(a_id)
        applied += 1

    # Kept for callers that rely on this function committing the session even
    # when nothing was applied; a no-op after the per-suggestion commits.
    db.commit()

    return ApplyResult(
        applied=applied,
        skipped=0,
        failed=failed,
        project_ids_created=list(dict.fromkeys(proj_ids)),
        location_ids_created=list(dict.fromkeys(loc_ids)),
        assignment_ids_created=asgn_ids,
    )
endblock %} + +{% block content %} + +
+ +
+ + +
+

Backfill from event metadata

+

+ Auto-create projects, locations, and unit assignments from operator-typed metadata on Blastware events. +

+
+ + +
+
+
+ + + +

Scan SFM events

+

+ Reads all events from SFM, clusters them by serial & time, matches the + operator-typed metadata against your existing projects, and proposes + Project / Location / UnitAssignment + chains to create. +

+ +
+
+ + +
+ + +
+ + + + + +{% include 'partials/event_detail_modal.html' %} + + + +{% endblock %} diff --git a/templates/settings.html b/templates/settings.html index bd2d603..b3c2cb3 100644 --- a/templates/settings.html +++ b/templates/settings.html @@ -560,6 +560,20 @@ Open + + +
+
+
Backfill from event metadata
+
+ Auto-create projects, locations, and unit assignments from the operator-typed metadata baked into SFM events. Skip the manual entry. +
+
+ + Open + +