6ebbe28308
Operators sometimes bake location identifiers into the project string for email-readability — "Fay - Locks & Dam No3 - Loc 2 - 735 Bunola" where "Fay - Locks & Dam No3" is the actual project and "- Loc 2 - 735 Bunola" is location info that already lives in sensor_location. Without stripping, every "- Loc N" variant became a separate project, fragmenting what should be one project with several locations. Backend: - New _extract_project_root() helper. Regex matches " - Loc N" / "-Loc3" / " - Location #5" / etc. with case-insensitive multi-dash support; strips from that marker forward and cleans up dangling separators. Strings without a Loc-marker pass through unchanged. - Cluster dataclass adds project_root field alongside project_raw. project_raw stays the operator-typed string for display ("hover to see what was actually typed"). project_root is what gets normalised for matching and used as the suggested project name. - _ensure_project + _ensure_location now do normalisation-aware dedup before creating: a cluster of "SR81" and a cluster of "SR 81" (which normalise to the same string) collapse into one project on apply, even when applied in the same bulk operation. Avoids UNIQUE constraint collisions and duplicate-named-by-spacing projects. Frontend: - Wizard cluster cards show "↳ stripped trailing 'Loc N' suffix; operator typed: <raw>" when project_root differs from project_raw, so the operator can see at a glance what the parser did to the string. Real-data results: against the same 10,055 SFM events, confidence distribution improved from 37/14/8 (high/med/low) to 43/9/7. "Fay - Locks & Dam No3" now appears as ONE project across 6 cluster instances spanning 3 serials and 6 different locations — exactly the "one project, many locations" model the user described. Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>
1136 lines
42 KiB
Python
1136 lines
42 KiB
Python
"""
|
||
metadata_backfill.py — turn operator-typed BW event metadata into the
|
||
terra-view Project / MonitoringLocation / UnitAssignment graph.
|
||
|
||
Architecture (see /home/serversdown/.claude/plans/sfm-metadata-backfill-parser.md):
|
||
|
||
1. Pre-filter: drop events that already fall inside an existing
|
||
UnitAssignment window (Phase 2 attribution already handles them).
|
||
2. Time-cluster: serial + 7-day gap is the cluster identity.
|
||
3. Metadata-split: split on persistent (>= 2 events) metadata transitions.
|
||
4. Match against existing graph (rapidfuzz multi-signal scoring).
|
||
5. Score confidence (high/medium/low).
|
||
6. Detect conflicts (overlap with existing UnitAssignment at different
|
||
location for the same serial → blocking).
|
||
7. Apply: create Project / MonitoringLocation / UnitAssignment +
|
||
UnitHistory audit row, all in one transaction.
|
||
|
||
Public API:
|
||
scan_clusters_and_build_suggestions(db, sfm_base_url) → ScanResult
|
||
apply_suggestions(db, cluster_ids, *, decided_by) → ApplyResult
|
||
skip_clusters(db, cluster_ids, *, decided_by) → int
|
||
"""
|
||
|
||
from __future__ import annotations
|
||
|
||
import asyncio
|
||
import hashlib
|
||
import logging
|
||
import os
|
||
import re
|
||
import uuid
|
||
from collections import Counter
|
||
from dataclasses import dataclass, field
|
||
from datetime import datetime, date, timedelta
|
||
from typing import Optional, Iterable, Literal
|
||
|
||
import httpx
|
||
import rapidfuzz
|
||
from sqlalchemy.orm import Session
|
||
|
||
from backend.models import (
|
||
Project,
|
||
ProjectModule,
|
||
MonitoringLocation,
|
||
UnitAssignment,
|
||
RosterUnit,
|
||
UnitHistory,
|
||
MetadataBackfillDecision,
|
||
)
|
||
|
||
log = logging.getLogger("backend.services.metadata_backfill")


# ── Tunables ──────────────────────────────────────────────────────────────────

# Clustering.
CLUSTER_GAP_DAYS = 7          # a silence longer than this many days starts a new time-cluster
MIN_SPLIT_RUN_LENGTH = 2      # consecutive events a metadata change needs before it splits a cluster

# Fuzzy matching (rapidfuzz WRatio, rescaled to 0.0–1.0).
FUZZY_EXACT_THRESHOLD = 0.95  # score at/above this → classified as an exact match
FUZZY_MATCH_THRESHOLD = 0.80  # score at/above this → classified as a fuzzy match
FUZZY_AMBIGUITY_DELTA = 0.05  # runner-up within this of the best score → ambiguous

# Confidence scoring / proposed assignment window.
SUSPICIOUS_SPAN_DAYS = 90     # clusters spanning longer than this with sparse events are suspicious
RECENT_CLUSTER_DAYS = 3       # cluster ending within this many days of now keeps assigned_until=NULL

# SFM transport.
SFM_FETCH_CEILING = 5000      # max events requested per SFM /db/events call
SFM_TIMEOUT = 30.0            # seconds; generous because the scan runs in the background

# ProjectType given to auto-created projects: a sentinel "auto_imported" type
# that the parser ensures exists. Operators can re-type a project later by
# editing it.
AUTO_IMPORTED_TYPE_ID = "auto_imported"
AUTO_IMPORTED_TYPE_NAME = "Auto-imported (from event metadata)"
|
||
|
||
|
||
# ── Normalisation + fuzzy matching ─────────────────────────────────────────────
|
||
|
||
|
||
def _normalise(s: Optional[str]) -> str:
    """Canonicalise a free-text label so separator differences disappear.

    The input is lower-cased. Every run of characters outside ``[a-z0-9]`` is
    collapsed to a single space, and the ends are trimmed. For example,
    'Test-5-8-26' and 'Test 5/8/26' both become 'test 5 8 26', while
    alphanumeric content survives ('I-80 N Fork' -> 'i 80 n fork').

    Note that non-ASCII letters (e.g. 'é') are outside the class, so they are
    treated as separators. ``None`` or ``''`` yields ``''``.
    """
    if not s:
        return ""
    # A single substitution suffices: each maximal run of non-alphanumerics
    # (whitespace included) becomes exactly one space, so no double spaces
    # can remain afterwards.
    return re.sub(r"[^a-z0-9]+", " ", s.strip().lower()).strip()
