Files
terra-view/backend/services/metadata_backfill.py
serversdown d46f9fccf8 fix(sfm): broaden Loc-N suffix regex to catch '.Loc' and 'Loc No.' variants
Operators use more separator variations than the original regex caught:
  - "Trumbull-Brayman-JV- Mont.Dam.Loc 2-R-25" — period as separator
  - "CMU - RKM Hall - Loc No. 3 - 4615 Forbes" — "No." between Loc and digit

Added period to the separator character class and optional "No." token
before the digit.  Catches both above patterns plus near-variants
without false-positives on normal project strings.

Real-data impact: 5 more clusters now auto-strip cleanly, including the
1,903-event Trumbull-Brayman-JV- Mont.Dam cluster.  Confidence
distribution: 43 → 44 high.
2026-05-12 19:19:46 +00:00

1140 lines
43 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
"""
metadata_backfill.py — turn operator-typed BW event metadata into the
terra-view Project / MonitoringLocation / UnitAssignment graph.
Architecture (see /home/serversdown/.claude/plans/sfm-metadata-backfill-parser.md):
1. Pre-filter: drop events that already fall inside an existing
UnitAssignment window (Phase 2 attribution already handles them).
2. Time-cluster: serial + 7-day gap is the cluster identity.
3. Metadata-split: split on persistent (>= 2 events) metadata transitions.
4. Match against existing graph (rapidfuzz multi-signal scoring).
5. Score confidence (high/medium/low).
6. Detect conflicts (overlap with existing UnitAssignment at different
location for the same serial → blocking).
7. Apply: create Project / MonitoringLocation / UnitAssignment +
UnitHistory audit row, all in one transaction.
Public API:
scan_clusters_and_build_suggestions(db, sfm_base_url) → ScanResult
apply_suggestions(db, cluster_ids, *, decided_by) → ApplyResult
skip_clusters(db, cluster_ids, *, decided_by) → int
"""
from __future__ import annotations
import asyncio
import hashlib
import logging
import os
import re
import uuid
from collections import Counter
from dataclasses import dataclass, field
from datetime import datetime, date, timedelta
from typing import Optional, Iterable, Literal
import httpx
import rapidfuzz
from sqlalchemy.orm import Session
from backend.models import (
Project,
ProjectModule,
MonitoringLocation,
UnitAssignment,
RosterUnit,
UnitHistory,
MetadataBackfillDecision,
)
log = logging.getLogger("backend.services.metadata_backfill")

# ── Tunables ──────────────────────────────────────────────────────────────────
# Clustering.
CLUSTER_GAP_DAYS = 7            # a silence longer than this many days starts a new time-cluster
MIN_SPLIT_RUN_LENGTH = 2        # events a metadata run needs before it may split a cluster

# Fuzzy matching (similarity() scores in [0.0, 1.0]).
FUZZY_EXACT_THRESHOLD = 0.95    # best score at/above this → classified "exact"
FUZZY_MATCH_THRESHOLD = 0.80    # best score at/above this → classified "fuzzy"
FUZZY_AMBIGUITY_DELTA = 0.05    # runner-up closer than this to the best → "ambiguous"

# Confidence scoring / proposed assignment window.
SUSPICIOUS_SPAN_DAYS = 90       # longer spans with < 1 event per 7 days score "low"
RECENT_CLUSTER_DAYS = 3         # last event this recent → assigned_until left NULL (active)

# SFM HTTP access.
SFM_FETCH_CEILING = 5_000       # `limit` passed on every SFM /db/events request
SFM_TIMEOUT = 30.0              # seconds; generous because the scan runs in the background

# ProjectType given to auto-created projects. The parser creates this
# sentinel type on demand; operators can re-type a project later by
# editing it.
AUTO_IMPORTED_TYPE_ID = "auto_imported"
AUTO_IMPORTED_TYPE_NAME = "Auto-imported (from event metadata)"
# ── Normalisation + fuzzy matching ─────────────────────────────────────────────
def _normalise(s: Optional[str]) -> str:
    """Canonicalise a free-text label for comparison.

    The text is lower-cased, and every run of characters outside ``[a-z0-9]``
    (punctuation, whitespace, non-ASCII letters) becomes a single space.
    Leading and trailing spaces are trimmed. The separators are made
    uniform but the alphanumeric tokens survive, so 'Test-5-8-26' and
    'Test 5/8/26' both give 'test 5 8 26', and 'I-80 N Fork' gives
    'i 80 n fork'. ``None`` and ``""`` give ``""``.
    """
    if not s:
        return ""
    lowered = s.strip().lower()
    # Every non-alphanumeric run is a separator → one space per run.
    spaced = re.sub(r"[^a-z0-9]+", " ", lowered)
    collapsed = re.sub(r"\s+", " ", spaced)
    return collapsed.strip()
# Match a "Loc N" / "Location #N" marker preceded by a separator. Operators
# often type project names like "Fay - Locks & Dam No3 - Loc 2 - 735 Bunola"
# where the leading "Fay - Locks & Dam No3" is the actual project and the
# trailing "- Loc 2 - ..." is location info that already lives in the
# sensor_location field. We strip the trailing junk so projects with the
# same root get clustered together.
#
# The separator may be a hyphen, en dash, em dash or period, with optional
# whitespace around it. Examples that match:
#   "- Loc 2", "-Loc3", "- Location #5", " — Location.5", "- LOC #07",
#   ".Loc 2" (as in "Mont.Dam.Loc 2-R-25"), "- Loc No. 3", "- Loc No 5"
# A digit is required, so "- Locks" does not match. The digits must end on
# a word boundary, so "- Loc 2nd" does not match either.
# Strings without an obvious Loc N marker keep their full project_raw and
# the operator can edit them in the wizard.
_PROJECT_LOC_SUFFIX = re.compile(
    r"""
\s* # any leading whitespace
[-–—.] # separator: hyphen, em-dash, or period
# (operators use any of these — see
# "Mont.Dam.Loc 2-R-25")
\s*
(?:loc|location) # 'Loc' or 'Location'
\.? # optional trailing period after Loc
\s*
(?:no\.?\s*)? # optional "No." or "No " before the digit
# (e.g. "Loc No. 3", "Loc No 5")
\#? # optional '#'
\s*
\d+ # required digit
\b
""",
    re.VERBOSE | re.IGNORECASE,
)


