""" metadata_backfill.py — turn operator-typed BW event metadata into the terra-view Project / MonitoringLocation / UnitAssignment graph. Architecture (see /home/serversdown/.claude/plans/sfm-metadata-backfill-parser.md): 1. Pre-filter: drop events that already fall inside an existing UnitAssignment window (Phase 2 attribution already handles them). 2. Time-cluster: serial + 7-day gap is the cluster identity. 3. Metadata-split: split on persistent (>= 2 events) metadata transitions. 4. Match against existing graph (rapidfuzz multi-signal scoring). 5. Score confidence (high/medium/low). 6. Detect conflicts (overlap with existing UnitAssignment at different location for the same serial → blocking). 7. Apply: create Project / MonitoringLocation / UnitAssignment + UnitHistory audit row, all in one transaction. Public API: scan_clusters_and_build_suggestions(db, sfm_base_url) → ScanResult apply_suggestions(db, cluster_ids, *, decided_by) → ApplyResult skip_clusters(db, cluster_ids, *, decided_by) → int """ from __future__ import annotations import asyncio import hashlib import logging import os import re import uuid from collections import Counter from dataclasses import dataclass, field from datetime import datetime, date, timedelta from typing import Optional, Iterable, Literal import httpx import rapidfuzz from sqlalchemy.orm import Session from backend.models import ( Project, ProjectModule, MonitoringLocation, UnitAssignment, RosterUnit, UnitHistory, MetadataBackfillDecision, ) log = logging.getLogger("backend.services.metadata_backfill") # ── Tunables ────────────────────────────────────────────────────────────────── CLUSTER_GAP_DAYS = 7 # time gap that splits a time-cluster MIN_SPLIT_RUN_LENGTH = 2 # min consecutive events to trigger meta-split FUZZY_EXACT_THRESHOLD = 0.95 # WRatio score → treated as exact match FUZZY_MATCH_THRESHOLD = 0.80 # WRatio score → treated as fuzzy match FUZZY_AMBIGUITY_DELTA = 0.05 # if 2nd-best score is within this of 1st → ambiguous SUSPICIOUS_SPAN_DAYS = 90 # cluster spanning > this with sparse events → suspicious RECENT_CLUSTER_DAYS = 3 # if cluster end is within this many days of now → leave assigned_until=NULL SFM_FETCH_CEILING = 5000 # max events per SFM /db/events call SFM_TIMEOUT = 30.0 # generous; this runs in the background # ProjectType to assign to auto-created projects. We use a sentinel # "auto_imported" type that the parser ensures exists. Operator can re-type # them later by editing the Project. AUTO_IMPORTED_TYPE_ID = "auto_imported" AUTO_IMPORTED_TYPE_NAME = "Auto-imported (from event metadata)" # ── Normalisation + fuzzy matching ───────────────────────────────────────────── def _normalise(s: Optional[str]) -> str: """Lowercase, replace internal punctuation with spaces, collapse spaces. Aggressive enough that 'Test-5-8-26' and 'Test 5/8/26' produce the same normalised string ('test 5 8 26'), but preserves alphanumeric content so 'I-80 N Fork' doesn't lose its content (becomes 'i 80 n fork'). """ if not s: return "" s = s.strip().lower() # Replace any non-alphanumeric character with a space (preserves the # tokens, just normalises the separators). s = re.sub(r"[^a-z0-9]+", " ", s) s = re.sub(r"\s+", " ", s).strip() return s # Match a "Loc N" / "Location #N" suffix preceded by a separator. Operators # often type project names like "Fay - Locks & Dam No3 - Loc 2 - 735 Bunola" # where the leading "Fay - Locks & Dam No3" is the actual project and the # trailing "- Loc 2 - ..." is location info that already lives in the # sensor_location field. We strip the trailing junk so projects with the # same root get clustered together. # # Matches: # "- Loc 2", "-Loc3", "- Location #5", " — Location.5", "- LOC #07" # Doesn't match strings without an obvious Loc N marker — those keep # their full project_raw and the operator can edit them in the wizard. _PROJECT_LOC_SUFFIX = re.compile( r""" \s* # any leading whitespace [-–—.] # separator: hyphen, em-dash, or period # (operators use any of these — see # "Mont.Dam.Loc 2-R-25") \s* (?:loc|location) # 'Loc' or 'Location' \.? # optional trailing period after Loc \s* (?:no\.?\s*)? # optional "No." or "No " before the digit # (e.g. "Loc No. 3", "Loc No 5") \#? # optional '#' \s* \d+ # required digit \b """, re.IGNORECASE | re.VERBOSE, ) def _extract_project_root(project_raw: str) -> str: """Return the leading 'project root' portion of an operator-typed string. Strips everything from the first " - Loc N" (or similar) marker forward, so 'Fay - Locks & Dam No3 - Loc 2 - 735 Bunola' becomes 'Fay - Locks & Dam No3'. Strings without a Loc-marker pass through unchanged. Trailing whitespace and dangling hyphens are cleaned up. """ if not project_raw: return "" m = _PROJECT_LOC_SUFFIX.search(project_raw) if m is None: return project_raw.strip() root = project_raw[: m.start()] # Strip trailing whitespace + dangling separators left behind # (e.g. "Fay - Locks & Dam No3 -" → "Fay - Locks & Dam No3"). root = re.sub(r"[\s\-–—]+$", "", root) return root.strip() # Min length of the SHORTER input before a fuzzy match is accepted. # rapidfuzz.WRatio is generous with partial_ratio on short strings — e.g. # 'demo' vs 'bridge demo project' scores 0.90 (false positive). Requiring # the shorter input be >= 5 chars filters out those degenerate cases. _MIN_FUZZY_LEN = 5 def similarity(a: str, b: str) -> float: """Multi-signal similarity score in [0.0, 1.0] via rapidfuzz.WRatio. Blends Levenshtein, partial-substring, token-sort, token-set scoring. Handles abbreviations ('N' vs 'North'), word reordering, and substring containment. Returns 0.0 if either input is empty OR if the shorter input is too short to fuzzy-match safely (see _MIN_FUZZY_LEN comment) AND the strings don't exact-match. This guardrails the 'one common word inside a longer phrase' false positive. """ if not a or not b: return 0.0 if a == b: return 1.0 if min(len(a), len(b)) < _MIN_FUZZY_LEN: return 0.0 return rapidfuzz.fuzz.WRatio(a, b) / 100.0 # ── Cluster + Suggestion dataclasses ─────────────────────────────────────────── @dataclass class Cluster: cluster_id: str serial: str first_event_ts: datetime last_event_ts: datetime event_count: int sample_event_id: str # project_raw is the FULL operator-typed string (e.g. # "Fay - Locks & Dam No3 - Loc 5 Synthomer"). Kept for display so # operator can sanity-check what they typed. project_raw: str # project_root is project_raw with any trailing "- Loc N" suffix # stripped — what we actually use for matching and as the suggested # project name. (e.g. "Fay - Locks & Dam No3"). Same as project_raw # if no Loc marker was found. project_root: str project_norm: str location_raw: str location_norm: str client_raw: str operator_raw: str is_blank_meta: bool metadata_consistency: float # 0.0–1.0 @dataclass class ConflictHint: existing_assignment_id: str other_location_id: str other_location_name: str other_project_id: str other_project_name: str @dataclass class Suggestion: cluster: Cluster project_match: Literal["exact", "fuzzy", "create_new", "ambiguous"] project_existing_id: Optional[str] project_existing_name: Optional[str] project_match_score: Optional[float] project_suggested_name: str location_match: Literal["exact", "fuzzy", "create_new"] location_existing_id: Optional[str] location_existing_name: Optional[str] location_match_score: Optional[float] location_suggested_name: str proposed_assigned_at: datetime proposed_assigned_until: Optional[datetime] # None = active / open-ended confidence: Literal["high", "medium", "low"] conflicts: list[ConflictHint] = field(default_factory=list) blocking_conflict: bool = False already_attributed: bool = False @dataclass class ScanResult: suggestions: list[Suggestion] skipped_orphans: int # clusters skipped because pre-existing skip decision already_attributed: int # clusters with full pre-existing assignment overlap scanned_event_count: int cluster_count: int @dataclass class ApplyResult: applied: int skipped: int failed: list[tuple[str, str]] # (cluster_id, reason) project_ids_created: list[str] location_ids_created: list[str] assignment_ids_created: list[str] # ── Step 1+2+3: clustering ──────────────────────────────────────────────────── def _build_cluster_id(serial: str, first_ts: datetime, last_ts: datetime) -> str: """Deterministic SHA1 hash of (serial, first_date, last_date). Stable across re-scans of the same data — typo-corrected events don't change the cluster_id. """ key = f"{serial}|{first_ts.date().isoformat()}|{last_ts.date().isoformat()}" return hashlib.sha1(key.encode("utf-8")).hexdigest() def _events_overlap_existing_assignment( ev_ts: datetime, serial: str, assignments_by_serial: dict[str, list[UnitAssignment]], now: datetime, ) -> bool: """Does this event timestamp fall inside any existing UnitAssignment window for this serial?""" for a in assignments_by_serial.get(serial, []): a_end = a.assigned_until or now if a.assigned_at <= ev_ts <= a_end: return True return False def _mode_string(values: Iterable[Optional[str]]) -> tuple[str, float]: """Return (most_common_value, consistency_fraction). Treats None/empty as a single "blank" bucket. consistency_fraction is fraction of inputs equal to the modal value. """ vals = [v if v else "" for v in values] if not vals: return "", 1.0 counts = Counter(vals) mode_value, mode_count = counts.most_common(1)[0] return mode_value, mode_count / len(vals) def _split_by_metadata_runs(events: list[dict]) -> list[list[dict]]: """Run-length-encode (project_norm, location_norm) sequence; drop runs shorter than MIN_SPLIT_RUN_LENGTH; return list of sub-cluster event lists. Single-event metadata blips (typos) are merged back into the surrounding cluster's modal metadata. """ if not events: return [] # Build (key, idx) pairs. Blanks count as the "previous" key for splitting # purposes — i.e., a blank event doesn't fork the cluster. def _key(ev: dict) -> tuple[str, str]: return (_normalise(ev.get("project")), _normalise(ev.get("sensor_location"))) # Identify runs. runs: list[list[int]] = [] # each run is a list of event-indices last_key: Optional[tuple[str, str]] = None current_run: list[int] = [] blank = ("", "") for i, ev in enumerate(events): k = _key(ev) if k == blank: # Blank events inherit the previous run (or start a blank run if # they're at the beginning). current_run.append(i) continue if last_key is None or k == last_key: current_run.append(i) last_key = k else: runs.append(current_run) current_run = [i] last_key = k if current_run: runs.append(current_run) # Filter out short runs (typos / one-off blips). Their events are folded # back into the run that follows or precedes them, whichever is longer. def _run_key(run: list[int]) -> tuple[str, str]: for idx in run: k = _key(events[idx]) if k != blank: return k return blank filtered: list[list[int]] = [] pending_short: list[int] = [] for run in runs: if len(run) >= MIN_SPLIT_RUN_LENGTH: # Fold any pending-short events into this run's front. run = pending_short + run pending_short = [] filtered.append(run) else: pending_short.extend(run) # Any trailing pending-short get appended to the previous filtered run, # or become their own run if nothing came before. if pending_short: if filtered: filtered[-1].extend(pending_short) else: filtered.append(pending_short) return [[events[i] for i in run] for run in filtered] def _events_to_clusters( serial: str, events: list[dict], # already filtered: assignments_by_serial overlap dropped ) -> list[Cluster]: """Sort by timestamp, time-cluster on CLUSTER_GAP_DAYS, then metadata-split. Each output cluster has its modal metadata computed across its events. """ if not events: return [] # Sort by timestamp ascending. (Caller has already dropped events with # blank timestamps; guard anyway.) events = [e for e in events if e.get("timestamp")] events = sorted(events, key=lambda e: e["timestamp"]) if not events: return [] # Time-cluster. gap = timedelta(days=CLUSTER_GAP_DAYS) time_clusters: list[list[dict]] = [] current: list[dict] = [] last_ts: Optional[datetime] = None for ev in events: ts = _parse_ts(ev["timestamp"]) if last_ts is None or (ts - last_ts) <= gap: current.append(ev) else: time_clusters.append(current) current = [ev] last_ts = ts if current: time_clusters.append(current) # Metadata-split each time-cluster. out: list[Cluster] = [] for tc in time_clusters: sub_clusters = _split_by_metadata_runs(tc) for sc in sub_clusters: cluster = _build_cluster(serial, sc) out.append(cluster) return out def _parse_ts(ts: str) -> datetime: """Parse SFM-returned timestamp string into datetime.""" # SFM returns ISO 8601 with 'T' separator, sometimes with offset. s = ts.replace("Z", "+00:00") try: dt = datetime.fromisoformat(s) except ValueError: # Fallback: space separator dt = datetime.fromisoformat(s.replace(" ", "T")) # Strip tzinfo for consistency with terra-view's naive UTC datetimes. if dt.tzinfo is not None: dt = dt.replace(tzinfo=None) return dt def _build_cluster(serial: str, events: list[dict]) -> Cluster: """Compute a Cluster from a list of events that already belong together.""" timestamps = [_parse_ts(ev["timestamp"]) for ev in events] first_ts = min(timestamps) last_ts = max(timestamps) project_mode_norm, project_consistency = _mode_string(_normalise(ev.get("project")) for ev in events) location_mode_norm, location_consistency = _mode_string(_normalise(ev.get("sensor_location")) for ev in events) # For display, pick the most common RAW value (case-preserved) that # normalises to the modal normalised value. def _pick_display(field: str, mode_norm: str) -> str: for ev in events: v = (ev.get(field) or "").strip() if _normalise(v) == mode_norm and v: return v return "" project_raw = _pick_display("project", project_mode_norm) location_raw = _pick_display("sensor_location", location_mode_norm) client_raw = _pick_display("client", _normalise(events[0].get("client"))) operator_raw = _pick_display("operator", _normalise(events[0].get("operator"))) # Strip trailing "- Loc N" location info that operators sometimes bake # into the project string for email-readability ("I-80 - Loc 2 - 543 W # Plant Rd" → "I-80"). The sensor_location field already has the # authoritative location identifier. Use project_root for matching # and as the suggested project name; keep project_raw for display. project_root = _extract_project_root(project_raw) project_norm_for_matching = _normalise(project_root) consistency = min(project_consistency, location_consistency) is_blank = (not project_norm_for_matching) or (not location_mode_norm) return Cluster( cluster_id = _build_cluster_id(serial, first_ts, last_ts), serial = serial, first_event_ts = first_ts, last_event_ts = last_ts, event_count = len(events), sample_event_id = events[0]["id"], project_raw = project_raw, project_root = project_root, project_norm = project_norm_for_matching, location_raw = location_raw, location_norm = location_mode_norm, client_raw = client_raw, operator_raw = operator_raw, is_blank_meta = is_blank, metadata_consistency = consistency, ) # ── Step 4: SFM fetch ───────────────────────────────────────────────────────── async def _fetch_all_events_from_sfm(sfm_base_url: str) -> list[dict]: """Pull every event from SFM. Currently the only filter we can use is serial-based, so we have to iterate over known serials. Practically this means: get the list of units from /db/units, then fetch /db/events per serial. ~26 calls for the dev box, manageable for prod (~50-100 serials). """ async with httpx.AsyncClient(timeout=SFM_TIMEOUT) as client: units_resp = await client.get(f"{sfm_base_url}/db/units") units_resp.raise_for_status() units = units_resp.json() serials = [u["serial"] for u in units if u.get("serial")] async def _fetch_one(serial: str) -> list[dict]: try: r = await client.get( f"{sfm_base_url}/db/events", params={"serial": serial, "limit": SFM_FETCH_CEILING}, ) r.raise_for_status() payload = r.json() # Strip waveform_blob (large bytes; we don't need them) evs = payload.get("events", []) or [] for ev in evs: ev.pop("waveform_blob", None) ev.pop("a5_pickle_filename", None) return evs except httpx.HTTPError as e: log.warning("SFM fetch failed for serial=%s: %s", serial, e) return [] all_events_nested = await asyncio.gather(*[_fetch_one(s) for s in serials]) all_events: list[dict] = [] for batch in all_events_nested: all_events.extend(batch) return all_events # ── Step 4 cont.: cluster building from SFM ─────────────────────────────────── async def _scan_clusters( db: Session, sfm_base_url: str, ) -> tuple[list[Cluster], int, int]: """Pull SFM events, pre-filter, time-cluster, metadata-split. Returns (clusters, scanned_event_count, already_attributed_event_count). """ events = await _fetch_all_events_from_sfm(sfm_base_url) scanned = len(events) # Load all existing UnitAssignments once, group by serial. all_assignments = db.query(UnitAssignment).all() assignments_by_serial: dict[str, list[UnitAssignment]] = {} for a in all_assignments: assignments_by_serial.setdefault(a.unit_id, []).append(a) now = datetime.utcnow() # Bucket events by serial. by_serial: dict[str, list[dict]] = {} already_attributed = 0 for ev in events: serial = ev.get("serial") if not serial: continue ts_raw = ev.get("timestamp") if not ts_raw: # Skip events without a timestamp — they can't be clustered. continue ts = _parse_ts(ts_raw) if _events_overlap_existing_assignment(ts, serial, assignments_by_serial, now): already_attributed += 1 continue by_serial.setdefault(serial, []).append(ev) # Cluster per serial. clusters: list[Cluster] = [] for serial, serial_events in by_serial.items(): clusters.extend(_events_to_clusters(serial, serial_events)) return clusters, scanned, already_attributed # ── Step 5: matching against the existing graph ─────────────────────────────── def _find_best_match( candidate_norm: str, candidates: list[tuple[str, str]], # (id, normalised_name) ) -> tuple[Optional[str], Optional[float], str]: """Return (best_id, best_score, classification). classification ∈ {"exact", "fuzzy", "ambiguous", "no_match"} """ if not candidate_norm or not candidates: return None, None, "no_match" scored = [(cid, similarity(candidate_norm, cnorm)) for cid, cnorm in candidates] scored.sort(key=lambda x: x[1], reverse=True) best_id, best_score = scored[0] # Check for ambiguity: 2nd-best within FUZZY_AMBIGUITY_DELTA of best, # and both above the match threshold. if len(scored) > 1: _, second_score = scored[1] if best_score >= FUZZY_MATCH_THRESHOLD and second_score >= FUZZY_MATCH_THRESHOLD \ and (best_score - second_score) < FUZZY_AMBIGUITY_DELTA: return best_id, best_score, "ambiguous" if best_score >= FUZZY_EXACT_THRESHOLD: return best_id, best_score, "exact" if best_score >= FUZZY_MATCH_THRESHOLD: return best_id, best_score, "fuzzy" return None, best_score, "no_match" def _detect_conflicts( db: Session, cluster: Cluster, target_location_id: Optional[str], proposed_assigned_at: datetime, proposed_assigned_until: Optional[datetime], ) -> tuple[list[ConflictHint], bool, bool]: """Return (conflicts, blocking, already_attributed_at_target). already_attributed_at_target = True means an existing UnitAssignment for THIS serial is already at THIS target location during this window — nothing to do, skip the cluster. """ now = datetime.utcnow() end = proposed_assigned_until or now overlapping = ( db.query(UnitAssignment) .filter(UnitAssignment.unit_id == cluster.serial) .all() ) conflicts: list[ConflictHint] = [] already_at_target = False blocking = False for a in overlapping: a_end = a.assigned_until or now if proposed_assigned_at < a_end and end > a.assigned_at: # Overlapping window. if target_location_id is not None and a.location_id == target_location_id: already_at_target = True continue # Different location → blocking conflict. loc = db.query(MonitoringLocation).filter_by(id=a.location_id).first() proj = db.query(Project).filter_by(id=a.project_id).first() conflicts.append(ConflictHint( existing_assignment_id = a.id, other_location_id = a.location_id, other_location_name = loc.name if loc else "?", other_project_id = a.project_id, other_project_name = proj.name if proj else "?", )) blocking = True return conflicts, blocking, already_at_target def _score_confidence( cluster: Cluster, project_match: str, location_match: str, conflicts: list[ConflictHint], ) -> str: """Return 'high' | 'medium' | 'low'. See plan for full rules.""" if conflicts: return "low" if cluster.is_blank_meta: return "low" if cluster.event_count < 2: return "low" span_days = (cluster.last_event_ts - cluster.first_event_ts).days if span_days > SUSPICIOUS_SPAN_DAYS and cluster.event_count < (span_days / 7): # Cluster spans > 90 days but emits less than 1 event/week — sparse. return "low" if project_match == "ambiguous" or location_match == "ambiguous": return "low" if project_match == "fuzzy" or location_match == "fuzzy": return "medium" # Exact match on both, OR clean create_new on both with good event count # and non-blank metadata. if (project_match == "exact" and location_match == "exact"): return "high" if (project_match == "create_new" and location_match == "create_new"): return "high" # Mixed: one exact, one create_new → medium (probably means location # is new under an existing project, which is common but worth review) return "medium" def _build_suggestion(db: Session, cluster: Cluster) -> Suggestion: """Match cluster against the existing graph, score, detect conflicts.""" # Match project. existing_projects = db.query(Project).filter(Project.status != "deleted").all() project_candidates = [(p.id, _normalise(p.name)) for p in existing_projects] if cluster.project_norm: proj_id, proj_score, proj_match = _find_best_match(cluster.project_norm, project_candidates) else: proj_id, proj_score, proj_match = None, None, "create_new" project_existing = None project_match: Literal["exact", "fuzzy", "create_new", "ambiguous"] if proj_match == "exact": project_match = "exact" project_existing = next((p for p in existing_projects if p.id == proj_id), None) elif proj_match == "fuzzy": project_match = "fuzzy" project_existing = next((p for p in existing_projects if p.id == proj_id), None) elif proj_match == "ambiguous": project_match = "ambiguous" project_existing = next((p for p in existing_projects if p.id == proj_id), None) else: project_match = "create_new" project_suggested_name = ( project_existing.name if project_existing and project_match == "exact" else cluster.project_root or cluster.project_raw or f"Project {cluster.serial}" ) # Match location ONLY within the matched project's existing locations. # If we're creating a new project, every location is also new (location # names should not be fuzzy-matched across project boundaries — "Loc 1" # at Project A is a different thing than "Loc 1" at Project B). if project_existing and project_match in ("exact", "fuzzy"): location_candidates_objs = ( db.query(MonitoringLocation) .filter(MonitoringLocation.project_id == project_existing.id) .filter(MonitoringLocation.location_type == "vibration") .all() ) location_candidates = [(l.id, _normalise(l.name)) for l in location_candidates_objs] if cluster.location_norm: loc_id, loc_score, loc_match = _find_best_match(cluster.location_norm, location_candidates) else: loc_id, loc_score, loc_match = None, None, "create_new" else: # Project will be created new → location is automatically new. location_candidates_objs = [] loc_id, loc_score, loc_match = None, None, "create_new" location_existing = None location_match: Literal["exact", "fuzzy", "create_new"] if loc_match == "exact": location_match = "exact" location_existing = next((l for l in location_candidates_objs if l.id == loc_id), None) elif loc_match == "fuzzy": location_match = "fuzzy" location_existing = next((l for l in location_candidates_objs if l.id == loc_id), None) elif loc_match == "ambiguous": # We treat ambiguous-location as fuzzy for now; the operator can # pick which one (out of scope: a richer disambiguation UI). location_match = "fuzzy" location_existing = next((l for l in location_candidates_objs if l.id == loc_id), None) else: location_match = "create_new" location_suggested_name = ( location_existing.name if location_existing and location_match == "exact" else cluster.location_raw or "Unnamed location" ) # Proposed assignment window. proposed_at = cluster.first_event_ts - timedelta(hours=1) now = datetime.utcnow() if (now - cluster.last_event_ts) <= timedelta(days=RECENT_CLUSTER_DAYS): proposed_until = None # Treat as active else: proposed_until = cluster.last_event_ts + timedelta(hours=1) # Conflict detection vs existing UnitAssignments. target_loc_id = location_existing.id if location_existing else None conflicts, blocking, already_attributed = _detect_conflicts( db, cluster, target_loc_id, proposed_at, proposed_until, ) confidence = _score_confidence(cluster, project_match, location_match, conflicts) return Suggestion( cluster = cluster, project_match = project_match, project_existing_id = project_existing.id if project_existing else None, project_existing_name = project_existing.name if project_existing else None, project_match_score = proj_score, project_suggested_name = project_suggested_name, location_match = location_match, location_existing_id = location_existing.id if location_existing else None, location_existing_name = location_existing.name if location_existing else None, location_match_score = loc_score, location_suggested_name = location_suggested_name, proposed_assigned_at = proposed_at, proposed_assigned_until = proposed_until, confidence = confidence, conflicts = conflicts, blocking_conflict = blocking, already_attributed = already_attributed, ) # ── Public API: scan ────────────────────────────────────────────────────────── async def scan_clusters_and_build_suggestions( db: Session, sfm_base_url: str, ) -> ScanResult: """End-to-end scan: fetch SFM events, cluster, build suggestions. Persists / updates a MetadataBackfillDecision row per non-already-attributed cluster with status='pending' (first time) or refreshed last_seen_at. """ clusters, scanned, already_attributed_events = await _scan_clusters(db, sfm_base_url) suggestions: list[Suggestion] = [] skipped_orphans = 0 already_attributed_clusters = 0 now = datetime.utcnow() for cluster in clusters: # Check existing decision. decision = db.query(MetadataBackfillDecision).filter_by( cluster_id=cluster.cluster_id ).first() if decision and decision.status == "skipped": skipped_orphans += 1 # Refresh last_seen so we know it's still around. decision.last_seen_at = now continue if decision and decision.status == "applied": # Already done. Phase 2 attribution should be picking up these # events now. decision.last_seen_at = now continue suggestion = _build_suggestion(db, cluster) if suggestion.already_attributed: already_attributed_clusters += 1 continue suggestions.append(suggestion) # Upsert decision row. if decision is None: db.add(MetadataBackfillDecision( cluster_id = cluster.cluster_id, status = "conflict" if suggestion.blocking_conflict else "pending", confidence = suggestion.confidence, first_seen_at = now, last_seen_at = now, serial = cluster.serial, project_raw = cluster.project_raw or None, location_raw = cluster.location_raw or None, first_event_ts = cluster.first_event_ts, last_event_ts = cluster.last_event_ts, event_count = cluster.event_count, )) else: decision.status = "conflict" if suggestion.blocking_conflict else "pending" decision.confidence = suggestion.confidence decision.last_seen_at = now decision.event_count = cluster.event_count decision.last_event_ts = cluster.last_event_ts db.commit() return ScanResult( suggestions = suggestions, skipped_orphans = skipped_orphans, already_attributed = already_attributed_clusters, scanned_event_count = scanned, cluster_count = len(clusters), ) # ── Public API: apply ───────────────────────────────────────────────────────── def _ensure_auto_imported_project_type(db: Session) -> str: """Create the 'auto_imported' ProjectType if it doesn't exist. Returns id.""" from backend.models import ProjectType pt = db.query(ProjectType).filter_by(id=AUTO_IMPORTED_TYPE_ID).first() if pt is None: pt = ProjectType( id = AUTO_IMPORTED_TYPE_ID, name = AUTO_IMPORTED_TYPE_NAME, description = "Projects created automatically by the metadata-backfill parser. Operators can re-type them later.", supports_vibration = True, supports_sound = False, ) db.add(pt) db.flush() return pt.id def _ensure_project(db: Session, suggestion: Suggestion) -> tuple[Project, bool]: """Return (project, created_flag). Dedup is normalisation-aware: "SR81" and "SR 81" collapse to the same project (both normalise to "sr 81"), as do "Fay - Locks & Dam No3" and "Fay-Locks-&-Dam-No3". Important when applying many clusters in one bulk operation — the first creates the project, subsequent clusters with normalisation-equivalent names attach to it instead of triggering a UNIQUE constraint violation. """ if suggestion.project_existing_id: p = db.query(Project).filter_by(id=suggestion.project_existing_id).first() if p is not None: return p, False candidate_name = suggestion.project_suggested_name.strip() or f"Auto-imported project ({suggestion.cluster.serial})" candidate_norm = _normalise(candidate_name) # Pre-flight normalised lookup: avoids creating duplicates that # differ only in punctuation/spacing. if candidate_norm: for p in db.query(Project).filter(Project.status != "deleted").all(): if _normalise(p.name) == candidate_norm: return p, False # Final fallback: case-insensitive exact (cheap, catches the same # things normalised lookup would but it's harmless to keep). existing = db.query(Project).filter(Project.name.ilike(candidate_name)).first() if existing is not None: return existing, False type_id = _ensure_auto_imported_project_type(db) # Derive dates. start = suggestion.cluster.first_event_ts.date() end = (suggestion.cluster.last_event_ts.date() if suggestion.proposed_assigned_until else None) p = Project( id = str(uuid.uuid4()), name = candidate_name, description = ( f"Auto-created by the metadata-backfill parser on " f"{datetime.utcnow():%Y-%m-%d}. Sourced from operator-typed " f"BW metadata across {suggestion.cluster.event_count} event(s) " f"from serial {suggestion.cluster.serial}." ), project_type_id = type_id, status = "active", data_collection_mode = "remote", client_name = suggestion.cluster.client_raw or None, start_date = start, end_date = end, ) db.add(p) db.flush() # Ensure the vibration_monitoring module is enabled. pm = ProjectModule( id = str(uuid.uuid4()), project_id = p.id, module_type = "vibration_monitoring", enabled = True, ) db.add(pm) db.flush() return p, True def _ensure_location( db: Session, project: Project, suggestion: Suggestion, ) -> tuple[MonitoringLocation, bool]: """Return (location, created_flag).""" if suggestion.location_existing_id: l = db.query(MonitoringLocation).filter_by(id=suggestion.location_existing_id).first() if l is not None and l.project_id == project.id: return l, False candidate_name = suggestion.location_suggested_name.strip() or "Unnamed location" candidate_norm = _normalise(candidate_name) # Normalisation-aware lookup within this project — same dedup # principle as _ensure_project. if candidate_norm: for existing in ( db.query(MonitoringLocation) .filter(MonitoringLocation.project_id == project.id) .all() ): if _normalise(existing.name) == candidate_norm: return existing, False # Fallback to case-insensitive exact. existing = ( db.query(MonitoringLocation) .filter(MonitoringLocation.project_id == project.id) .filter(MonitoringLocation.name.ilike(candidate_name)) .first() ) if existing is not None: return existing, False l = MonitoringLocation( id = str(uuid.uuid4()), project_id = project.id, location_type = "vibration", name = candidate_name, description = ( f"Auto-created by metadata-backfill from operator-typed sensor_location " f"\"{suggestion.cluster.location_raw}\" on events from serial " f"{suggestion.cluster.serial}." ), ) db.add(l) db.flush() return l, True def _apply_one( db: Session, suggestion: Suggestion, *, decided_by: str, ) -> tuple[str, str, str]: """Apply a single suggestion in a transaction. Returns (project_id, location_id, assignment_id). """ project, _proj_created = _ensure_project(db, suggestion) location, _loc_created = _ensure_location(db, project, suggestion) # Create the UnitAssignment. assignment_id = str(uuid.uuid4()) assignment = UnitAssignment( id = assignment_id, unit_id = suggestion.cluster.serial, location_id = location.id, project_id = project.id, device_type = "seismograph", assigned_at = suggestion.proposed_assigned_at, assigned_until = suggestion.proposed_assigned_until, status = "active" if suggestion.proposed_assigned_until is None else "completed", source = "metadata_backfill", notes = ( f"Auto-created from operator-typed metadata on " f"{suggestion.cluster.event_count} event(s). " f"Project: \"{suggestion.cluster.project_raw}\". " f"Location: \"{suggestion.cluster.location_raw}\". " f"Confidence: {suggestion.confidence}." ), ) db.add(assignment) # Audit log entry. db.add(UnitHistory( unit_id = suggestion.cluster.serial, change_type = "assignment_backfilled", field_name = "unit_assignment", old_value = None, new_value = f"{project.name} / {location.name}", changed_at = datetime.utcnow(), source = "metadata_backfill", notes = ( f"Created from {suggestion.cluster.event_count} events tagged " f"({suggestion.cluster.project_raw!r}, {suggestion.cluster.location_raw!r}). " f"By: {decided_by}." ), )) # Update the decision record. decision = db.query(MetadataBackfillDecision).filter_by( cluster_id=suggestion.cluster.cluster_id ).first() if decision: decision.status = "applied" decision.decided_at = datetime.utcnow() decision.decided_by = decided_by decision.applied_assignment_id = assignment_id return project.id, location.id, assignment_id def apply_suggestions( db: Session, suggestions: list[Suggestion], *, decided_by: str, ) -> ApplyResult: """Apply a list of suggestions. Each suggestion is applied in its own sub-transaction (via db.flush) — failures don't roll back successful ones. """ applied = 0 failed: list[tuple[str, str]] = [] proj_ids: list[str] = [] loc_ids: list[str] = [] asgn_ids: list[str] = [] for s in suggestions: if s.blocking_conflict: failed.append((s.cluster.cluster_id, "blocking conflict — needs manual resolution")) continue try: p_id, l_id, a_id = _apply_one(db, s, decided_by=decided_by) db.flush() # ensure FK consistency within transaction proj_ids.append(p_id) loc_ids.append(l_id) asgn_ids.append(a_id) applied += 1 except Exception as e: log.exception("Failed to apply cluster %s", s.cluster.cluster_id) db.rollback() failed.append((s.cluster.cluster_id, str(e))) db.commit() return ApplyResult( applied = applied, skipped = 0, failed = failed, project_ids_created = list(dict.fromkeys(proj_ids)), location_ids_created = list(dict.fromkeys(loc_ids)), assignment_ids_created = asgn_ids, ) def skip_clusters( db: Session, cluster_ids: list[str], *, decided_by: str = "operator", ) -> int: """Mark clusters as skipped so they don't reappear in future scans.""" now = datetime.utcnow() n = 0 for cluster_id in cluster_ids: decision = db.query(MetadataBackfillDecision).filter_by(cluster_id=cluster_id).first() if decision is None: continue if decision.status not in ("pending", "conflict"): continue decision.status = "skipped" decision.decided_at = now decision.decided_by = decided_by n += 1 db.commit() return n