|
||
|
||
|
||
# Marker for location info that operators append to the project string for
# email-readability. In "Fay - Locks & Dam No3 - Loc 2 - 735 Bunola", the real
# project is "Fay - Locks & Dam No3". The trailing "- Loc 2 - 735 Bunola"
# duplicates what the sensor_location field already records. Cutting it off
# lets every "Loc N" variant of one project cluster together.
#
# The separator may be a hyphen, an en dash or an em dash, and matching is
# case-insensitive. Recognised forms include:
#   "- Loc 2", "-Loc3", "- Location #5", " — Location.5", "- LOC #07"
# A string with no such marker is left untouched; the operator can still edit
# it in the wizard.
_PROJECT_LOC_SUFFIX = re.compile(
    r"""
    \s*                 # any leading whitespace
    [-–—]               # hyphen or em-dash (separator before the Loc marker)
    \s*                 # optional spaces
    (?:loc|location)    # 'Loc' or 'Location'
    \.?                 # optional period
    \s*                 # optional space
    \#?                 # optional '#'
    \s*                 # optional space
    \d+                 # required digit
    \b                  # word boundary
    """,
    re.IGNORECASE | re.VERBOSE,
)


def _extract_project_root(project_raw: str) -> str:
    """Return the 'project root' of an operator-typed project string.

    Everything from the first "- Loc N"-style marker onward is discarded.
    Any whitespace or dash separators left dangling at the end are then
    trimmed. Example:

        'Fay - Locks & Dam No3 - Loc 2 - 735 Bunola' -> 'Fay - Locks & Dam No3'

    A string without a marker is returned with only surrounding whitespace
    stripped. Empty or ``None`` input yields ``''``, and so does a string
    that *starts* with the marker (e.g. '-Loc 2').
    """
    if not project_raw:
        return ""
    marker = _PROJECT_LOC_SUFFIX.search(project_raw)
    if marker is None:
        return project_raw.strip()
    head = project_raw[: marker.start()]
    # e.g. "Fay - Locks & Dam No3 -" -> "Fay - Locks & Dam No3"
    return re.sub(r"[\s\-–—]+$", "", head).strip()
|
||
|
||
|
||
# rapidfuzz.WRatio leans on partial_ratio, which over-rewards a short string
# found inside a longer one: 'demo' vs 'bridge demo project' scores 0.90, a
# false positive. Non-identical pairs whose shorter side is below this many
# characters are therefore scored 0.0 instead of being fuzzy-matched.
_MIN_FUZZY_LEN = 5


def similarity(a: str, b: str) -> float:
    """Score how alike two (normalised) strings are, in [0.0, 1.0].

    Identical non-empty strings score 1.0. An empty input scores 0.0. So does
    a non-identical pair whose shorter member is shorter than
    ``_MIN_FUZZY_LEN`` characters; this guards against the 'one common word
    inside a longer phrase' false positive.

    Every other pair is scored with ``rapidfuzz.fuzz.WRatio`` divided by 100.
    WRatio blends plain Levenshtein, partial-substring, token-sort and
    token-set ratios, so it tolerates word reordering and substring
    containment.
    """
    if not (a and b):
        return 0.0
    if a == b:
        return 1.0
    too_short = len(a) < _MIN_FUZZY_LEN or len(b) < _MIN_FUZZY_LEN
    return 0.0 if too_short else rapidfuzz.fuzz.WRatio(a, b) / 100.0
|
||
|
||
|
||
# ── Cluster + Suggestion dataclasses ───────────────────────────────────────────
|
||
|
||
|
||
@dataclass
class Cluster:
    """One candidate deployment.

    A cluster is a run of events from a single serial that share a time
    window and a modal project/location.
    """

    # ---- identity and time span -------------------------------------------
    cluster_id: str
    serial: str
    first_event_ts: datetime
    last_event_ts: datetime
    event_count: int
    sample_event_id: str

    # ---- project text, three ways ------------------------------------------
    # project_raw:  the full operator-typed string, e.g.
    #               "Fay - Locks & Dam No3 - Loc 5 Synthomer". Kept for display
    #               so the operator can sanity-check what was typed.
    # project_root: project_raw with any trailing "- Loc N" suffix removed
    #               ("Fay - Locks & Dam No3"). Used for matching and as the
    #               suggested project name. Same as project_raw when there is
    #               no Loc marker.
    # project_norm: the normalised form used for comparisons.
    project_raw: str
    project_root: str
    project_norm: str

    # ---- location and free-text context ------------------------------------
    location_raw: str
    location_norm: str
    client_raw: str
    operator_raw: str

    # ---- quality signals ----------------------------------------------------
    is_blank_meta: bool
    metadata_consistency: float  # fraction in 0.0–1.0
|
||
|
||
|
||
@dataclass
class ConflictHint:
    """An existing UnitAssignment that collides with a proposed one.

    Carries the other assignment's location and project, with display names,
    so the wizard can explain the clash.
    """

    existing_assignment_id: str
    other_location_id: str
    other_location_name: str
    other_project_id: str
    other_project_name: str
|
||
|
||
|
||
@dataclass
class Suggestion:
    """A proposed Project / MonitoringLocation / UnitAssignment for one Cluster."""

    cluster: Cluster

    # Project resolution. The *_existing_* fields identify a matched Project,
    # if any. project_suggested_name is the name the wizard proposes.
    project_match: Literal["exact", "fuzzy", "create_new", "ambiguous"]
    project_existing_id: Optional[str]
    project_existing_name: Optional[str]
    project_match_score: Optional[float]
    project_suggested_name: str

    # Location resolution. There is no "ambiguous" option here.
    location_match: Literal["exact", "fuzzy", "create_new"]
    location_existing_id: Optional[str]
    location_existing_name: Optional[str]
    location_match_score: Optional[float]
    location_suggested_name: str

    # Proposed UnitAssignment window; an until of None means active / open-ended.
    proposed_assigned_at: datetime
    proposed_assigned_until: Optional[datetime]

    confidence: Literal["high", "medium", "low"]
    # Each instance gets its own fresh list (default_factory), never a shared one.
    conflicts: list[ConflictHint] = field(default_factory=list)
    blocking_conflict: bool = False
    already_attributed: bool = False
|
||
|
||
|
||
@dataclass
class ScanResult:
    """Outcome of a metadata-backfill scan."""

    suggestions: list[Suggestion]
    # Clusters left out because a "skipped" decision already exists for them.
    skipped_orphans: int
    # Clusters whose window fully overlaps a pre-existing assignment.
    already_attributed: int
    scanned_event_count: int
    cluster_count: int
|
||
|
||
|
||
@dataclass
class ApplyResult:
    """Outcome of applying (or skipping) a batch of cluster suggestions."""

    applied: int
    skipped: int
    failed: list[tuple[str, str]]  # one (cluster_id, reason) pair per failure
    # IDs of rows created during the apply, grouped by table:
    project_ids_created: list[str]
    location_ids_created: list[str]
    assignment_ids_created: list[str]