def _extract_project_root(project_raw: str) -> str:
    """Return the 'project root' of an operator-typed project string.

    Everything from the first Loc-N marker (see _PROJECT_LOC_SUFFIX) onward
    is dropped, so 'Fay - Locks & Dam No3 - Loc 2 - 735 Bunola' becomes
    'Fay - Locks & Dam No3'. Any whitespace, hyphens or dashes left dangling
    at the end are trimmed, and so is surrounding whitespace. A string with
    no marker comes back merely stripped. Empty input gives "".
    """
    if not project_raw:
        return ""
    marker = _PROJECT_LOC_SUFFIX.search(project_raw)
    if not marker:
        return project_raw.strip()
    # e.g. "Fay - Locks & Dam No3 -" → "Fay - Locks & Dam No3"
    head = project_raw[: marker.start()]
    return re.sub(r"[\s\-–—]+$", "", head).strip()
# Minimum length of the SHORTER input before a fuzzy score is trusted.
# rapidfuzz.WRatio leans on partial_ratio for short strings, so a short
# word inside a longer phrase scores high ('demo' vs 'bridge demo project'
# ≈ 0.90, a false positive). Anything shorter than this only counts on an
# exact match.
_MIN_FUZZY_LEN = 5


def similarity(a: str, b: str) -> float:
    """Score how alike two (already normalised) strings are, in [0.0, 1.0].

    Identical strings score 1.0. Otherwise the score is
    ``rapidfuzz.fuzz.WRatio / 100``, which blends plain Levenshtein ratio
    with partial-substring, token-sort and token-set variants. That makes
    it tolerant of word reordering and substring containment.

    Guard rails: the result is 0.0 when either input is empty, or when the
    shorter input has fewer than _MIN_FUZZY_LEN characters and the strings
    are not identical.
    """
    if not (a and b):
        return 0.0
    if a == b:
        return 1.0
    shorter = min(len(a), len(b))
    return 0.0 if shorter < _MIN_FUZZY_LEN else rapidfuzz.fuzz.WRatio(a, b) / 100.0
# ── Cluster + Suggestion dataclasses ───────────────────────────────────────────
@dataclass
class Cluster:
    """One candidate deployment: events from a single serial, grouped by time
    gap and then split on persistent metadata changes (see _build_cluster)."""

    # Deterministic SHA1 of serial + first/last event date (_build_cluster_id).
    cluster_id: str
    serial: str
    # Earliest / latest event timestamps, as naive datetimes from _parse_ts.
    first_event_ts: datetime
    last_event_ts: datetime
    event_count: int
    # The "id" of the first event in the cluster's event list.
    sample_event_id: str
    # project_raw is the FULL operator-typed string (e.g.
    # "Fay - Locks & Dam No3 - Loc 5 Synthomer"). Kept for display so
    # operator can sanity-check what they typed.
    project_raw: str
    # project_root is project_raw with any trailing "- Loc N" suffix
    # stripped — what we actually use for matching and as the suggested
    # project name. (e.g. "Fay - Locks & Dam No3"). Same as project_raw
    # if no Loc marker was found.
    project_root: str
    # _normalise(project_root): the value used for project matching.
    project_norm: str
    # Display value and normalised modal value of the sensor_location field.
    location_raw: str
    location_norm: str
    # Display values chosen to match the FIRST event's normalised client /
    # operator value (not the cluster-wide mode).
    client_raw: str
    operator_raw: str
    # True when project_norm or location_norm is empty.
    is_blank_meta: bool
    # 0.0–1.0: the smaller of the project / location modal-value fractions.
    metadata_consistency: float
@dataclass
class ConflictHint:
    """An existing UnitAssignment for the same serial whose window overlaps a
    proposed one at a different location (see _detect_conflicts)."""

    existing_assignment_id: str
    other_location_id: str
    other_location_name: str  # "?" when the location row could not be loaded
    other_project_id: str
    other_project_name: str  # "?" when the project row could not be loaded
@dataclass
class Suggestion:
    """Proposed Project / MonitoringLocation / UnitAssignment for one Cluster
    (built by _build_suggestion)."""

    cluster: Cluster
    # How cluster.project_norm matched the existing non-deleted projects.
    project_match: Literal["exact", "fuzzy", "create_new", "ambiguous"]
    # Populated for "exact", "fuzzy" and "ambiguous" project matches.
    project_existing_id: Optional[str]
    project_existing_name: Optional[str]
    # Best similarity score seen. It can be set even when project_match is
    # "create_new" (a below-threshold best score). None when no matching ran.
    project_match_score: Optional[float]
    # The existing project's name on an "exact" match. Otherwise the
    # cluster's project root / raw string, falling back to "Project <serial>".
    project_suggested_name: str
    # "ambiguous" location matches are reported as "fuzzy".
    location_match: Literal["exact", "fuzzy", "create_new"]
    location_existing_id: Optional[str]
    location_existing_name: Optional[str]
    location_match_score: Optional[float]
    location_suggested_name: str
    # First event − 1h / last event + 1h.
    proposed_assigned_at: datetime
    proposed_assigned_until: Optional[datetime]  # None = active / open-ended
    confidence: Literal["high", "medium", "low"]
    # Overlapping assignments for this serial at other locations.
    conflicts: list[ConflictHint] = field(default_factory=list)
    # True whenever conflicts is non-empty.
    blocking_conflict: bool = False
    # True when an overlapping assignment already exists at the matched location.
    already_attributed: bool = False
@dataclass
class ScanResult:
    """Summary returned by scan_clusters_and_build_suggestions."""

    # Suggestions for clusters that still need an operator decision.
    suggestions: list[Suggestion]
    skipped_orphans: int  # clusters skipped because pre-existing skip decision
    already_attributed: int  # clusters whose matched location already has an overlapping assignment
    scanned_event_count: int  # total events fetched from SFM
    cluster_count: int  # every cluster found, including skipped / applied / already-attributed ones
@dataclass
class ApplyResult:
    """Summary returned by apply_suggestions."""

    applied: int
    skipped: int
    failed: list[tuple[str, str]]  # (cluster_id, reason)
    project_ids_created: list[str]
    location_ids_created: list[str]
    assignment_ids_created: list[str]
# ── Step 1+2+3: clustering ────────────────────────────────────────────────────
def _build_cluster_id(serial: str, first_ts: datetime, last_ts: datetime) -> str:
    """Return a deterministic cluster id: SHA1 hex of 'serial|first_date|last_date'.

    Only the calendar dates of the first and last events are hashed. Re-scans
    of the same data therefore give the same id, and typo-corrected metadata
    does not change it.

    NOTE(review): two consequences of hashing dates only:
      * two metadata sub-clusters of one serial that start and end on the
        same calendar dates get the SAME id;
      * an active cluster's id changes whenever a new event moves its last
        date forward.
    """
    first_day = first_ts.date().isoformat()
    last_day = last_ts.date().isoformat()
    digest_input = f"{serial}|{first_day}|{last_day}".encode("utf-8")
    return hashlib.sha1(digest_input).hexdigest()
