#!/usr/bin/env python3
"""Phase-0 Workstream B3 — fuzzy entity-resolution tier (local Qwen).

The deterministic tier (entity_resolution.py) merges only provable matches and
FLAGS the hard name-variant candidates (same firm + surname, different first
name/email) without guessing. This tier asks the local Qwen model (Spark Control
/v1/chat/completions — sovereign, on Ten31 infra) to adjudicate each candidate
and merges the confirmed ones.

A merge repoints the loser's entity_links to the survivor and soft-deletes the
loser canonical entity (deleted_at; never hard-deleted — guardrail #3). Every
merge is written to the interaction_log (guardrail #5). Idempotent: re-running
finds no new candidates once merged.

    python3 backend/ingest/fuzzy_resolve.py --db data/crm_dev.db
    python3 backend/ingest/fuzzy_resolve.py --db data/crm_dev.db --dry-run
"""
import argparse
import json
import sqlite3
import uuid
from datetime import datetime, timezone

import entity_resolution as er
import llm

_SYSTEM = ("You are an entity-resolution assistant for a CRM. Decide if the listed "
           "people are the SAME individual recorded under name variants (e.g. nicknames "
           "like Kate/Katherine, Bill/William), or DIFFERENT people who happen to share a "
           "surname and firm. Be conservative: only say same when a nickname/abbreviation "
           "relationship or matching contact info makes it clear.")

# Confidence stored with a merge when the model's verdict carried no usable
# confidence value (only reachable when the caller passes threshold <= 0).
_FALLBACK_CONFIDENCE = 0.7

# String spellings of "yes" accepted for the verdict's "same" field.
_AFFIRMATIVE = frozenset({"true", "yes", "y", "1"})


def _now():
    """Return the current UTC time as an ISO-8601 string."""
    return datetime.now(timezone.utc).isoformat()


def _ask(members, firm):
    """Ask the model whether *members* are one person; return its verdict dict.

    *members* is an iterable of ``(id, name, email)`` triples and *firm* is the
    firm's display name (or a falsy value when unknown). Anything other than a
    non-empty dict coming back from ``llm.chat_json`` is treated as "no
    verdict" and mapped to a conservative not-the-same answer.
    """
    people = "; ".join(f"{n}" + (f" <{e}>" if e else "") for _, n, e in members)
    prompt = (f"Firm: {firm or 'unknown'}\nPeople: {people}\n\n"
              "Are these the SAME person under name variants? "
              'Answer only JSON: {"same": true|false, "confidence": 0.0-1.0, "reason": "..."}')
    verdict = llm.chat_json(prompt, system=_SYSTEM, max_tokens=160)
    if isinstance(verdict, dict) and verdict:
        return verdict
    return {"same": False, "confidence": 0.0}


def _says_same(value):
    """Interpret the verdict's ``same`` field as a boolean.

    Strings are matched against explicit affirmative spellings, because
    ``bool("false")`` is True and would otherwise turn a model answer of
    ``"same": "false"`` into a merge.
    """
    if isinstance(value, str):
        return value.strip().lower() in _AFFIRMATIVE
    return bool(value)


def _confidence(verdict):
    """Return the verdict's confidence as a float, or None if absent/unparseable."""
    try:
        return float(verdict.get("confidence"))
    except (TypeError, ValueError):
        return None


def _survivor(members):
    """Pick the member that survives a merge.

    Prefers a member with an email, then the longest (most complete) name;
    ties go to the member listed first.
    """
    return max(members, key=lambda m: (bool(m[2]), len(m[1] or "")))


def _apply_merge(conn, keep, losers, confidence, reason):
    """Fold every entity in *losers* into *keep* on the open connection *conn*.

    For each loser: record the merge in ``entity_merges`` (so deterministic
    re-runs respect it), repoint its ``entity_links`` to the survivor, and
    soft-delete its canonical entity. Nothing is committed here.
    """
    for loser in losers:
        conn.execute(
            """INSERT INTO entity_merges
                   (merged_id, survivor_id, confidence, reason, created_at)
               VALUES (?,?,?,?,?)
               ON CONFLICT(merged_id) DO UPDATE SET
                   survivor_id=excluded.survivor_id,
                   confidence=excluded.confidence,
                   reason=excluded.reason""",
            (loser[0], keep[0], confidence, reason, _now()))
        conn.execute(
            "UPDATE entity_links SET canonical_id=?, match_kind='fuzzy_merge', confidence=? "
            "WHERE canonical_id=?",
            (keep[0], confidence, loser[0]))
        stamp = _now()  # one timestamp so deleted_at and updated_at agree
        conn.execute(
            "UPDATE canonical_entities SET deleted_at=?, updated_at=? WHERE id=?",
            (stamp, stamp, loser[0]))