|
||
|
||
|
||
# ── Step 1+2+3: clustering ────────────────────────────────────────────────────
|
||
|
||
|
||
def _build_cluster_id(serial: str, first_ts: datetime, last_ts: datetime) -> str:
    """Return a deterministic cluster id: SHA1 hex of 'serial|first_date|last_date'.

    Only the calendar dates of the first and last events are hashed, so the
    id is stable across re-scans of the same data. In particular, editing an
    event's metadata (e.g. fixing a typo) does not change it.

    NOTE(review): because the time-of-day is discarded, two sub-clusters of
    the same serial that start and end on the same calendar dates produce
    the same id. An example is a same-day metadata split. Confirm that
    callers keying decisions on cluster_id tolerate this.
    """
    key = "|".join((str(serial), first_ts.date().isoformat(), last_ts.date().isoformat()))
    digest = hashlib.sha1(key.encode("utf-8"))
    return digest.hexdigest()
|
||
|
||
|
||
def _events_overlap_existing_assignment(
    ev_ts: datetime,
    serial: str,
    assignments_by_serial: dict[str, list[UnitAssignment]],
    now: datetime,
) -> bool:
    """Report whether ``ev_ts`` falls inside an existing assignment window.

    Only the UnitAssignments listed under ``serial`` in
    ``assignments_by_serial`` are checked. Windows are inclusive at both
    ends. An open-ended assignment (``assigned_until`` is ``None``) is
    treated as running up to ``now``. A serial with no entry never overlaps.
    """
    windows = (
        (assignment.assigned_at, assignment.assigned_until or now)
        for assignment in assignments_by_serial.get(serial, [])
    )
    return any(start <= ev_ts <= end for start, end in windows)
|
||
|
||
|
||
def _mode_string(values: Iterable[Optional[str]]) -> tuple[str, float]:
    """Return the most frequent value and the fraction of inputs equal to it.

    ``None`` and ``''`` are pooled into a single ``''`` bucket. Ties are won
    by the value seen first. An empty input yields ``('', 1.0)``.
    """
    bucketed = ["" if not v else v for v in values]
    total = len(bucketed)
    if total == 0:
        return "", 1.0
    winner, hits = Counter(bucketed).most_common(1)[0]
    return winner, hits / total
|
||
|
||
|
||
def _split_by_metadata_runs(events: list[dict]) -> list[list[dict]]:
    """Split a time-ordered event list on *persistent* metadata transitions.

    Each event is keyed by (normalised project, normalised sensor_location),
    and consecutive equal keys are run-length-encoded. Blank-metadata events
    never start a new run; they join whichever run is current, including the
    first keyed run when they lead the list.

    Runs with at least MIN_SPLIT_RUN_LENGTH events are kept as split points.
    Shorter runs (typos / one-off blips) are folded into the next kept run.
    If they trail the list, they go into the previous kept run instead.

    Adjacent kept runs that share the same key are then merged. As a result,
    a short blip in the middle of a deployment (A A B A A) yields ONE
    sub-cluster, not two identically-labelled ones.

    Args:
        events: event dicts, already sorted by timestamp.

    Returns:
        A list of sub-cluster event lists, each preserving the input order.
        Empty input returns ``[]``.
    """
    if not events:
        return []

    blank = ("", "")
    # Compute each event's key once (the old code re-normalised in _run_key).
    keys = [
        (_normalise(ev.get("project")), _normalise(ev.get("sensor_location")))
        for ev in events
    ]

    # Pass 1: raw runs, each tagged with the single non-blank key it carries.
    runs: list[tuple[tuple[str, str], list[int]]] = []
    current_key: Optional[tuple[str, str]] = None
    current_run: list[int] = []
    for i, k in enumerate(keys):
        if k == blank or current_key is None or k == current_key:
            current_run.append(i)
            if k != blank:
                current_key = k
            continue
        runs.append((current_key, current_run))
        current_run = [i]
        current_key = k
    if current_run:
        runs.append((current_key if current_key is not None else blank, current_run))

    # Pass 2: drop short runs as split points, folding their events forward.
    # Merge with the previous kept run when the metadata on both sides of the
    # dropped blip is the same.
    kept: list[tuple[tuple[str, str], list[int]]] = []
    pending_short: list[int] = []
    for key, run in runs:
        if len(run) < MIN_SPLIT_RUN_LENGTH:
            pending_short.extend(run)
            continue
        merged = pending_short + run
        pending_short = []
        if kept and kept[-1][0] == key:
            kept[-1][1].extend(merged)
        else:
            kept.append((key, merged))

    # Trailing short runs attach to the last kept run, or stand alone if no
    # run was long enough to be kept.
    if pending_short:
        if kept:
            kept[-1][1].extend(pending_short)
        else:
            kept.append((blank, pending_short))

    return [[events[i] for i in run] for _, run in kept]
|
||
|
||
|
||
def _events_to_clusters(
    serial: str,
    events: list[dict],  # already filtered: assignments_by_serial overlap dropped
) -> list[Cluster]:
    """Turn one serial's events into Clusters.

    The events are ordered by parsed timestamp and time-clustered: a gap of
    more than CLUSTER_GAP_DAYS between consecutive events starts a new
    cluster. Each time-cluster is then split on metadata runs, and each
    resulting sub-cluster is summarised with its modal metadata.

    Events without a timestamp are ignored. The caller already drops them,
    but the guard is kept. Returns ``[]`` when nothing remains.

    Raises:
        ValueError: if a timestamp cannot be parsed by ``_parse_ts``.
    """
    # Parse each timestamp once and sort on the parsed value. Sorting the raw
    # strings mis-orders mixed SFM formats; for example
    # '2024-05-01 10:00' sorts before '2024-05-01T09:00' lexicographically
    # because ' ' < 'T', and 'Z' vs '+hh:mm' offsets are not comparable as
    # text. sort() is stable, so equal timestamps keep their input order.
    stamped = [(_parse_ts(ev["timestamp"]), ev) for ev in events if ev.get("timestamp")]
    if not stamped:
        return []
    stamped.sort(key=lambda pair: pair[0])

    # Time-cluster.
    gap = timedelta(days=CLUSTER_GAP_DAYS)
    time_clusters: list[list[dict]] = []
    current: list[dict] = []
    last_ts: Optional[datetime] = None
    for ts, ev in stamped:
        if last_ts is not None and (ts - last_ts) > gap:
            time_clusters.append(current)
            current = []
        current.append(ev)
        last_ts = ts
    if current:
        time_clusters.append(current)

    # Metadata-split each time-cluster and summarise every sub-cluster.
    return [
        _build_cluster(serial, sub_cluster)
        for time_cluster in time_clusters
        for sub_cluster in _split_by_metadata_runs(time_cluster)
    ]