def _events_overlap_existing_assignment(
    ev_ts: datetime,
    serial: str,
    assignments_by_serial: dict[str, list[UnitAssignment]],
    now: datetime,
) -> bool:
    """Report whether ``ev_ts`` lies inside any existing assignment window of ``serial``.

    Windows are inclusive at both ends. An assignment with no
    ``assigned_until`` is treated as running until ``now``.
    """
    return any(
        assignment.assigned_at <= ev_ts <= (assignment.assigned_until or now)
        for assignment in assignments_by_serial.get(serial, [])
    )
def _mode_string(values: Iterable[Optional[str]]) -> tuple[str, float]:
    """Return ``(modal_value, share_of_inputs_equal_to_it)``.

    ``None`` and ``""`` are pooled into a single blank bucket (reported as
    ``""``). On a tie, the value seen first wins. An empty input gives
    ``("", 1.0)``.
    """
    bucketed = ["" if not v else v for v in values]
    total = len(bucketed)
    if total == 0:
        return "", 1.0
    winner, hits = Counter(bucketed).most_common(1)[0]
    return winner, hits / total
def _split_by_metadata_runs(events: list[dict]) -> list[list[dict]]:
    """Split a time-ordered time-cluster on persistent metadata changes.

    Each event is keyed by its normalised ``(project, sensor_location)`` pair.
    Consecutive events with the same key form a run. An event whose key is
    blank on BOTH fields joins whatever run is in progress; at the start, it
    joins the first run. A blank event therefore never forks a cluster.

    Runs shorter than MIN_SPLIT_RUN_LENGTH are blips (typos / one-offs).
    Their events are folded into the next long run, or appended to the
    previous long run when none follows. Adjacent long runs that share a
    key are then merged, so an isolated blip inside an otherwise uniform
    stretch (A A A B A A A) does not split it into two sub-clusters with
    identical metadata. If no run is long enough, everything comes back as
    one sub-cluster.

    Returns a list of sub-cluster event lists, preserving event order.
    """
    if not events:
        return []

    blank = ("", "")

    def _key(ev: dict) -> tuple[str, str]:
        return (_normalise(ev.get("project")), _normalise(ev.get("sensor_location")))

    keys = [_key(ev) for ev in events]

    # Identify runs (lists of event indices) of identical non-blank keys.
    runs: list[list[int]] = []
    current_run: list[int] = []
    last_key: Optional[tuple[str, str]] = None
    for i, k in enumerate(keys):
        if k == blank:
            # Blank events ride along with the run in progress.
            current_run.append(i)
            continue
        if last_key is not None and k != last_key:
            runs.append(current_run)
            current_run = []
        current_run.append(i)
        last_key = k
    if current_run:
        runs.append(current_run)

    def _run_key(run: list[int]) -> tuple[str, str]:
        # First non-blank key in the run (all non-blank keys in a run are equal).
        return next((keys[i] for i in run if keys[i] != blank), blank)

    filtered: list[list[int]] = []
    filtered_keys: list[tuple[str, str]] = []
    pending_short: list[int] = []
    for run in runs:
        if len(run) < MIN_SPLIT_RUN_LENGTH:
            pending_short.extend(run)
            continue
        key = _run_key(run)
        if filtered and filtered_keys[-1] == key:
            # Same metadata as the previous long run: only short blips
            # separated them, so this is not a persistent transition.
            filtered[-1].extend(pending_short + run)
        else:
            filtered.append(pending_short + run)
            filtered_keys.append(key)
        pending_short = []

    # Trailing blips join the last long run, or stand alone if there is none.
    if pending_short:
        if filtered:
            filtered[-1].extend(pending_short)
        else:
            filtered.append(pending_short)

    return [[events[i] for i in run] for run in filtered]
def _events_to_clusters(
    serial: str,
    events: list[dict],  # already filtered: assignments_by_serial overlap dropped
) -> list[Cluster]:
    """Turn one serial's events into Clusters.

    Events are ordered chronologically. A gap of more than CLUSTER_GAP_DAYS
    between consecutive events starts a new time-cluster. Each time-cluster
    is then metadata-split (_split_by_metadata_runs) and summarised
    (_build_cluster). Events without a timestamp are ignored.
    """
    if not events:
        return []

    # Parse once and sort on the parsed datetime. Sorting the raw strings is
    # only chronological when every timestamp shares one format and offset,
    # and SFM can mix 'T'/space separators, 'Z' and explicit offsets.
    # Sorting on the datetime alone keeps the sort stable and never compares dicts.
    stamped = [(_parse_ts(e["timestamp"]), e) for e in events if e.get("timestamp")]
    if not stamped:
        return []
    stamped.sort(key=lambda pair: pair[0])

    # Time-cluster.
    gap = timedelta(days=CLUSTER_GAP_DAYS)
    time_clusters: list[list[dict]] = []
    current: list[dict] = []
    last_ts: Optional[datetime] = None
    for ts, ev in stamped:
        if last_ts is not None and (ts - last_ts) > gap:
            time_clusters.append(current)
            current = []
        current.append(ev)
        last_ts = ts
    if current:
        time_clusters.append(current)

    # Metadata-split each time-cluster, then summarise each piece.
    return [
        _build_cluster(serial, sub_cluster)
        for time_cluster in time_clusters
        for sub_cluster in _split_by_metadata_runs(time_cluster)
    ]
def _parse_ts(ts: str) -> datetime:
    """Parse an SFM timestamp string into a naive UTC datetime.

    SFM returns ISO 8601, usually with a 'T' separator, sometimes with a
    trailing 'Z' or an explicit UTC offset. If the string does not parse
    as-is, a space→'T' variant is tried. An offset-aware value is shifted
    to UTC BEFORE its tzinfo is dropped, so '10:00+02:00' becomes 08:00.
    That keeps the result consistent with terra-view's naive UTC datetimes.
    A string without an offset is returned unchanged (assumed UTC).

    Raises ValueError when neither form parses.
    """
    s = ts.replace("Z", "+00:00")
    try:
        dt = datetime.fromisoformat(s)
    except ValueError:
        # Fallback: space separator
        dt = datetime.fromisoformat(s.replace(" ", "T"))
    if dt.tzinfo is not None:
        offset = dt.utcoffset()
        if offset:
            dt = dt - offset  # wall time → UTC wall time
        dt = dt.replace(tzinfo=None)
    return dt
