#!/usr/bin/env python3 """Phase-0 Workstream B3 / A4 — entity resolution (deterministic tier). Collapses the CRM's two parallel investor models into the canonical identity layer created by migration 0001: organizations ─┐ fundraising_investors ─┴─► canonical_entities (entity_kind = lp | organization) contacts ─┐ fundraising_contacts ─┴─► canonical_entities (entity_kind = person) lp_profiles ───► linked to its contact's person entity Every source row is recorded in `entity_links` so any name variant resolves to one canonical id. This is the DETERMINISTIC tier — it merges only what we can prove (exact email; exact normalized name within the same canonical org). The HARD cases (nicknames like "Jon" vs "Jonathan", typos) are NOT guessed; they are emitted as *fuzzy candidates* for the local-Qwen tier (Spark Control /v1/chat/completions) to adjudicate later. Honest separation: we never silently merge on a guess. Properties: * Local-only, read-mostly: reads CRM source tables, writes only the derived canonical_entities / entity_links and an interaction_log audit row. Never mutates a CRM source record (guardrail #2/#3). * Idempotent: canonical ids are deterministic (sha1 of the resolution key), so re-running upserts in place and keeps ids stable across runs — which keeps downstream Qdrant point ids valid (no churn on re-embed). * Logged: writes one interaction_log row per run (guardrail #5). Usage: python3 backend/ingest/entity_resolution.py --db data/crm_dev.db python3 backend/ingest/entity_resolution.py --db data/crm_dev.db --show-candidates """ import argparse import hashlib import json import re import sqlite3 import uuid from collections import defaultdict from datetime import datetime, timezone # ── normalization ───────────────────────────────────────────────────────────── def norm_text(s: str) -> str: s = (s or "").strip().lower() s = re.sub(r"[^\w\s]", " ", s) return re.sub(r"\s+", " ", s).strip() def norm_email(s: str) -> str: return (s or "").strip().lower() def _eid(prefix: str, key: str) -> str: """Deterministic canonical id: stable across runs for the same resolution key.""" return f"{prefix}_{hashlib.sha1(key.encode('utf-8')).hexdigest()[:12]}" def _now() -> str: return datetime.now(timezone.utc).isoformat() def _split_name(full: str): parts = norm_text(full).split() if not parts: return "", "" return parts[0], parts[-1] if len(parts) > 1 else "" def _redirect(merge_map, eid): """Follow durable fuzzy-merge redirects (entity_merges) so deterministic re-runs respect prior merges instead of recreating the merged-away entity.""" seen = set() while eid in merge_map and eid not in seen: seen.add(eid) eid = merge_map[eid] return eid # ── upsert helpers ──────────────────────────────────────────────────────────── def _upsert_entity(conn, eid, kind, display_name, primary_email): conn.execute( """ INSERT INTO canonical_entities (id, entity_kind, display_name, primary_email, source, created_at, updated_at) VALUES (?, ?, ?, ?, 'entity_resolution', ?, ?) ON CONFLICT(id) DO UPDATE SET display_name = excluded.display_name, primary_email = COALESCE(excluded.primary_email, canonical_entities.primary_email), entity_kind = excluded.entity_kind, -- Re-emitting a deterministic id means the entity is live again, so -- clear any prior tombstone (e.g. a stale-cleanup soft-delete from a -- run when the source row was briefly absent). Fuzzy-merge losers are -- redirected away by _redirect and never re-upserted, so this never -- resurrects a merged-away entity. deleted_at = NULL, updated_at = excluded.updated_at """, (eid, kind, display_name, primary_email or None, _now(), _now()), ) def _link(conn, canonical_id, source_model, source_id, match_value, match_kind, confidence): conn.execute( """ INSERT INTO entity_links (id, canonical_id, source_model, source_id, match_value, match_kind, confidence, created_at) VALUES (?, ?, ?, ?, ?, ?, ?, ?) ON CONFLICT(source_model, source_id, match_value) DO UPDATE SET canonical_id = excluded.canonical_id, match_kind = excluded.match_kind, confidence = excluded.confidence """, (str(uuid.uuid4()), canonical_id, source_model, source_id, match_value, match_kind, confidence, _now()), ) # ── resolution passes ───────────────────────────────────────────────────────── def resolve_organizations(conn, merge_map=None): """Merge organizations + fundraising_investors by normalized name. Returns (org_canon_by_orgid, org_canon_by_fundinv) so the people pass can attach each person to their firm's canonical id. """ merge_map = merge_map or {} groups = defaultdict(lambda: {"orgs": [], "investors": [], "name": "", "email": ""}) for r in conn.execute("SELECT id, name, email FROM organizations"): key = norm_text(r["name"]) if not key: continue g = groups[key] g["orgs"].append(r["id"]) if len(r["name"] or "") > len(g["name"]): g["name"] = r["name"] if not g["email"] and (r["email"] or "").strip(): g["email"] = r["email"].strip() for r in conn.execute("SELECT id, investor_name FROM fundraising_investors"): key = norm_text(r["investor_name"]) if not key: continue g = groups[key] g["investors"].append(r["id"]) if not g["name"]: g["name"] = r["investor_name"] org_canon_by_orgid, org_canon_by_fundinv = {}, {} for key, g in groups.items(): # Every firm group is one INVESTOR entity. The fundraising grid is the # source of truth for investor entities (each row = one investor, whether # an institution/family-office or an individual); the organizations table # mirrors those names. So we no longer split into lp/organization. cid = _redirect(merge_map, _eid("inv", key)) _upsert_entity(conn, cid, "investor", g["name"], g["email"]) for oid in g["orgs"]: _link(conn, cid, "organizations", oid, key, "exact_name", 1.0) org_canon_by_orgid[oid] = cid for iid in g["investors"]: _link(conn, cid, "fundraising_investors", iid, key, "exact_name", 1.0) org_canon_by_fundinv[iid] = cid return org_canon_by_orgid, org_canon_by_fundinv def _member_of(conn, person_id, investor_id): """Record that a person (contact) belongs to an investor entity.""" if not investor_id or person_id == investor_id: return conn.execute(""" INSERT INTO relationship_edges (id, src_id, dst_id, edge_type, source, strength, directed, first_seen_at, last_seen_at, created_at, updated_at) VALUES (?,?,?, 'member_of', 'entity_resolution', 1.0, 1, ?, ?, ?, ?) ON CONFLICT(src_id, dst_id, edge_type, source) DO UPDATE SET last_seen_at=excluded.last_seen_at, updated_at=excluded.updated_at """, (str(uuid.uuid4()), person_id, investor_id, _now(), _now(), _now(), _now())) def resolve_people(conn, org_canon_by_orgid, org_canon_by_fundinv, merge_map=None): """People come from the CONTACTS table (one person per contact, where the emails/LinkedIn live). The fundraising grid's contacts are NOT a second set of people — each is matched to a contact-person and recorded only as a member_of edge to its investor entity (the grid's 'Contacts' column says who belongs to which investor). This is what stops the double-count. Returns contact_id -> person canonical id (for lp_profiles).""" merge_map = merge_map or {} contact_to_person = {} person_meta = {} by_email = {} # norm_email -> person cid by_name_inv = {} # (name_norm, investor_canon) -> person cid def _person(full, email, inv_canon, model, sid): name_norm = norm_text(full) if email: key, mk, conf, mv = f"e|{email}", "exact_email", 1.0, email elif name_norm: key, mk, conf, mv = f"n|{name_norm}|{inv_canon or ''}", "name_org", 0.8, name_norm else: return None cid = _redirect(merge_map, _eid("per", key)) _upsert_entity(conn, cid, "person", full.strip() or email, email) _link(conn, cid, model, sid, mv, mk, conf) if email: by_email[email] = cid if name_norm: by_name_inv[(name_norm, inv_canon or "")] = cid _member_of(conn, cid, inv_canon) m = person_meta.setdefault(cid, {"org": inv_canon, "last": _split_name(full)[1], "name": full.strip() or email, "email": email}) if inv_canon and not m["org"]: m["org"] = inv_canon return cid # 1. People = the contacts table. for r in conn.execute("SELECT id, first_name, last_name, email, organization_id FROM contacts WHERE deleted_at IS NULL"): full = f"{r['first_name'] or ''} {r['last_name'] or ''}".strip() cid = _person(full, norm_email(r["email"]), org_canon_by_orgid.get(r["organization_id"]), "contacts", r["id"]) if cid: contact_to_person[r["id"]] = cid # 2. Grid contacts are associations, not new people: link each to its # contacts-table person and record membership. We prefer the EXPLICIT # contact_id link (migration 0004 — the grid pill stores the id of the # contact it was created from), and fall back to provable email / exact name # within the same investor for rows not yet backfilled. On a miss we # deliberately do NOT mint a person: the old else-branch mint is exactly what # produced the people double-count, and guessing by name across firms risks # binding two different same-named people — honest separation, never on a guess. fc_cols = {row[1] for row in conn.execute("PRAGMA table_info(fundraising_contacts)")} has_contact_id = "contact_id" in fc_cols sel = ("SELECT id, full_name, email, investor_id" + (", contact_id" if has_contact_id else "") + " FROM fundraising_contacts") for r in conn.execute(sel): email = norm_email(r["email"]) name_norm = norm_text(r["full_name"] or "") inv_canon = org_canon_by_fundinv.get(r["investor_id"]) link_cid = r["contact_id"] if has_contact_id else None cid = (contact_to_person.get(link_cid) if link_cid else None) \ or (by_email.get(email) if email else None) \ or by_name_inv.get((name_norm, inv_canon or "")) if cid: mk = "grid_link" if (link_cid and contact_to_person.get(link_cid)) else "grid_assoc" _link(conn, cid, "fundraising_contacts", r["id"], email or name_norm, mk, 0.95 if mk == "grid_link" else 0.9) _member_of(conn, cid, inv_canon) # lp_profiles -> the person entity of its contact for r in conn.execute("SELECT id, contact_id FROM lp_profiles WHERE deleted_at IS NULL"): cid = contact_to_person.get(r["contact_id"]) if cid: _link(conn, cid, "lp_profiles", r["id"], r["contact_id"], "contact_fk", 1.0) return person_meta def find_fuzzy_candidates(person_meta): """Distinct person entities sharing the same canonical org AND surname are likely the same individual under a name variant (e.g. Jon/Jonathan). Emit them for the local-Qwen tier; do NOT merge here.""" by_org_last = defaultdict(list) for cid, m in person_meta.items(): if m["org"] and m["last"]: by_org_last[(m["org"], m["last"])].append((cid, m["name"], m["email"])) return [{"org": org, "surname": last, "members": members} for (org, last), members in by_org_last.items() if len(members) > 1] def run(db_path: str): conn = sqlite3.connect(db_path) conn.row_factory = sqlite3.Row conn.execute("PRAGMA foreign_keys=ON") # Durable fuzzy-merge map (entity_merges) so deterministic re-runs respect # prior local-Qwen merges instead of recreating merged-away entities. conn.execute("""CREATE TABLE IF NOT EXISTS entity_merges ( merged_id TEXT PRIMARY KEY, survivor_id TEXT NOT NULL, confidence REAL, reason TEXT, created_at TEXT DEFAULT (datetime('now')) )""") merge_map = {r["merged_id"]: r["survivor_id"] for r in conn.execute("SELECT merged_id, survivor_id FROM entity_merges")} org_by_oid, org_by_inv = resolve_organizations(conn, merge_map) conn.commit() person_meta = resolve_people(conn, org_by_oid, org_by_inv, merge_map) conn.commit() live = "deleted_at IS NULL" # ── Clean up stale derived rows (soft-delete only; guardrail #3) ── # Two UNAMBIGUOUS classes of obsolete entity_resolution-owned rows, tombstoned # ONLY when they carry no human/enrichment data so nothing a partner entered is # ever dropped: # (a) PERSON rows with no 'contacts' source link. Real people come from the # contacts table (pass 1); a person linked only from the grid is a leftover # "twin" minted by the pre-fix else-branch — the source of the 1406 # double-count. (Narrow + safe: a contact whose canonical id merely # *changed* still keeps a 'contacts' link, so it is never caught here.) # (b) Rows under the superseded pre-:48 kinds 'lp'/'organization' (the model # is now investor | person), left live by old upsert-only runs. # We list the ids first and log them (guardrail #5: the soft-delete is # reviewable/undoable), then tombstone + audit in ONE transaction. nodata = ("thesis_fit IS NULL AND segment IS NULL AND accreditation_status IS NULL " "AND qp_status IS NULL AND warmth_score IS NULL AND owner_id IS NULL " "AND last_touch_at IS NULL AND notes IS NULL") stale = [r["id"] for r in conn.execute(f""" SELECT id FROM canonical_entities c WHERE {live} AND source='entity_resolution' AND {nodata} AND ( (entity_kind='person' AND NOT EXISTS ( SELECT 1 FROM entity_links l WHERE l.canonical_id = c.id AND l.source_model = 'contacts')) OR entity_kind IN ('lp', 'organization') ) """)] for sid in stale: conn.execute("UPDATE canonical_entities SET deleted_at=?, updated_at=? WHERE id=?", (_now(), _now(), sid)) if stale: conn.execute( """INSERT INTO interaction_log (id, ts, actor_type, actor_id, action, target_type, target_id, payload, source, created_at) VALUES (?, ?, 'system', 'entity_resolver', 'entity.stale_tombstoned', 'canonical_entities', NULL, ?, 'ingest', ?)""", (str(uuid.uuid4()), _now(), json.dumps({"count": len(stale), "ids": stale}), _now()), ) pruned = len(stale) conn.commit() candidates = find_fuzzy_candidates(person_meta) # Counts report LIVE entities (deleted_at IS NULL); fuzzy-merged losers and # tombstoned stale rows are soft-deleted (guardrail #3) and excluded. counts = { "canonical_total": conn.execute(f"SELECT COUNT(*) FROM canonical_entities WHERE {live}").fetchone()[0], "investor": conn.execute(f"SELECT COUNT(*) FROM canonical_entities WHERE entity_kind='investor' AND {live}").fetchone()[0], "person": conn.execute(f"SELECT COUNT(*) FROM canonical_entities WHERE entity_kind='person' AND {live}").fetchone()[0], "links": conn.execute("SELECT COUNT(*) FROM entity_links").fetchone()[0], "fuzzy_candidates": len(candidates), "pruned_stale": pruned, } conn.execute( """ INSERT INTO interaction_log (id, ts, actor_type, actor_id, action, target_type, payload, source, created_at) VALUES (?, ?, 'system', 'entity_resolver', 'entity_resolution.run', 'canonical_entities', ?, 'ingest', ?) """, (str(uuid.uuid4()), _now(), json.dumps(counts), _now()), ) conn.commit() conn.close() return counts, candidates def main(): ap = argparse.ArgumentParser(description="Deterministic entity resolution into the canonical layer.") ap.add_argument("--db", default="data/crm_dev.db", help="path to the CRM SQLite DB") ap.add_argument("--show-candidates", action="store_true", help="print fuzzy merge candidates") args = ap.parse_args() counts, candidates = run(args.db) print(f"Entity resolution on {args.db}:") for k, v in counts.items(): print(f" {k:<18} {v}") if args.show_candidates and candidates: print("\nFuzzy candidates (same org + surname, different person — for the local-Qwen tier):") for c in candidates: names = ", ".join(f"{n!r}{(' <'+e+'>') if e else ''}" for _, n, e in c["members"]) print(f" [{c['surname']}] {names}") if __name__ == "__main__": main()