|
||
|
||
|
||
def _parse_ts(ts: str) -> datetime:
    """Parse an SFM timestamp string into a naive UTC datetime.

    Accepted forms are ISO 8601 with a 'T' or space separator, optionally
    ending in 'Z' or a numeric UTC offset. Offset-aware values are converted
    to UTC *before* the tzinfo is dropped. The old code merely dropped it,
    which shifted e.g. '10:00-04:00' to 10:00 instead of 14:00 UTC. After
    conversion, results compare correctly against terra-view's naive-UTC
    datetimes and ``datetime.utcnow()``. Naive inputs are returned as-is
    (assumed to already be UTC).

    Raises:
        ValueError: if the string is not a recognisable ISO 8601 timestamp.
    """
    from datetime import timezone

    # Older Pythons' fromisoformat() rejects a trailing 'Z'.
    s = ts.replace("Z", "+00:00")
    try:
        dt = datetime.fromisoformat(s)
    except ValueError:
        # Fallback: space separator
        dt = datetime.fromisoformat(s.replace(" ", "T"))
    if dt.tzinfo is not None:
        dt = dt.astimezone(timezone.utc).replace(tzinfo=None)
    return dt
|
||
|
||
|
||
def _build_cluster(serial: str, events: list[dict]) -> Cluster:
    """Summarise a list of events that already belong together as a Cluster.

    Args:
        serial: the unit serial the events came from.
        events: non-empty list of SFM event dicts. Each must carry a
            parseable "timestamp" and an "id" (the first event's id becomes
            ``sample_event_id``).

    Returns:
        A Cluster whose fields are derived as follows.

        * project/location: the modal normalised value across all events,
          with blanks counted as a bucket.
        * project_raw, location_raw: the most common operator-typed spelling
          of that modal value.
        * client_raw, operator_raw: the most common spelling of the modal
          *non-blank* value. These are display-only, so one missing entry
          should not blank them out.
        * project_root: project_raw with any "- Loc N ..." suffix removed.
          Its normalised form is ``project_norm``, used for matching.
        * metadata_consistency: the lower of the project and location modal
          fractions.
        * is_blank_meta: True when the project root or location is blank.
    """
    timestamps = [_parse_ts(ev["timestamp"]) for ev in events]
    first_ts = min(timestamps)
    last_ts = max(timestamps)

    project_mode_norm, project_consistency = _mode_string(
        _normalise(ev.get("project")) for ev in events
    )
    location_mode_norm, location_consistency = _mode_string(
        _normalise(ev.get("sensor_location")) for ev in events
    )

    def _modal_nonblank_norm(key: str) -> str:
        """Most common non-blank normalised value of ``key``; '' if all blank."""
        return _mode_string(
            norm for norm in (_normalise(ev.get(key)) for ev in events) if norm
        )[0]

    def _pick_display(key: str, mode_norm: str) -> str:
        """Most common stripped raw spelling of ``key`` that normalises to
        ``mode_norm``; ties go to the earliest event, '' if none qualifies.

        (Parameter is named ``key`` to avoid shadowing ``dataclasses.field``.)
        """
        spellings = Counter(
            raw
            for raw in ((ev.get(key) or "").strip() for ev in events)
            if raw and _normalise(raw) == mode_norm
        )
        return spellings.most_common(1)[0][0] if spellings else ""

    project_raw = _pick_display("project", project_mode_norm)
    location_raw = _pick_display("sensor_location", location_mode_norm)
    client_raw = _pick_display("client", _modal_nonblank_norm("client"))
    operator_raw = _pick_display("operator", _modal_nonblank_norm("operator"))

    # Operators sometimes bake location info into the project string for
    # email-readability ("I-80 - Loc 2 - 543 W Plant Rd" -> "I-80"). The
    # sensor_location field already holds the authoritative location, so
    # project_root drives matching and the suggested name. project_raw is
    # kept for display.
    project_root = _extract_project_root(project_raw)
    project_norm_for_matching = _normalise(project_root)

    consistency = min(project_consistency, location_consistency)
    is_blank = (not project_norm_for_matching) or (not location_mode_norm)

    return Cluster(
        cluster_id=_build_cluster_id(serial, first_ts, last_ts),
        serial=serial,
        first_event_ts=first_ts,
        last_event_ts=last_ts,
        event_count=len(events),
        sample_event_id=events[0]["id"],
        project_raw=project_raw,
        project_root=project_root,
        project_norm=project_norm_for_matching,
        location_raw=location_raw,
        location_norm=location_mode_norm,
        client_raw=client_raw,
        operator_raw=operator_raw,
        is_blank_meta=is_blank,
        metadata_consistency=consistency,
    )
|
||
|
||
|
||
# ── Step 4: SFM fetch ─────────────────────────────────────────────────────────
|
||
|
||
|
||
async def _fetch_all_events_from_sfm(sfm_base_url: str) -> list[dict]:
    """Pull every event SFM will return, with one /db/events call per serial.

    SFM's only usable event filter is the serial. So this lists the units via
    /db/units, then fetches /db/events for every serial concurrently. That is
    ~26 calls on the dev box and ~50-100 expected in prod.

    Per-serial fetches are best-effort. An HTTP error or an undecodable JSON
    body is logged, and that serial contributes no events; the rest of the
    scan continues. A failure of the initial /db/units call is not caught
    and propagates to the caller.

    Each serial is requested with ``limit=SFM_FETCH_CEILING``. When a response
    reaches that cap, a warning is logged, because events beyond it may be
    missing.

    Returns:
        A flat list of event dicts with the bulky "waveform_blob" and
        "a5_pickle_filename" keys removed.
    """
    async with httpx.AsyncClient(timeout=SFM_TIMEOUT) as client:
        units_resp = await client.get(f"{sfm_base_url}/db/units")
        units_resp.raise_for_status()
        units = units_resp.json()
        serials = [u["serial"] for u in units if u.get("serial")]

        async def _fetch_one(serial: str) -> list[dict]:
            try:
                r = await client.get(
                    f"{sfm_base_url}/db/events",
                    params={"serial": serial, "limit": SFM_FETCH_CEILING},
                )
                r.raise_for_status()
                payload = r.json()
            except (httpx.HTTPError, ValueError) as e:
                # ValueError covers a malformed/undecodable JSON body. Without
                # it, one bad serial would abort the whole gather() below.
                log.warning("SFM fetch failed for serial=%s: %s", serial, e)
                return []
            evs = payload.get("events", []) or []
            if len(evs) >= SFM_FETCH_CEILING:
                log.warning(
                    "SFM returned %d events for serial=%s (ceiling %d); "
                    "results may be truncated",
                    len(evs), serial, SFM_FETCH_CEILING,
                )
            # Strip waveform_blob (large bytes; we don't need them)
            for ev in evs:
                ev.pop("waveform_blob", None)
                ev.pop("a5_pickle_filename", None)
            return evs

        batches = await asyncio.gather(*(_fetch_one(s) for s in serials))

    return [ev for batch in batches for ev in batch]