def _build_cluster(serial: str, events: list[dict]) -> Cluster:
    """Summarise events that already belong together into a Cluster.

    ``events`` must be non-empty. ``sample_event_id`` is the "id" of
    ``events[0]``, and the cluster_id comes from the min/max parsed
    timestamps. Project and location are the modal normalised values. For
    display, the most common raw (case-preserved) spelling of each modal
    value is used. Any trailing "- Loc N" is stripped from the project
    string to obtain the project root used for matching.
    """
    timestamps = [_parse_ts(ev["timestamp"]) for ev in events]
    first_ts = min(timestamps)
    last_ts = max(timestamps)
    project_mode_norm, project_consistency = _mode_string(
        _normalise(ev.get("project")) for ev in events
    )
    location_mode_norm, location_consistency = _mode_string(
        _normalise(ev.get("sensor_location")) for ev in events
    )

    def _pick_display(key: str, mode_norm: str) -> str:
        # The most common non-empty raw value (stripped, case-preserved) that
        # normalises to ``mode_norm``. Ties go to the first seen. A blank
        # modal value yields "", so a punctuation-only entry such as "-"
        # (which normalises to "") is never surfaced as a name.
        if not mode_norm:
            return ""
        spellings = Counter(
            raw
            for raw in ((ev.get(key) or "").strip() for ev in events)
            if raw and _normalise(raw) == mode_norm
        )
        return spellings.most_common(1)[0][0] if spellings else ""

    project_raw = _pick_display("project", project_mode_norm)
    location_raw = _pick_display("sensor_location", location_mode_norm)
    # NOTE(review): client / operator follow the FIRST event's value rather
    # than the cluster-wide mode — confirm this is intended.
    client_raw = _pick_display("client", _normalise(events[0].get("client")))
    operator_raw = _pick_display("operator", _normalise(events[0].get("operator")))

    # Strip trailing "- Loc N" location info that operators sometimes bake
    # into the project string for email-readability ("I-80 - Loc 2 - 543 W
    # Plant Rd" → "I-80"). The sensor_location field already has the
    # authoritative location identifier. Use project_root for matching
    # and as the suggested project name; keep project_raw for display.
    project_root = _extract_project_root(project_raw)
    project_norm_for_matching = _normalise(project_root)

    consistency = min(project_consistency, location_consistency)
    is_blank = (not project_norm_for_matching) or (not location_mode_norm)

    return Cluster(
        cluster_id=_build_cluster_id(serial, first_ts, last_ts),
        serial=serial,
        first_event_ts=first_ts,
        last_event_ts=last_ts,
        event_count=len(events),
        sample_event_id=events[0]["id"],
        project_raw=project_raw,
        project_root=project_root,
        project_norm=project_norm_for_matching,
        location_raw=location_raw,
        location_norm=location_mode_norm,
        client_raw=client_raw,
        operator_raw=operator_raw,
        is_blank_meta=is_blank,
        metadata_consistency=consistency,
    )
# ── SFM fetch (input to steps 1–3) ────────────────────────────────────────────
async def _fetch_all_events_from_sfm(sfm_base_url: str) -> list[dict]:
    """Pull every event from SFM, one /db/events request per known serial.

    /db/events can only be filtered by serial. So the unit list is read from
    /db/units first, and each distinct serial's events are then fetched
    concurrently, with ``limit=SFM_FETCH_CEILING``. That is ~26 calls for the
    dev box and manageable for prod (~50-100 serials).

    A failure of the /db/units request propagates (raise_for_status). A
    per-serial failure (HTTP error, non-JSON body, non-object payload) is
    logged and contributes no events. A serial whose response reaches the
    ceiling is logged as possibly truncated. The "waveform_blob" and
    "a5_pickle_filename" keys are removed from every returned event.

    NOTE(review): assumes /db/units returns a JSON list of objects with a
    "serial" key, and /db/events an object with an "events" list — confirm
    against SFM.
    """
    async with httpx.AsyncClient(timeout=SFM_TIMEOUT) as client:
        units_resp = await client.get(f"{sfm_base_url}/db/units")
        units_resp.raise_for_status()
        units = units_resp.json()
        # dict.fromkeys de-duplicates while keeping order, so a serial listed
        # twice is not fetched (and its events clustered) twice.
        serials = list(dict.fromkeys(u["serial"] for u in units if u.get("serial")))

        async def _fetch_one(serial: str) -> list[dict]:
            try:
                r = await client.get(
                    f"{sfm_base_url}/db/events",
                    params={"serial": serial, "limit": SFM_FETCH_CEILING},
                )
                r.raise_for_status()
                payload = r.json()
            except (httpx.HTTPError, ValueError) as e:
                # ValueError covers a body that is not valid JSON.
                log.warning("SFM fetch failed for serial=%s: %s", serial, e)
                return []
            if not isinstance(payload, dict):
                log.warning(
                    "SFM returned an unexpected payload for serial=%s: %s",
                    serial, type(payload).__name__,
                )
                return []
            evs = payload.get("events", []) or []
            if len(evs) >= SFM_FETCH_CEILING:
                # The request limit was reached, so this serial's history is
                # probably incomplete and its clusters may be too.
                log.warning(
                    "SFM returned %d events for serial=%s (limit %d); results may be truncated",
                    len(evs), serial, SFM_FETCH_CEILING,
                )
            # Strip waveform_blob (large bytes; we don't need them)
            for ev in evs:
                ev.pop("waveform_blob", None)
                ev.pop("a5_pickle_filename", None)
            return evs

        # Gather inside the `async with` so the shared client is still open.
        batches = await asyncio.gather(*(_fetch_one(s) for s in serials))
    return [ev for batch in batches for ev in batch]