def _log_merge(conn, keep, decision):
    """Write one ``entity.merged`` row for the merged group to interaction_log."""
    conn.execute(
        """INSERT INTO interaction_log
               (id, ts, actor_type, actor_id, action, target_type, target_id,
                payload, source, created_at)
           VALUES (?,?,?,?,?,?,?,?,?,?)""",
        (str(uuid.uuid4()), _now(), "agent", "qwen_entity_resolver", "entity.merged",
         "canonical_entity", keep[0], json.dumps(decision), "ingest", _now()))


def run(db, threshold=0.7, dry_run=False):
    """Adjudicate every fuzzy candidate group and merge the confirmed ones.

    Args:
        db: Path of the SQLite database.
        threshold: Minimum model confidence required to accept a "same" verdict.
        dry_run: When true, adjudicate and report but write nothing in this tier.

    Returns:
        ``(merges, live_people)``: one decision dict per candidate group
        (merged or not), and the count of person entities that are not
        soft-deleted after the run.
    """
    _, candidates = er.run(db)  # ensure deterministic state + fresh candidates
    conn = sqlite3.connect(db)
    try:
        conn.row_factory = sqlite3.Row
        conn.execute("PRAGMA foreign_keys=ON")
        name_of = {r["id"]: r["display_name"]
                   for r in conn.execute("SELECT id, display_name FROM canonical_entities")}
        merges = []
        for cand in candidates:
            members = cand["members"]
            firm = name_of.get(cand["org"])
            verdict = _ask(members, firm)
            confidence = _confidence(verdict)
            # A missing or malformed confidence counts as 0.0 rather than
            # aborting the run and discarding merges that are not yet committed.
            same = _says_same(verdict.get("same")) and (confidence or 0.0) >= threshold
            decision = {"surname": cand["surname"], "firm": firm,
                        "members": [{"id": m[0], "name": m[1]} for m in members],
                        "same": same, "confidence": verdict.get("confidence"),
                        "reason": verdict.get("reason")}
            if same:
                keep = _survivor(members)
                losers = [m for m in members if m[0] != keep[0]]
                decision["merged_into"] = {"id": keep[0], "name": keep[1]}
                if not dry_run:
                    stored = confidence if confidence is not None else _FALLBACK_CONFIDENCE
                    _apply_merge(conn, keep, losers, stored, verdict.get("reason"))
                    _log_merge(conn, keep, decision)
            merges.append(decision)
        if not dry_run:
            conn.commit()
        live_people = conn.execute(
            "SELECT COUNT(*) FROM canonical_entities "
            "WHERE entity_kind='person' AND deleted_at IS NULL").fetchone()[0]
        return merges, live_people
    finally:
        # Closing without a commit discards any partial writes from a failed run.
        conn.close()


def main():
    """Command-line entry point: run the fuzzy tier and print each decision."""
    ap = argparse.ArgumentParser()
    ap.add_argument("--db", default="data/crm_dev.db")
    ap.add_argument("--threshold", type=float, default=0.7)
    ap.add_argument("--dry-run", action="store_true")
    args = ap.parse_args()
    merges, live = run(args.db, threshold=args.threshold, dry_run=args.dry_run)
    print(f"Adjudicated {len(merges)} candidate group(s){' (dry run)' if args.dry_run else ''}:")
    for m in merges:
        names = " / ".join(p["name"] for p in m["members"])
        verdict = f"MERGE -> {m['merged_into']['name']}" if m.get("merged_into") else "keep separate"
        print(f" [{m['surname']}] {names}: same={m['same']} conf={m['confidence']} => {verdict}")
        if m.get("reason"):
            print(f" reason: {m['reason']}")
    print(f"Live person entities now: {live}")


if __name__ == "__main__":
    main()