|
||
|
||
|
||
# ── Step 4 cont.: cluster building from SFM ───────────────────────────────────
|
||
|
||
|
||
async def _scan_clusters(
    db: Session,
    sfm_base_url: str,
) -> tuple[list[Cluster], int, int]:
    """Fetch SFM events, drop already-attributed ones, then cluster per serial.

    Events lacking a serial or timestamp are ignored. An event whose
    timestamp falls inside an existing UnitAssignment window for its serial
    is counted as already attributed and excluded from clustering.

    Returns:
        (clusters, scanned_event_count, already_attributed_event_count),
        where scanned_event_count is the total number of events fetched.
    """
    events = await _fetch_all_events_from_sfm(sfm_base_url)
    scanned = len(events)

    # Index every existing UnitAssignment by its unit (serial), loaded once.
    assignments_by_serial: dict[str, list[UnitAssignment]] = {}
    for assignment in db.query(UnitAssignment).all():
        assignments_by_serial.setdefault(assignment.unit_id, []).append(assignment)

    now = datetime.utcnow()

    unattributed: dict[str, list[dict]] = {}
    already_attributed = 0
    for ev in events:
        serial = ev.get("serial")
        ts_raw = ev.get("timestamp")
        if not serial or not ts_raw:
            # Unclusterable: no serial, or no timestamp.
            continue
        if _events_overlap_existing_assignment(
            _parse_ts(ts_raw), serial, assignments_by_serial, now
        ):
            already_attributed += 1
        else:
            unattributed.setdefault(serial, []).append(ev)

    clusters = [
        cluster
        for serial, serial_events in unattributed.items()
        for cluster in _events_to_clusters(serial, serial_events)
    ]
    return clusters, scanned, already_attributed
|
||
|
||
|
||
# ── Step 5: matching against the existing graph ───────────────────────────────
|
||
|
||
|
||
def _find_best_match(
    candidate_norm: str,
    candidates: list[tuple[str, str]],  # (id, normalised_name)
) -> tuple[Optional[str], Optional[float], str]:
    """Find the best-scoring candidate for ``candidate_norm``.

    Returns:
        (best_id, best_score, classification), where classification is one
        of "exact", "fuzzy", "ambiguous" or "no_match".

        * Empty input or no candidates gives ``(None, None, "no_match")``.
        * "ambiguous": best and runner-up both reach FUZZY_MATCH_THRESHOLD
          and are less than FUZZY_AMBIGUITY_DELTA apart. best_id is still
          returned.
        * "exact" / "fuzzy": best score reaches FUZZY_EXACT_THRESHOLD /
          FUZZY_MATCH_THRESHOLD respectively.
        * Otherwise the result is ``(None, best_score, "no_match")``.
    """
    if not (candidate_norm and candidates):
        return None, None, "no_match"

    # Stable descending sort: on equal scores the earlier candidate wins.
    ranked = sorted(
        ((cid, similarity(candidate_norm, cnorm)) for cid, cnorm in candidates),
        key=lambda pair: pair[1],
        reverse=True,
    )
    best_id, best_score = ranked[0]

    if len(ranked) > 1:
        runner_up = ranked[1][1]
        both_plausible = best_score >= FUZZY_MATCH_THRESHOLD and runner_up >= FUZZY_MATCH_THRESHOLD
        if both_plausible and (best_score - runner_up) < FUZZY_AMBIGUITY_DELTA:
            return best_id, best_score, "ambiguous"

    for threshold, label in ((FUZZY_EXACT_THRESHOLD, "exact"), (FUZZY_MATCH_THRESHOLD, "fuzzy")):
        if best_score >= threshold:
            return best_id, best_score, label
    return None, best_score, "no_match"
|
||
|
||
|
||
def _detect_conflicts(
    db: Session,
    cluster: Cluster,
    target_location_id: Optional[str],
    proposed_assigned_at: datetime,
    proposed_assigned_until: Optional[datetime],
) -> tuple[list[ConflictHint], bool, bool]:
    """Compare a proposed assignment window with this serial's existing ones.

    The proposed window ends at ``proposed_assigned_until``, or "now" when
    that is None. Existing assignments with ``assigned_until`` of None also
    end "now". Two windows overlap when each starts strictly before the other
    ends. For each overlapping UnitAssignment of ``cluster.serial``:

    * if it is at ``target_location_id``, the serial is already attributed
      there and the cluster needs no action;
    * otherwise (including when there is no target location yet), a
      ConflictHint is recorded and the result is blocking.

    Returns:
        (conflicts, blocking, already_attributed_at_target), where blocking
        is True exactly when conflicts is non-empty.
    """
    now = datetime.utcnow()
    window_end = proposed_assigned_until or now

    # Every assignment for this serial; the overlap test is applied below.
    serial_assignments = (
        db.query(UnitAssignment)
        .filter(UnitAssignment.unit_id == cluster.serial)
        .all()
    )

    hints: list[ConflictHint] = []
    at_target = False

    for existing in serial_assignments:
        existing_end = existing.assigned_until or now
        if not (proposed_assigned_at < existing_end and window_end > existing.assigned_at):
            continue
        if target_location_id is not None and existing.location_id == target_location_id:
            at_target = True
            continue
        # Overlap at a different location: fetch display names for the wizard.
        # NOTE(review): this issues two queries per conflicting assignment.
        other_loc = db.query(MonitoringLocation).filter_by(id=existing.location_id).first()
        other_proj = db.query(Project).filter_by(id=existing.project_id).first()
        hints.append(
            ConflictHint(
                existing_assignment_id=existing.id,
                other_location_id=existing.location_id,
                other_location_name=other_loc.name if other_loc else "?",
                other_project_id=existing.project_id,
                other_project_name=other_proj.name if other_proj else "?",
            )
        )

    return hints, bool(hints), at_target