# ── Steps 1–3: pre-filter + cluster the SFM events ────────────────────────────
async def _scan_clusters(
    db: Session,
    sfm_base_url: str,
) -> tuple[list[Cluster], int, int]:
    """Fetch SFM events, drop already-attributed ones, and cluster the rest.

    Returns ``(clusters, scanned_event_count, already_attributed_event_count)``.
    ``scanned_event_count`` counts every event SFM returned.
    ``already_attributed_event_count`` counts events that fall inside an
    existing UnitAssignment window for their serial.

    Events with no serial or no timestamp are skipped silently. An event
    whose timestamp cannot be parsed is skipped with a warning instead of
    aborting the whole scan.
    """
    events = await _fetch_all_events_from_sfm(sfm_base_url)
    scanned = len(events)

    # Load all existing UnitAssignments once, grouped by serial (unit_id).
    assignments_by_serial: dict[str, list[UnitAssignment]] = {}
    for a in db.query(UnitAssignment).all():
        assignments_by_serial.setdefault(a.unit_id, []).append(a)

    now = datetime.utcnow()

    # Bucket events by serial, dropping ones existing assignments already cover.
    by_serial: dict[str, list[dict]] = {}
    already_attributed = 0
    for ev in events:
        serial = ev.get("serial")
        ts_raw = ev.get("timestamp")
        if not serial or not ts_raw:
            # Without a serial or a timestamp an event can't be clustered.
            continue
        try:
            ts = _parse_ts(ts_raw)
        except ValueError:
            log.warning(
                "Skipping SFM event id=%s serial=%s: unparseable timestamp %r",
                ev.get("id"), serial, ts_raw,
            )
            continue
        if _events_overlap_existing_assignment(ts, serial, assignments_by_serial, now):
            already_attributed += 1
            continue
        by_serial.setdefault(serial, []).append(ev)

    # Cluster per serial.
    clusters: list[Cluster] = []
    for serial, serial_events in by_serial.items():
        clusters.extend(_events_to_clusters(serial, serial_events))
    return clusters, scanned, already_attributed
# ── Steps 4–6: matching against the existing graph, conflicts, confidence ─────
def _find_best_match(
    candidate_norm: str,
    candidates: list[tuple[str, str]],  # (id, normalised_name)
) -> tuple[Optional[str], Optional[float], str]:
    """Find the candidate whose normalised name best matches ``candidate_norm``.

    Returns ``(best_id, best_score, classification)``, where classification
    is one of:

    - "ambiguous": the best and runner-up scores are both
      >= FUZZY_MATCH_THRESHOLD and differ by less than FUZZY_AMBIGUITY_DELTA.
      The exception is a unique identical match (best score 1.0, runner-up
      below 1.0), which is never ambiguous.
    - "exact": best score >= FUZZY_EXACT_THRESHOLD.
    - "fuzzy": best score >= FUZZY_MATCH_THRESHOLD.
    - "no_match": anything else. best_id is None, but best_score is still
      reported.

    Empty ``candidate_norm`` or ``candidates`` gives ``(None, None, "no_match")``.
    Score ties keep the candidates' input order.
    """
    if not candidate_norm or not candidates:
        return None, None, "no_match"
    scored = [(cid, similarity(candidate_norm, cnorm)) for cid, cnorm in candidates]
    scored.sort(key=lambda x: x[1], reverse=True)
    best_id, best_score = scored[0]

    if len(scored) > 1:
        second_score = scored[1][1]
        # similarity() short-circuits a == b to 1.0, so a lone 1.0 means the
        # normalised name exists verbatim. A near-duplicate scoring e.g. 0.97
        # must not demote that to "ambiguous". Two 1.0s (duplicate projects)
        # still are ambiguous.
        unique_identical = best_score >= 1.0 and second_score < 1.0
        if (
            not unique_identical
            and best_score >= FUZZY_MATCH_THRESHOLD
            and second_score >= FUZZY_MATCH_THRESHOLD
            and (best_score - second_score) < FUZZY_AMBIGUITY_DELTA
        ):
            return best_id, best_score, "ambiguous"

    if best_score >= FUZZY_EXACT_THRESHOLD:
        return best_id, best_score, "exact"
    if best_score >= FUZZY_MATCH_THRESHOLD:
        return best_id, best_score, "fuzzy"
    return None, best_score, "no_match"
def _detect_conflicts(
    db: Session,
    cluster: Cluster,
    target_location_id: Optional[str],
    proposed_assigned_at: datetime,
    proposed_assigned_until: Optional[datetime],
) -> tuple[list[ConflictHint], bool, bool]:
    """Compare a proposed window with the serial's existing UnitAssignments.

    Open-ended windows are treated as ending now (UTC). That covers a
    ``proposed_assigned_until`` of None and an existing ``assigned_until``
    of None. Two windows overlap when each starts strictly before the other
    ends.

    Returns ``(conflicts, blocking, already_attributed_at_target)``:

    - ``already_attributed_at_target`` is True when an overlapping
      assignment for THIS serial is already at THIS target location.
      Nothing to do; the caller skips the cluster.
    - every other overlapping assignment becomes a ConflictHint. Names fall
      back to "?" when the row cannot be loaded. ``blocking`` is True
      whenever at least one such hint exists.
    """
    now = datetime.utcnow()
    window_end = proposed_assigned_until or now
    serial_assignments = (
        db.query(UnitAssignment)
        .filter(UnitAssignment.unit_id == cluster.serial)
        .all()
    )

    hints: list[ConflictHint] = []
    at_target = False
    for existing in serial_assignments:
        existing_end = existing.assigned_until or now
        overlaps = proposed_assigned_at < existing_end and window_end > existing.assigned_at
        if not overlaps:
            continue
        if target_location_id is not None and existing.location_id == target_location_id:
            at_target = True
            continue
        # Overlap at a different location → blocking conflict.
        other_loc = db.query(MonitoringLocation).filter_by(id=existing.location_id).first()
        other_proj = db.query(Project).filter_by(id=existing.project_id).first()
        hints.append(ConflictHint(
            existing_assignment_id=existing.id,
            other_location_id=existing.location_id,
            other_location_name=other_loc.name if other_loc else "?",
            other_project_id=existing.project_id,
            other_project_name=other_proj.name if other_proj else "?",
        ))

    return hints, bool(hints), at_target
def _score_confidence(
    cluster: Cluster,
    project_match: str,
    location_match: str,
    conflicts: list[ConflictHint],
) -> str:
    """Grade a suggestion 'high' | 'medium' | 'low'. The first rule that applies wins.

      low    — any conflict, blank metadata, or fewer than 2 events;
      low    — spans more than SUSPICIOUS_SPAN_DAYS with < 1 event per 7 days;
      low    — either match is "ambiguous";
      medium — either match is "fuzzy";
      high   — both "exact", or both "create_new" (clean new project+location);
      medium — anything else, e.g. an existing project with a new location
               (common, but worth a review).
    """
    if conflicts or cluster.is_blank_meta or cluster.event_count < 2:
        return "low"

    span_days = (cluster.last_event_ts - cluster.first_event_ts).days
    too_sparse = span_days > SUSPICIOUS_SPAN_DAYS and cluster.event_count < (span_days / 7)
    if too_sparse:
        return "low"

    matches = (project_match, location_match)
    if "ambiguous" in matches:
        return "low"
    if "fuzzy" in matches:
        return "medium"
    if project_match == location_match and project_match in ("exact", "create_new"):
        return "high"
    return "medium"