|
||
|
||
|
||
def _score_confidence(
    cluster: Cluster,
    project_match: str,
    location_match: str,
    conflicts: list[ConflictHint],
) -> str:
    """Grade a suggestion as 'high', 'medium' or 'low'.

    The result is 'low' in any of these cases:

    * there is any conflict;
    * the metadata is blank;
    * the cluster has fewer than 2 events;
    * the cluster is sparse, i.e. spans more than SUSPICIOUS_SPAN_DAYS with
      under one event per week;
    * either match is "ambiguous".

    Otherwise any "fuzzy" match gives 'medium'. Both matches "exact", or
    both "create_new", gives 'high'. Any other mix gives 'medium'; for
    example, a new location under an existing project is common but worth
    a review.
    """
    if conflicts or cluster.is_blank_meta or cluster.event_count < 2:
        return "low"

    span_days = (cluster.last_event_ts - cluster.first_event_ts).days
    is_sparse = span_days > SUSPICIOUS_SPAN_DAYS and cluster.event_count < (span_days / 7)
    if is_sparse:
        return "low"

    outcomes = {project_match, location_match}
    if "ambiguous" in outcomes:
        return "low"
    if "fuzzy" in outcomes:
        return "medium"
    if project_match == location_match and project_match in ("exact", "create_new"):
        return "high"
    return "medium"
|
||
|
||
|
||
def _build_suggestion(db: Session, cluster: Cluster) -> Suggestion:
    """Match a cluster against existing projects/locations, then score it.

    * The project is matched against every Project whose status is not
      "deleted".
    * Locations are matched only against the "vibration" MonitoringLocations
      of an exact or fuzzy project match. Names like "Loc 1" are not
      comparable across projects.
    * An ambiguous location match is surfaced as "fuzzy".
    * The proposed window starts 1h before the first event. It ends 1h after
      the last event, or is left open (None) when the last event is within
      RECENT_CLUSTER_DAYS of now.
    * Conflicts come from ``_detect_conflicts`` and confidence from
      ``_score_confidence``.
    """
    # ── Project ────────────────────────────────────────────────────────────
    existing_projects = db.query(Project).filter(Project.status != "deleted").all()
    project_pool = [(p.id, _normalise(p.name)) for p in existing_projects]
    if cluster.project_norm:
        proj_id, proj_score, proj_kind = _find_best_match(cluster.project_norm, project_pool)
    else:
        proj_id, proj_score, proj_kind = None, None, "create_new"

    project_match: Literal["exact", "fuzzy", "create_new", "ambiguous"]
    project_existing = None
    if proj_kind in ("exact", "fuzzy", "ambiguous"):
        project_match = proj_kind
        project_existing = next((p for p in existing_projects if p.id == proj_id), None)
    else:
        project_match = "create_new"

    if project_existing and project_match == "exact":
        project_suggested_name = project_existing.name
    else:
        project_suggested_name = (
            cluster.project_root or cluster.project_raw or f"Project {cluster.serial}"
        )

    # ── Location (scoped to the matched project only) ──────────────────────
    if project_existing and project_match in ("exact", "fuzzy"):
        location_pool_objs = (
            db.query(MonitoringLocation)
            .filter(MonitoringLocation.project_id == project_existing.id)
            .filter(MonitoringLocation.location_type == "vibration")
            .all()
        )
        location_pool = [(loc.id, _normalise(loc.name)) for loc in location_pool_objs]
        if cluster.location_norm:
            loc_id, loc_score, loc_kind = _find_best_match(cluster.location_norm, location_pool)
        else:
            loc_id, loc_score, loc_kind = None, None, "create_new"
    else:
        # A project that will be created new implies a new location too.
        location_pool_objs = []
        loc_id, loc_score, loc_kind = None, None, "create_new"

    location_match: Literal["exact", "fuzzy", "create_new"]
    location_existing = None
    if loc_kind in ("exact", "fuzzy", "ambiguous"):
        # "ambiguous" is reported as "fuzzy"; a richer disambiguation UI
        # is out of scope.
        location_match = "exact" if loc_kind == "exact" else "fuzzy"
        location_existing = next((loc for loc in location_pool_objs if loc.id == loc_id), None)
    else:
        location_match = "create_new"

    if location_existing and location_match == "exact":
        location_suggested_name = location_existing.name
    else:
        location_suggested_name = cluster.location_raw or "Unnamed location"

    # ── Proposed assignment window ─────────────────────────────────────────
    proposed_at = cluster.first_event_ts - timedelta(hours=1)
    is_recent = (datetime.utcnow() - cluster.last_event_ts) <= timedelta(days=RECENT_CLUSTER_DAYS)
    proposed_until = None if is_recent else cluster.last_event_ts + timedelta(hours=1)

    # ── Conflicts and confidence ───────────────────────────────────────────
    conflicts, blocking, already_attributed = _detect_conflicts(
        db,
        cluster,
        location_existing.id if location_existing else None,
        proposed_at,
        proposed_until,
    )
    confidence = _score_confidence(cluster, project_match, location_match, conflicts)

    return Suggestion(
        cluster=cluster,
        project_match=project_match,
        project_existing_id=project_existing.id if project_existing else None,
        project_existing_name=project_existing.name if project_existing else None,
        project_match_score=proj_score,
        project_suggested_name=project_suggested_name,
        location_match=location_match,
        location_existing_id=location_existing.id if location_existing else None,
        location_existing_name=location_existing.name if location_existing else None,
        location_match_score=loc_score,
        location_suggested_name=location_suggested_name,
        proposed_assigned_at=proposed_at,
        proposed_assigned_until=proposed_until,
        confidence=confidence,
        conflicts=conflicts,
        blocking_conflict=blocking,
        already_attributed=already_attributed,
    )
|
||
|
||
|
||
# ── Public API: scan ──────────────────────────────────────────────────────────
|
||
|
||
|
||
async def scan_clusters_and_build_suggestions(
    db: Session,
    sfm_base_url: str,
) -> ScanResult:
    """Fetch SFM events, cluster them, and build one Suggestion per open cluster.

    For each cluster:

    * decision already ``skipped`` or ``applied`` -> only ``last_seen_at`` is
      refreshed (skipped ones are counted in ``skipped_orphans``);
    * suggestion reports ``already_attributed`` -> counted, nothing persisted;
    * otherwise the suggestion is returned and its MetadataBackfillDecision
      row is created, or refreshed, with status ``conflict`` (blocking
      conflict) or ``pending``.

    All decision changes are committed once at the end.
    """
    clusters, scanned, _already_attributed_events = await _scan_clusters(db, sfm_base_url)

    seen_at = datetime.utcnow()
    open_suggestions: list[Suggestion] = []
    skipped_count = 0
    attributed_count = 0

    for cl in clusters:
        existing = db.query(MetadataBackfillDecision).filter_by(
            cluster_id=cl.cluster_id
        ).first()

        # Operator already decided this cluster: just note it is still
        # present in SFM. Applied clusters should now be picked up by the
        # Phase 2 attribution path instead.
        if existing is not None and existing.status in ("skipped", "applied"):
            if existing.status == "skipped":
                skipped_count += 1
            existing.last_seen_at = seen_at
            continue

        sugg = _build_suggestion(db, cl)
        if sugg.already_attributed:
            attributed_count += 1
            continue

        open_suggestions.append(sugg)
        new_status = "conflict" if sugg.blocking_conflict else "pending"

        if existing is not None:
            # Refresh the mutable fields of the existing decision row.
            existing.status = new_status
            existing.confidence = sugg.confidence
            existing.last_seen_at = seen_at
            existing.event_count = cl.event_count
            existing.last_event_ts = cl.last_event_ts
        else:
            db.add(MetadataBackfillDecision(
                cluster_id=cl.cluster_id,
                status=new_status,
                confidence=sugg.confidence,
                first_seen_at=seen_at,
                last_seen_at=seen_at,
                serial=cl.serial,
                project_raw=cl.project_raw or None,
                location_raw=cl.location_raw or None,
                first_event_ts=cl.first_event_ts,
                last_event_ts=cl.last_event_ts,
                event_count=cl.event_count,
            ))

    db.commit()

    return ScanResult(
        suggestions=open_suggestions,
        skipped_orphans=skipped_count,
        already_attributed=attributed_count,
        scanned_event_count=scanned,
        cluster_count=len(clusters),
    )
|
||
|
||
|
||
# ── Public API: apply ─────────────────────────────────────────────────────────
|
||
|
||
|
||
def _ensure_auto_imported_project_type(db: Session) -> str:
    """Return the id of the 'auto_imported' ProjectType, creating it if absent.

    A newly created row is flushed (not committed) so later queries in the
    same session can see it.
    """
    from backend.models import ProjectType

    found = db.query(ProjectType).filter_by(id=AUTO_IMPORTED_TYPE_ID).first()
    if found is not None:
        return found.id

    new_type = ProjectType(
        id=AUTO_IMPORTED_TYPE_ID,
        name=AUTO_IMPORTED_TYPE_NAME,
        description="Projects created automatically by the metadata-backfill parser. Operators can re-type them later.",
        supports_vibration=True,
        supports_sound=False,
    )
    db.add(new_type)
    db.flush()
    return new_type.id
|
||
|
||
|
||
def _ensure_project(db: Session, suggestion: Suggestion) -> tuple[Project, bool]:
    """Return ``(project, created_flag)`` for a suggestion.

    Lookup order, first hit wins (``created_flag`` is False):

    1. ``suggestion.project_existing_id``, if that row still exists.
    2. Any project not in status "deleted" whose name normalises (via
       ``_normalise``) to the same string as the candidate name.
    3. Any project whose name equals the candidate case-insensitively.

    Otherwise a new Project of the auto-imported type is created, together
    with an enabled ``vibration_monitoring`` ProjectModule, and flushed (not
    committed); ``created_flag`` is True.

    Dedup is normalisation-aware: "SR81" and "SR 81" collapse to the same
    project (both normalise to "sr 81"), as do "Fay - Locks & Dam No3"
    and "Fay-Locks-&-Dam-No3". Important when applying many clusters in
    one bulk operation — the first creates the project, subsequent
    clusters with normalisation-equivalent names attach to it instead
    of triggering a UNIQUE constraint violation.
    """
    if suggestion.project_existing_id:
        p = db.query(Project).filter_by(id=suggestion.project_existing_id).first()
        if p is not None:
            return p, False

    cluster = suggestion.cluster
    # ``or ""`` guards against a None suggested name; a blank name falls
    # back to a serial-based placeholder.
    candidate_name = (
        (suggestion.project_suggested_name or "").strip()
        or f"Auto-imported project ({cluster.serial})"
    )
    candidate_norm = _normalise(candidate_name)

    # Pre-flight normalised lookup: avoids creating duplicates that
    # differ only in punctuation/spacing.
    if candidate_norm:
        for p in db.query(Project).filter(Project.status != "deleted").all():
            if _normalise(p.name) == candidate_norm:
                return p, False

    # Case-insensitive exact fallback. LIKE treats ``%`` and ``_`` as
    # wildcards, so they (and the escape character itself) are escaped.
    # Otherwise an operator-typed name such as "SR_81" would also match
    # "SRX81", and the cluster would be attached to an unrelated project.
    # NOTE(review): unlike the normalised lookup above, this query does not
    # exclude status == "deleted", so a deleted project with the same name
    # can be returned (presumably to avoid a name-uniqueness clash) — confirm.
    like_pattern = (
        candidate_name.replace("\\", "\\\\").replace("%", "\\%").replace("_", "\\_")
    )
    existing = (
        db.query(Project)
        .filter(Project.name.ilike(like_pattern, escape="\\"))
        .first()
    )
    if existing is not None:
        return existing, False

    type_id = _ensure_auto_imported_project_type(db)

    # Project dates follow the cluster's events; an open-ended (still
    # active) proposed assignment leaves end_date empty.
    start = cluster.first_event_ts.date()
    end = (
        cluster.last_event_ts.date()
        if suggestion.proposed_assigned_until
        else None
    )

    p = Project(
        id=str(uuid.uuid4()),
        name=candidate_name,
        description=(
            f"Auto-created by the metadata-backfill parser on "
            f"{datetime.utcnow():%Y-%m-%d}. Sourced from operator-typed "
            f"BW metadata across {cluster.event_count} event(s) "
            f"from serial {cluster.serial}."
        ),
        project_type_id=type_id,
        status="active",
        data_collection_mode="remote",
        client_name=cluster.client_raw or None,
        start_date=start,
        end_date=end,
    )
    db.add(p)
    db.flush()

    # Ensure the vibration_monitoring module is enabled.
    pm = ProjectModule(
        id=str(uuid.uuid4()),
        project_id=p.id,
        module_type="vibration_monitoring",
        enabled=True,
    )
    db.add(pm)
    db.flush()

    return p, True