def _build_suggestion(db: Session, cluster: Cluster) -> Suggestion:
    """Match a cluster against the existing graph, score it and check conflicts.

    The project is matched by normalised name against every project whose
    status is not "deleted". Locations are matched only among the matched
    project's "vibration" locations, and only when the project match is
    "exact" or "fuzzy". Location names are never compared across projects
    ("Loc 1" at Project A is a different thing than "Loc 1" at Project B),
    so a new or ambiguous project always gets a new location. An ambiguous
    location match is reported as "fuzzy"; the operator picks, and a richer
    disambiguation UI is out of scope.

    The proposed window runs from one hour before the first event to one
    hour after the last. If the last event is within RECENT_CLUSTER_DAYS of
    now, the end is left open (None) to mark the assignment as active.
    """
    # ── Project ──
    live_projects = db.query(Project).filter(Project.status != "deleted").all()
    if cluster.project_norm:
        proj_id, proj_score, proj_match = _find_best_match(
            cluster.project_norm,
            [(p.id, _normalise(p.name)) for p in live_projects],
        )
    else:
        proj_id, proj_score, proj_match = None, None, "create_new"

    project_match: Literal["exact", "fuzzy", "create_new", "ambiguous"]
    project_existing = None
    if proj_match in ("exact", "fuzzy", "ambiguous"):
        project_match = proj_match
        project_existing = next((p for p in live_projects if p.id == proj_id), None)
    else:
        project_match = "create_new"

    if project_existing and project_match == "exact":
        project_suggested_name = project_existing.name
    else:
        project_suggested_name = (
            cluster.project_root or cluster.project_raw or f"Project {cluster.serial}"
        )

    # ── Location (only inside a confidently matched project) ──
    location_candidates_objs: list = []
    loc_id, loc_score, loc_match = None, None, "create_new"
    if project_existing and project_match in ("exact", "fuzzy"):
        location_candidates_objs = (
            db.query(MonitoringLocation)
            .filter(MonitoringLocation.project_id == project_existing.id)
            .filter(MonitoringLocation.location_type == "vibration")
            .all()
        )
        if cluster.location_norm:
            loc_id, loc_score, loc_match = _find_best_match(
                cluster.location_norm,
                [(loc.id, _normalise(loc.name)) for loc in location_candidates_objs],
            )

    location_match: Literal["exact", "fuzzy", "create_new"]
    if loc_match == "exact":
        location_match = "exact"
    elif loc_match in ("fuzzy", "ambiguous"):
        location_match = "fuzzy"
    else:
        location_match = "create_new"
    location_existing = (
        next((loc for loc in location_candidates_objs if loc.id == loc_id), None)
        if location_match != "create_new"
        else None
    )

    if location_existing and location_match == "exact":
        location_suggested_name = location_existing.name
    else:
        location_suggested_name = cluster.location_raw or "Unnamed location"

    # ── Proposed assignment window ──
    proposed_at = cluster.first_event_ts - timedelta(hours=1)
    still_active = (datetime.utcnow() - cluster.last_event_ts) <= timedelta(days=RECENT_CLUSTER_DAYS)
    proposed_until = None if still_active else cluster.last_event_ts + timedelta(hours=1)

    # ── Conflicts + confidence ──
    target_loc_id = location_existing.id if location_existing else None
    conflicts, blocking, already_attributed = _detect_conflicts(
        db, cluster, target_loc_id, proposed_at, proposed_until,
    )
    confidence = _score_confidence(cluster, project_match, location_match, conflicts)

    return Suggestion(
        cluster=cluster,
        project_match=project_match,
        project_existing_id=project_existing.id if project_existing else None,
        project_existing_name=project_existing.name if project_existing else None,
        project_match_score=proj_score,
        project_suggested_name=project_suggested_name,
        location_match=location_match,
        location_existing_id=location_existing.id if location_existing else None,
        location_existing_name=location_existing.name if location_existing else None,
        location_match_score=loc_score,
        location_suggested_name=location_suggested_name,
        proposed_assigned_at=proposed_at,
        proposed_assigned_until=proposed_until,
        confidence=confidence,
        conflicts=conflicts,
        blocking_conflict=blocking,
        already_attributed=already_attributed,
    )
# ── Public API: scan ──────────────────────────────────────────────────────────
async def scan_clusters_and_build_suggestions(
    db: Session,
    sfm_base_url: str,
) -> ScanResult:
    """End-to-end scan: fetch SFM events, cluster them, build suggestions.

    Clusters with a MetadataBackfillDecision of status "skipped" or "applied"
    only get ``last_seen_at`` refreshed; skipped ones are counted in
    ``skipped_orphans``. Clusters whose suggestion is already attributed at
    the matched location are counted and dropped.

    Every other cluster yields a suggestion, and its decision row is upserted.
    A new row is created with status "conflict" (blocking conflict) or
    "pending". An existing row is reset to that status and gets refreshed
    confidence, last_seen_at, event_count and last_event_ts. All changes are
    committed once at the end.
    """
    clusters, scanned, _already_attributed_events = await _scan_clusters(db, sfm_base_url)
    now = datetime.utcnow()
    suggestions: list[Suggestion] = []
    skipped_orphans = 0
    already_attributed_clusters = 0

    for cluster in clusters:
        decision = (
            db.query(MetadataBackfillDecision)
            .filter_by(cluster_id=cluster.cluster_id)
            .first()
        )
        if decision and decision.status in ("skipped", "applied"):
            # The operator already decided. Just record that the cluster is
            # still around; applied clusters should now be picked up by
            # Phase 2 attribution.
            if decision.status == "skipped":
                skipped_orphans += 1
            decision.last_seen_at = now
            continue

        suggestion = _build_suggestion(db, cluster)
        if suggestion.already_attributed:
            already_attributed_clusters += 1
            continue
        suggestions.append(suggestion)

        status = "conflict" if suggestion.blocking_conflict else "pending"
        if decision is None:
            db.add(MetadataBackfillDecision(
                cluster_id=cluster.cluster_id,
                status=status,
                confidence=suggestion.confidence,
                first_seen_at=now,
                last_seen_at=now,
                serial=cluster.serial,
                project_raw=cluster.project_raw or None,
                location_raw=cluster.location_raw or None,
                first_event_ts=cluster.first_event_ts,
                last_event_ts=cluster.last_event_ts,
                event_count=cluster.event_count,
            ))
        else:
            decision.status = status
            decision.confidence = suggestion.confidence
            decision.last_seen_at = now
            decision.event_count = cluster.event_count
            decision.last_event_ts = cluster.last_event_ts

    db.commit()
    return ScanResult(
        suggestions=suggestions,
        skipped_orphans=skipped_orphans,
        already_attributed=already_attributed_clusters,
        scanned_event_count=scanned,
        cluster_count=len(clusters),
    )
# ── Public API: apply ─────────────────────────────────────────────────────────
def _ensure_auto_imported_project_type(db: Session) -> str:
    """Return the id of the 'auto_imported' ProjectType, creating it first if absent.

    A newly created row is added and flushed, not committed, so later
    statements in the same transaction can reference it.
    """
    # ProjectType is not part of the module-level model imports.
    from backend.models import ProjectType

    existing = db.query(ProjectType).filter_by(id=AUTO_IMPORTED_TYPE_ID).first()
    if existing is not None:
        return existing.id

    created = ProjectType(
        id=AUTO_IMPORTED_TYPE_ID,
        name=AUTO_IMPORTED_TYPE_NAME,
        description="Projects created automatically by the metadata-backfill parser. Operators can re-type them later.",
        supports_vibration=True,
        supports_sound=False,
    )
    db.add(created)
    db.flush()
    return created.id
def _ensure_project(db: Session, suggestion: Suggestion) -> tuple[Project, bool]:
    """Return (project, created_flag) for a suggestion.

    Resolution order:
      1. ``suggestion.project_existing_id``, if that row still exists;
      2. a non-deleted project whose normalised name equals the candidate
         name's. "SR81" and "SR 81" collapse to the same project (both
         normalise to "sr 81"), as do "Fay - Locks & Dam No3" and
         "Fay-Locks-&-Dam-No3". This matters when many clusters are applied
         in one bulk operation: the first creates the project, and later
         clusters with normalisation-equivalent names attach to it instead
         of triggering a UNIQUE constraint violation;
      3. a case-insensitive exact name match (LIKE wildcards escaped);
      4. otherwise, a new "auto_imported" Project plus an enabled
         vibration_monitoring ProjectModule, both flushed (not committed).
    """
    if suggestion.project_existing_id:
        project = db.query(Project).filter_by(id=suggestion.project_existing_id).first()
        if project is not None:
            return project, False

    candidate_name = (
        suggestion.project_suggested_name.strip()
        or f"Auto-imported project ({suggestion.cluster.serial})"
    )
    candidate_norm = _normalise(candidate_name)

    # Pre-flight normalised lookup: avoids creating duplicates that
    # differ only in punctuation/spacing.
    if candidate_norm:
        for project in db.query(Project).filter(Project.status != "deleted").all():
            if _normalise(project.name) == candidate_norm:
                return project, False

    # Final fallback: case-insensitive exact name match. The name is
    # operator-typed, so LIKE wildcards must be escaped ('/' is the escape
    # character). Otherwise "I_80" would also match e.g. "IX80".
    # NOTE(review): unlike the lookup above this has no status filter, so it
    # can return a project whose status is "deleted" — confirm intended.
    like_pattern = (
        candidate_name.replace("/", "//").replace("%", "/%").replace("_", "/_")
    )
    existing = (
        db.query(Project)
        .filter(Project.name.ilike(like_pattern, escape="/"))
        .first()
    )
    if existing is not None:
        return existing, False

    type_id = _ensure_auto_imported_project_type(db)

    # A closed cluster (has assigned_until) gets an end date; an active one stays open.
    start = suggestion.cluster.first_event_ts.date()
    end = (
        suggestion.cluster.last_event_ts.date()
        if suggestion.proposed_assigned_until else None
    )

    project = Project(
        id=str(uuid.uuid4()),
        name=candidate_name,
        description=(
            f"Auto-created by the metadata-backfill parser on "
            f"{datetime.utcnow():%Y-%m-%d}. Sourced from operator-typed "
            f"BW metadata across {suggestion.cluster.event_count} event(s) "
            f"from serial {suggestion.cluster.serial}."
        ),
        project_type_id=type_id,
        status="active",
        data_collection_mode="remote",
        client_name=suggestion.cluster.client_raw or None,
        start_date=start,
        end_date=end,
    )
    db.add(project)
    db.flush()

    # Ensure the vibration_monitoring module is enabled.
    module = ProjectModule(
        id=str(uuid.uuid4()),
        project_id=project.id,
        module_type="vibration_monitoring",
        enabled=True,
    )
    db.add(module)
    db.flush()
    return project, True
def _ensure_location(
    db: Session,
    project: Project,
    suggestion: Suggestion,
) -> tuple[MonitoringLocation, bool]:
    """Return ``(location, created_flag)`` for *suggestion* within *project*.

    Resolution order:
      1. ``suggestion.location_existing_id`` -- used only if that row exists
         AND belongs to *project*; otherwise it is ignored.
      2. A location in *project* whose ``_normalise``d name equals the
         normalised suggested name (same dedup principle as
         ``_ensure_project``), so punctuation/spacing variants reuse one row.
      3. A location in *project* whose name matches case-insensitively.
      4. Otherwise a new ``location_type="vibration"`` MonitoringLocation is
         added and flushed.

    ``created_flag`` is True only in case 4.
    """
    if suggestion.location_existing_id:
        location = (
            db.query(MonitoringLocation)
            .filter_by(id=suggestion.location_existing_id)
            .first()
        )
        if location is not None and location.project_id == project.id:
            return location, False

    candidate_name = suggestion.location_suggested_name.strip() or "Unnamed location"
    candidate_norm = _normalise(candidate_name)

    # Normalisation-aware lookup within this project -- same dedup
    # principle as _ensure_project.
    if candidate_norm:
        for existing in (
            db.query(MonitoringLocation)
            .filter(MonitoringLocation.project_id == project.id)
            .all()
        ):
            if _normalise(existing.name) == candidate_norm:
                return existing, False

    # Fallback to case-insensitive exact match.  The operator-typed name is
    # used as an ILIKE *pattern*, so LIKE metacharacters must be escaped.
    # Otherwise "Loc_1" would also match "LocA1" ("_" = any single char) and
    # "100%" would match any name starting with "100".
    like_pattern = (
        candidate_name.replace("\\", "\\\\")
        .replace("%", "\\%")
        .replace("_", "\\_")
    )
    existing = (
        db.query(MonitoringLocation)
        .filter(MonitoringLocation.project_id == project.id)
        .filter(MonitoringLocation.name.ilike(like_pattern, escape="\\"))
        .first()
    )
    if existing is not None:
        return existing, False

    location = MonitoringLocation(
        id            = str(uuid.uuid4()),
        project_id    = project.id,
        location_type = "vibration",
        name          = candidate_name,
        description   = (
            f"Auto-created by metadata-backfill from operator-typed sensor_location "
            f"\"{suggestion.cluster.location_raw}\" on events from serial "
            f"{suggestion.cluster.serial}."
        ),
    )
    db.add(location)
    db.flush()
    return location, True