|
||
|
||
|
||
def _ensure_location(
    db: Session,
    project: Project,
    suggestion: Suggestion,
) -> tuple[MonitoringLocation, bool]:
    """Return ``(location, created_flag)`` for a suggestion within ``project``.

    Lookup order, first hit wins (``created_flag`` is False):

    1. ``suggestion.location_existing_id``, but only if that location
       belongs to ``project``.
    2. A location of ``project`` whose name normalises (via ``_normalise``)
       to the same string as the candidate name.
    3. A location of ``project`` whose name equals the candidate
       case-insensitively.

    Otherwise a new vibration MonitoringLocation is created under
    ``project`` and flushed (not committed); ``created_flag`` is True.
    """
    if suggestion.location_existing_id:
        loc = db.query(MonitoringLocation).filter_by(id=suggestion.location_existing_id).first()
        if loc is not None and loc.project_id == project.id:
            return loc, False

    # ``or ""`` guards against a None suggested name.
    candidate_name = (suggestion.location_suggested_name or "").strip() or "Unnamed location"
    candidate_norm = _normalise(candidate_name)

    # Normalisation-aware lookup within this project — same dedup
    # principle as _ensure_project.
    if candidate_norm:
        for existing in (
            db.query(MonitoringLocation)
            .filter(MonitoringLocation.project_id == project.id)
            .all()
        ):
            if _normalise(existing.name) == candidate_norm:
                return existing, False

    # Case-insensitive exact fallback. ``%`` / ``_`` are LIKE wildcards, so
    # escape them (and the escape character) to avoid "Loc_2" also
    # matching "LocX2".
    like_pattern = (
        candidate_name.replace("\\", "\\\\").replace("%", "\\%").replace("_", "\\_")
    )
    existing = (
        db.query(MonitoringLocation)
        .filter(MonitoringLocation.project_id == project.id)
        .filter(MonitoringLocation.name.ilike(like_pattern, escape="\\"))
        .first()
    )
    if existing is not None:
        return existing, False

    new_loc = MonitoringLocation(
        id=str(uuid.uuid4()),
        project_id=project.id,
        location_type="vibration",
        name=candidate_name,
        description=(
            f"Auto-created by metadata-backfill from operator-typed sensor_location "
            f"\"{suggestion.cluster.location_raw}\" on events from serial "
            f"{suggestion.cluster.serial}."
        ),
    )
    db.add(new_loc)
    db.flush()
    return new_loc, True
|
||
|
||
|
||
def _apply_one(
    db: Session,
    suggestion: Suggestion,
    *,
    decided_by: str,
) -> tuple[str, str, str]:
    """Stage all writes for one suggestion on ``db``.

    Resolves or creates the project and location, then adds a
    UnitAssignment and a UnitHistory audit row. If the cluster has a
    MetadataBackfillDecision row, it is marked 'applied'. Nothing is
    committed here; the caller owns the transaction.

    Returns ``(project_id, location_id, assignment_id)``.
    """
    cluster = suggestion.cluster
    project, _ = _ensure_project(db, suggestion)
    location, _ = _ensure_location(db, project, suggestion)

    until = suggestion.proposed_assigned_until
    new_assignment_id = str(uuid.uuid4())
    db.add(UnitAssignment(
        id=new_assignment_id,
        unit_id=cluster.serial,
        location_id=location.id,
        project_id=project.id,
        device_type="seismograph",
        assigned_at=suggestion.proposed_assigned_at,
        assigned_until=until,
        # An open-ended window means the unit is still deployed.
        status="completed" if until is not None else "active",
        source="metadata_backfill",
        notes=(
            f"Auto-created from operator-typed metadata on "
            f"{cluster.event_count} event(s). "
            f"Project: \"{cluster.project_raw}\". "
            f"Location: \"{cluster.location_raw}\". "
            f"Confidence: {suggestion.confidence}."
        ),
    ))

    # Audit trail for the unit.
    db.add(UnitHistory(
        unit_id=cluster.serial,
        change_type="assignment_backfilled",
        field_name="unit_assignment",
        old_value=None,
        new_value=f"{project.name} / {location.name}",
        changed_at=datetime.utcnow(),
        source="metadata_backfill",
        notes=(
            f"Created from {cluster.event_count} events tagged "
            f"({cluster.project_raw!r}, {cluster.location_raw!r}). "
            f"By: {decided_by}."
        ),
    ))

    # Record the operator's decision on the cluster, if one is tracked.
    decision = (
        db.query(MetadataBackfillDecision)
        .filter_by(cluster_id=cluster.cluster_id)
        .first()
    )
    if decision:
        decision.status = "applied"
        decision.decided_at = datetime.utcnow()
        decision.decided_by = decided_by
        decision.applied_assignment_id = new_assignment_id

    return project.id, location.id, new_assignment_id
|
||
|
||
|
||
def apply_suggestions(
    db: Session,
    suggestions: list[Suggestion],
    *,
    decided_by: str,
) -> ApplyResult:
    """Apply ``suggestions`` one at a time so that failures stay isolated.

    Each successfully applied suggestion is committed immediately. A later
    failure therefore rolls back only its own partial writes and never
    discards suggestions already reported as applied.

    Suggestions with a blocking conflict are not attempted; they appear in
    ``failed`` with an explanatory message. Other exceptions are logged,
    rolled back, and reported in ``failed`` as ``(cluster_id, str(error))``.

    ``project_ids_created`` / ``location_ids_created`` are the de-duplicated
    ids of every project/location an applied suggestion was attached to.
    They include pre-existing rows that were reused, because ``_apply_one``
    does not report whether it created them.
    """
    applied = 0
    failed: list[tuple[str, str]] = []
    proj_ids: list[str] = []
    loc_ids: list[str] = []
    asgn_ids: list[str] = []

    for s in suggestions:
        if s.blocking_conflict:
            failed.append((s.cluster.cluster_id, "blocking conflict — needs manual resolution"))
            continue
        try:
            p_id, l_id, a_id = _apply_one(db, s, decided_by=decided_by)
            # Commit per success. Only flushing here (as before) meant the
            # db.rollback() of a later failure silently undid every earlier
            # success, while they were still counted as applied.
            db.commit()
        except Exception as e:
            log.exception("Failed to apply cluster %s", s.cluster.cluster_id)
            db.rollback()
            failed.append((s.cluster.cluster_id, str(e)))
            continue

        proj_ids.append(p_id)
        loc_ids.append(l_id)
        asgn_ids.append(a_id)
        applied += 1

    # Nothing from the loop is pending at this point; kept so any
    # caller-staged state is committed exactly as before.
    db.commit()

    return ApplyResult(
        applied=applied,
        skipped=0,
        failed=failed,
        project_ids_created=list(dict.fromkeys(proj_ids)),
        location_ids_created=list(dict.fromkeys(loc_ids)),
        assignment_ids_created=asgn_ids,
    )
|
||
|
||
|
||
def skip_clusters(
    db: Session,
    cluster_ids: list[str],
    *,
    decided_by: str = "operator",
) -> int:
    """Mark clusters as skipped so future scans stop proposing them.

    Only ids with an existing decision row in status 'pending' or 'conflict'
    are changed. Unknown ids and already-decided clusters are ignored.
    Commits, then returns how many decision rows were updated.
    """
    decided_at = datetime.utcnow()
    skippable_statuses = ("pending", "conflict")
    updated = 0

    for cid in cluster_ids:
        row = db.query(MetadataBackfillDecision).filter_by(cluster_id=cid).first()
        if row is None or row.status not in skippable_statuses:
            continue
        row.status = "skipped"
        row.decided_at = decided_at
        row.decided_by = decided_by
        updated += 1

    db.commit()
    return updated
|