def _apply_one(
    db: Session,
    suggestion: Suggestion,
    *,
    decided_by: str,
) -> tuple[str, str, str]:
    """Stage the graph rows for a single suggestion in the current session.

    Resolves (or creates) the project and the location, then adds a
    ``UnitAssignment`` and a ``UnitHistory`` audit row. If a
    ``MetadataBackfillDecision`` exists for the cluster, it is marked
    ``"applied"`` and linked to the new assignment.

    Nothing is committed here: the caller owns the transaction boundary.

    Returns:
        ``(project_id, location_id, assignment_id)``.
    """
    cluster = suggestion.cluster
    project, _ = _ensure_project(db, suggestion)
    location, _ = _ensure_location(db, project, suggestion)

    # An open-ended window (no end date) means the unit is still deployed.
    until = suggestion.proposed_assigned_until
    new_assignment_id = str(uuid.uuid4())
    assignment_notes = (
        f"Auto-created from operator-typed metadata on "
        f"{cluster.event_count} event(s). "
        f"Project: \"{cluster.project_raw}\". "
        f"Location: \"{cluster.location_raw}\". "
        f"Confidence: {suggestion.confidence}."
    )
    db.add(UnitAssignment(
        id             = new_assignment_id,
        unit_id        = cluster.serial,
        location_id    = location.id,
        project_id     = project.id,
        device_type    = "seismograph",
        assigned_at    = suggestion.proposed_assigned_at,
        assigned_until = until,
        status         = "completed" if until is not None else "active",
        source         = "metadata_backfill",
        notes          = assignment_notes,
    ))

    # Audit trail for the unit.
    history_notes = (
        f"Created from {cluster.event_count} events tagged "
        f"({cluster.project_raw!r}, {cluster.location_raw!r}). "
        f"By: {decided_by}."
    )
    db.add(UnitHistory(
        unit_id     = cluster.serial,
        change_type = "assignment_backfilled",
        field_name  = "unit_assignment",
        old_value   = None,
        new_value   = f"{project.name} / {location.name}",
        changed_at  = datetime.utcnow(),
        source      = "metadata_backfill",
        notes       = history_notes,
    ))

    # Record the outcome on the cluster's decision row, when there is one.
    decision = (
        db.query(MetadataBackfillDecision)
        .filter_by(cluster_id=cluster.cluster_id)
        .first()
    )
    if decision:
        decision.status = "applied"
        decision.decided_at = datetime.utcnow()
        decision.decided_by = decided_by
        decision.applied_assignment_id = new_assignment_id

    return project.id, location.id, new_assignment_id
def apply_suggestions(
    db: Session,
    suggestions: list[Suggestion],
    *,
    decided_by: str,
) -> ApplyResult:
    """Apply a batch of suggestions, committing each one independently.

    Each suggestion without a blocking conflict is staged via ``_apply_one``
    and committed on its own. A failure partway through the batch therefore
    rolls back only the failing suggestion. Suggestions applied earlier in
    the same call stay committed and are correctly reported as applied.
    (Previously a single ``db.rollback()`` discarded every earlier,
    merely-flushed success while still counting it as applied.)

    Suggestions with ``blocking_conflict`` set are not attempted. They are
    reported in ``failed`` with a fixed reason string.

    Args:
        db: Active session. It is committed (and rolled back on a
            per-suggestion failure) by this function, so any unrelated
            pending changes in it are committed or discarded along the way.
        suggestions: Suggestions to apply, in order.
        decided_by: Operator identifier stamped on the audit/decision rows.

    Returns:
        ApplyResult with the applied count, ``(cluster_id, reason)`` pairs
        for failures, de-duplicated project/location IDs, and the assignment
        IDs of every successful apply.
    """
    applied = 0
    failed: list[tuple[str, str]] = []
    proj_ids: list[str] = []
    loc_ids: list[str] = []
    asgn_ids: list[str] = []
    for s in suggestions:
        cluster_id = s.cluster.cluster_id
        if s.blocking_conflict:
            failed.append((cluster_id, "blocking conflict — needs manual resolution"))
            continue
        try:
            p_id, l_id, a_id = _apply_one(db, s, decided_by=decided_by)
            # Commit per suggestion: the rollback below must only undo THIS
            # suggestion, not earlier successes still pending in the session.
            db.commit()
        except Exception as e:
            log.exception("Failed to apply cluster %s", cluster_id)
            db.rollback()
            failed.append((cluster_id, str(e)))
            continue
        proj_ids.append(p_id)
        loc_ids.append(l_id)
        asgn_ids.append(a_id)
        applied += 1
    # Persist anything else still pending (e.g. when every suggestion was
    # blocked and nothing was committed above).
    db.commit()
    return ApplyResult(
        applied                = applied,
        skipped                = 0,
        failed                 = failed,
        # NOTE(review): _apply_one returns the project/location id whether it
        # was newly created or reused, so these lists include reused rows too.
        project_ids_created    = list(dict.fromkeys(proj_ids)),
        location_ids_created   = list(dict.fromkeys(loc_ids)),
        assignment_ids_created = asgn_ids,
    )
def skip_clusters(
    db: Session,
    cluster_ids: list[str],
    *,
    decided_by: str = "operator",
) -> int:
    """Mark still-open clusters as skipped so future scans don't resurface them.

    Only decisions whose status is ``"pending"`` or ``"conflict"`` change.
    Unknown cluster IDs and already-decided clusters are left alone. Every
    changed row gets the same ``decided_at`` timestamp, and the session is
    committed before returning.

    Returns:
        The number of decisions transitioned to ``"skipped"``.
    """
    skippable = ("pending", "conflict")
    stamp = datetime.utcnow()
    changed = 0
    for cid in cluster_ids:
        row = db.query(MetadataBackfillDecision).filter_by(cluster_id=cid).first()
        if row is None or row.status not in skippable:
            continue
        row.status = "skipped"
        row.decided_at = stamp
        row.decided_by = decided_by
        changed += 1
    db.commit()
    return changed