d16264f401
Root cause: grid contacts (fundraising_contacts) are the SAME people as the contacts table (the app syncs them by name/email), but resolution matched grid rows by (name + investor-canon) where the two sides derive the investor key from different tables that rarely line up — so nearly every grid contact minted a duplicate person (715 + ~692 ≈ 1406), and the duplicate finder then flagged each twin against its real self (~676 candidates). Fix (entity_resolution.py): - Grid pass matches a grid contact to its existing contacts-table person by PROVABLE keys only (exact email, else exact name within the same investor) and records membership; on a miss it MINTS NOTHING (the old else-branch mint was the double-count source, and guessing by name across firms risks binding two different same-named people). - Targeted, audited cleanup soft-deletes leftover grid-only "twins" (person rows with no 'contacts' link) and superseded pre-:48 'lp'/'organization' rows, guarded so any row carrying enrichment/human data is never dropped (guardrail #3); the tombstoned ids are logged to interaction_log (guardrail #5). - _upsert_entity clears deleted_at on conflict so a re-emitted id is un-tombstoned (no permanent burial); fuzzy-merge losers stay buried via _redirect. entity_merge.py / server.py: the duplicate queue + pending count now filter to candidates whose both sides are still live, so self-healed twins drop out. Verified: offline reproduction test (backend/ingest/test_entity_resolution.py, 10/10) reproduces the 1406-style doubling and proves it collapses; no regression on the synthetic dev set; two adversarial review passes. Known pre-existing identity-key weaknesses (same name+firm+no email collision; shared role inbox over-link) are unchanged by this fix and will be resolved structurally by the contact_id link in the grid/contacts unification. Run "Build search index" after upgrading to recompute the canonical layer. Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
371 lines
17 KiB
Python
371 lines
17 KiB
Python
#!/usr/bin/env python3
|
|
"""Phase-0 Workstream B3 / A4 — entity resolution (deterministic tier).
|
|
|
|
Collapses the CRM's two parallel investor models into the canonical identity
|
|
layer created by migration 0001:
|
|
|
|
organizations ─┐
|
|
fundraising_investors ─┴─► canonical_entities (entity_kind = investor)
|
|
contacts ─┐
|
|
fundraising_contacts ─┴─► canonical_entities (entity_kind = person)
|
|
lp_profiles ───► linked to its contact's person entity
|
|
|
|
Every source row is recorded in `entity_links` so any name variant resolves to
|
|
one canonical id. This is the DETERMINISTIC tier — it merges only what we can
|
|
prove (exact email; exact normalized name within the same canonical org). The
|
|
HARD cases (nicknames like "Jon" vs "Jonathan", typos) are NOT guessed; they are
|
|
emitted as *fuzzy candidates* for the local-Qwen tier (Spark Control
|
|
/v1/chat/completions) to adjudicate later. Honest separation: we never silently
|
|
merge on a guess.
|
|
|
|
Properties:
|
|
* Local-only, read-mostly: reads CRM source tables, writes only the derived
|
|
canonical_entities / entity_links and an interaction_log audit row. Never
|
|
mutates a CRM source record (guardrail #2/#3).
|
|
* Idempotent: canonical ids are deterministic (sha1 of the resolution key), so
|
|
re-running upserts in place and keeps ids stable across runs — which keeps
|
|
downstream Qdrant point ids valid (no churn on re-embed).
|
|
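* Logged: writes one summary interaction_log row per run, plus one `entity.stale_tombstoned` row listing the ids whenever stale rows are soft-deleted (guardrail #5).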
|
|
|
|
Usage:
|
|
python3 backend/ingest/entity_resolution.py --db data/crm_dev.db
|
|
python3 backend/ingest/entity_resolution.py --db data/crm_dev.db --show-candidates
|
|
"""
|
|
import argparse
|
|
import hashlib
|
|
import json
|
|
import re
|
|
import sqlite3
|
|
import uuid
|
|
from collections import defaultdict
|
|
from datetime import datetime, timezone
|
|
|
|
|
|
# ── normalization ─────────────────────────────────────────────────────────────
|
|
|
|
def norm_text(s: str) -> str:
    """Return ``s`` as a comparison key: lower-cased, punctuation-free, single-spaced.

    ``None`` and the empty string both normalize to ``""``. Every character
    that is neither a word character nor whitespace is replaced by a space,
    then runs of whitespace are collapsed and the ends trimmed.
    """
    lowered = (s or "").strip().lower()
    without_punct = re.sub(r"[^\w\s]", " ", lowered)
    single_spaced = re.sub(r"\s+", " ", without_punct)
    return single_spaced.strip()
|
|
|
|
|
|
def norm_email(s: str) -> str:
    """Return ``s`` trimmed and lower-cased; a falsy value yields ``""``."""
    if not s:
        return ""
    trimmed = s.strip()
    return trimmed.lower()
|
|
|
|
|
|
def _eid(prefix: str, key: str) -> str:
    """Build a deterministic canonical id from a resolution key.

    The id is ``<prefix>_<first 12 hex chars of sha1(key)>``, so the same key
    always produces the same id across runs.
    """
    digest = hashlib.sha1(key.encode("utf-8")).hexdigest()
    short = digest[:12]
    return f"{prefix}_{short}"
|
|
|
|
|
|
def _now() -> str:
    """Return the current time in UTC as a timezone-aware ISO-8601 string."""
    current = datetime.now(tz=timezone.utc)
    return current.isoformat()
|
|
|
|
|
|
def _split_name(full: str):
    """Split a full name into ``(first, last)`` using normalized tokens.

    The name is normalized with ``norm_text`` and split on whitespace. The
    first token is the first name; the last token is the surname only when
    there are at least two tokens. An empty name yields ``("", "")`` and a
    single-token name yields ``(token, "")``.
    """
    tokens = norm_text(full).split()
    if not tokens:
        return "", ""
    first = tokens[0]
    if len(tokens) > 1:
        last = tokens[-1]
    else:
        last = ""
    return first, last
|
|
|
|
|
|
def _redirect(merge_map, eid):
    """Resolve ``eid`` through the durable merge map to its final survivor.

    ``merge_map`` maps a merged-away id to the id that replaced it. Chains are
    followed hop by hop so deterministic re-runs land on the surviving entity
    instead of recreating one that was merged away. Already-visited ids are
    tracked so a cyclic map terminates; the walk stops at the first id that is
    either not a key of the map or has been visited before.
    """
    visited = set()
    current = eid
    while True:
        if current not in merge_map or current in visited:
            return current
        visited.add(current)
        current = merge_map[current]
|
|
|
|
|
|
# ── upsert helpers ────────────────────────────────────────────────────────────
|
|
|
|
def _upsert_entity(conn, eid, kind, display_name, primary_email):
    """Insert or refresh one row of ``canonical_entities``.

    On an id conflict the display name and kind are overwritten, the stored
    primary email is kept when the new one is NULL, ``updated_at`` is bumped
    and ``deleted_at`` is cleared (see the comment inside the statement). An
    empty ``primary_email`` is stored as NULL.
    """
    statement = """
        INSERT INTO canonical_entities (id, entity_kind, display_name, primary_email, source, created_at, updated_at)
        VALUES (?, ?, ?, ?, 'entity_resolution', ?, ?)
        ON CONFLICT(id) DO UPDATE SET
            display_name = excluded.display_name,
            primary_email = COALESCE(excluded.primary_email, canonical_entities.primary_email),
            entity_kind = excluded.entity_kind,
            -- Re-emitting a deterministic id means the entity is live again, so
            -- clear any prior tombstone (e.g. a stale-cleanup soft-delete from a
            -- run when the source row was briefly absent). Fuzzy-merge losers are
            -- redirected away by _redirect and never re-upserted, so this never
            -- resurrects a merged-away entity.
            deleted_at = NULL,
            updated_at = excluded.updated_at
        """
    email_or_null = primary_email or None
    created_at = _now()
    updated_at = _now()
    params = (eid, kind, display_name, email_or_null, created_at, updated_at)
    conn.execute(statement, params)
|
|
|
|
|
|
def _link(conn, canonical_id, source_model, source_id, match_value, match_kind, confidence):
    """Record that a source row resolves to ``canonical_id`` in ``entity_links``.

    A link is keyed by ``(source_model, source_id, match_value)``. When that
    key already exists, the row is re-pointed at ``canonical_id`` and its
    match kind and confidence are refreshed; its original id and
    ``created_at`` are left untouched.
    """
    statement = """
        INSERT INTO entity_links (id, canonical_id, source_model, source_id, match_value, match_kind, confidence, created_at)
        VALUES (?, ?, ?, ?, ?, ?, ?, ?)
        ON CONFLICT(source_model, source_id, match_value) DO UPDATE SET
            canonical_id = excluded.canonical_id,
            match_kind = excluded.match_kind,
            confidence = excluded.confidence
        """
    link_id = str(uuid.uuid4())
    params = (
        link_id,
        canonical_id,
        source_model,
        source_id,
        match_value,
        match_kind,
        confidence,
        _now(),
    )
    conn.execute(statement, params)
|
|
|
|
|
|
# ── resolution passes ─────────────────────────────────────────────────────────
|
|
|
|
def resolve_organizations(conn, merge_map=None):
    """Collapse organizations and fundraising_investors into investor entities.

    Rows from both tables are grouped by the ``norm_text`` of their name; rows
    whose name normalizes to the empty string are skipped. Each group becomes
    one canonical entity of kind ``investor`` and every contributing source
    row is recorded in ``entity_links`` as an ``exact_name`` match.

    Args:
        conn: sqlite3 connection whose rows can be indexed by column name.
        merge_map: optional ``merged_id -> survivor_id`` mapping; each
            deterministic id is passed through ``_redirect`` before use.

    Returns:
        ``(org_canon_by_orgid, org_canon_by_fundinv)``: organizations.id and
        fundraising_investors.id mapped to their canonical investor id, so the
        people pass can attach each person to their firm's canonical id.
    """
    redirects = merge_map or {}
    buckets = {}

    def _bucket(norm_name):
        # One bucket per normalized firm name, created on first sight so the
        # iteration order below follows first appearance in the source tables.
        return buckets.setdefault(
            norm_name, {"orgs": [], "investors": [], "name": "", "email": ""}
        )

    for row in conn.execute("SELECT id, name, email FROM organizations"):
        norm_name = norm_text(row["name"])
        if norm_name:
            bucket = _bucket(norm_name)
            bucket["orgs"].append(row["id"])
            # Display name: the longest organization spelling seen so far.
            if len(row["name"] or "") > len(bucket["name"]):
                bucket["name"] = row["name"]
            # Email: the first non-blank organization email wins.
            org_email = (row["email"] or "").strip()
            if org_email and not bucket["email"]:
                bucket["email"] = org_email

    for row in conn.execute("SELECT id, investor_name FROM fundraising_investors"):
        norm_name = norm_text(row["investor_name"])
        if norm_name:
            bucket = _bucket(norm_name)
            bucket["investors"].append(row["id"])
            # The grid name is only a fallback when no organization named the group.
            if not bucket["name"]:
                bucket["name"] = row["investor_name"]

    canon_by_org = {}
    canon_by_investor = {}
    for norm_name, bucket in buckets.items():
        # Every firm group is one INVESTOR entity. The fundraising grid is the
        # source of truth for investor entities (each row = one investor, whether
        # an institution/family-office or an individual); the organizations table
        # mirrors those names. So we no longer split into lp/organization.
        canonical_id = _redirect(redirects, _eid("inv", norm_name))
        _upsert_entity(conn, canonical_id, "investor", bucket["name"], bucket["email"])
        for org_id in bucket["orgs"]:
            _link(conn, canonical_id, "organizations", org_id, norm_name, "exact_name", 1.0)
            canon_by_org[org_id] = canonical_id
        for investor_id in bucket["investors"]:
            _link(conn, canonical_id, "fundraising_investors", investor_id, norm_name, "exact_name", 1.0)
            canon_by_investor[investor_id] = canonical_id

    return canon_by_org, canon_by_investor
|
|
|
|
|
|
def _member_of(conn, person_id, investor_id):
    """Record that a person (contact) belongs to an investor entity.

    Upserts a directed ``member_of`` edge from ``person_id`` to
    ``investor_id`` in ``relationship_edges``. Nothing is written when there
    is no investor or when both ids are the same. On a repeat of the same
    edge only ``last_seen_at`` and ``updated_at`` are refreshed.
    """
    if not investor_id:
        return
    if person_id == investor_id:
        return
    statement = """
        INSERT INTO relationship_edges (id, src_id, dst_id, edge_type, source, strength, directed,
            first_seen_at, last_seen_at, created_at, updated_at)
        VALUES (?,?,?, 'member_of', 'entity_resolution', 1.0, 1, ?, ?, ?, ?)
        ON CONFLICT(src_id, dst_id, edge_type, source)
        DO UPDATE SET last_seen_at=excluded.last_seen_at, updated_at=excluded.updated_at
        """
    edge_id = str(uuid.uuid4())
    # first_seen_at, last_seen_at, created_at, updated_at — in that order.
    stamps = [_now() for _ in range(4)]
    conn.execute(statement, (edge_id, person_id, investor_id, *stamps))
|
|
|
|
|
|
def resolve_people(conn, org_canon_by_orgid, org_canon_by_fundinv, merge_map=None):
    """Resolve people from the contacts table and attach grid memberships.

    People come from the CONTACTS table (one person per live contact). The
    fundraising grid's contacts are NOT a second set of people: each grid row
    is matched to an existing contact-person and recorded only as an
    ``entity_links`` row plus a ``member_of`` edge to its investor entity.
    A grid row that matches no contact creates nothing — this is what stops
    the double-count.

    Args:
        conn: sqlite3 connection whose rows can be indexed by column name.
        org_canon_by_orgid: organizations.id -> canonical investor id.
        org_canon_by_fundinv: fundraising_investors.id -> canonical investor id.
        merge_map: optional ``merged_id -> survivor_id`` mapping; each
            deterministic person id is passed through ``_redirect``.

    Returns:
        ``person_meta``: person canonical id -> ``{"org", "last", "name",
        "email"}``, the input expected by ``find_fuzzy_candidates``. (The
        contact_id -> person id map is used internally to link lp_profiles and
        is not returned.)
    """
    merge_map = merge_map or {}
    contact_to_person = {}  # contacts.id -> person cid (used for lp_profiles)
    person_meta = {}        # person cid -> metadata for the fuzzy-candidate pass
    by_email = {}           # norm_email -> person cid
    by_name_inv = {}        # (name_norm, investor_canon) -> person cid

    def _person(full, email, inv_canon, model, sid):
        """Upsert the person for one source row and index it for pass 2.

        The resolution key is the exact email when present, otherwise the
        normalized name scoped to the investor. Returns the canonical id, or
        None when the row has neither an email nor a name.
        """
        name_norm = norm_text(full)
        if email:
            key, mk, conf, mv = f"e|{email}", "exact_email", 1.0, email
        elif name_norm:
            key, mk, conf, mv = f"n|{name_norm}|{inv_canon or ''}", "name_org", 0.8, name_norm
        else:
            return None
        cid = _redirect(merge_map, _eid("per", key))
        _upsert_entity(conn, cid, "person", full.strip() or email, email)
        _link(conn, cid, model, sid, mv, mk, conf)
        if email:
            by_email[email] = cid
        if name_norm:
            by_name_inv[(name_norm, inv_canon or "")] = cid
        _member_of(conn, cid, inv_canon)
        # First source row to produce this cid fixes its metadata; a later row
        # may only fill in a missing org.
        m = person_meta.setdefault(cid, {"org": inv_canon, "last": _split_name(full)[1],
                                         "name": full.strip() or email, "email": email})
        if inv_canon and not m["org"]:
            m["org"] = inv_canon
        return cid

    # 1. People = the contacts table.
    for r in conn.execute("SELECT id, first_name, last_name, email, organization_id FROM contacts WHERE deleted_at IS NULL"):
        full = f"{r['first_name'] or ''} {r['last_name'] or ''}".strip()
        cid = _person(full, norm_email(r["email"]), org_canon_by_orgid.get(r["organization_id"]), "contacts", r["id"])
        if cid:
            contact_to_person[r["id"]] = cid

    # 2. Grid contacts are associations, not new people: match to a contact-person
    #    (by email, else name within the same investor) and just add membership.
    #    A grid row with no matching contact is skipped; no person is created.
    for r in conn.execute("SELECT id, full_name, email, investor_id FROM fundraising_contacts"):
        email = norm_email(r["email"])
        name_norm = norm_text(r["full_name"] or "")
        inv_canon = org_canon_by_fundinv.get(r["investor_id"])
        # Match the grid contact to its contacts-table person by PROVABLE keys only:
        # exact email, else exact name within the SAME canonical investor. The app
        # keeps the grid and the contacts table in sync (_upsert_contact_from_
        # fundraising), so a grid contact IS an existing contact-person, never a new
        # one. On a confident match, record the membership. On a miss we deliberately
        # do NOT mint a person: the old else-branch mint is exactly what produced the
        # people double-count (a grid row whose (name, investor) key didn't line up
        # with its contact minted a duplicate), and guessing by name across firms
        # risks binding two different same-named people. Unresolved grid rows are
        # left for the explicit contact_id link planned in the grid/contacts
        # unification — honest separation: never merge or mint on a guess.
        cid = (by_email.get(email) if email else None) or by_name_inv.get((name_norm, inv_canon or ""))
        if cid:
            _link(conn, cid, "fundraising_contacts", r["id"], email or name_norm, "grid_assoc", 0.9)
            _member_of(conn, cid, inv_canon)

    # 3. lp_profiles -> the person entity of its contact
    for r in conn.execute("SELECT id, contact_id FROM lp_profiles WHERE deleted_at IS NULL"):
        cid = contact_to_person.get(r["contact_id"])
        if cid:
            _link(conn, cid, "lp_profiles", r["id"], r["contact_id"], "contact_fk", 1.0)

    return person_meta
|
|
|
|
|
|
def find_fuzzy_candidates(person_meta):
    """Group person entities that share a canonical org AND a surname.

    Distinct entities in the same (org, surname) bucket are likely one
    individual under a name variant (e.g. Jon/Jonathan). They are only
    reported for the local-Qwen tier to adjudicate; nothing is merged here.

    Args:
        person_meta: person canonical id -> dict with ``org``, ``last``,
            ``name`` and ``email`` keys. Entries with a falsy ``org`` or
            ``last`` are ignored.

    Returns:
        A list of ``{"org", "surname", "members"}`` dicts, one per bucket
        holding more than one person; ``members`` is a list of
        ``(canonical_id, name, email)`` tuples.
    """
    buckets = {}
    for person_id, meta in person_meta.items():
        org, surname = meta["org"], meta["last"]
        if not (org and surname):
            continue
        buckets.setdefault((org, surname), []).append(
            (person_id, meta["name"], meta["email"])
        )

    candidates = []
    for (org, surname), members in buckets.items():
        if len(members) > 1:
            candidates.append({"org": org, "surname": surname, "members": members})
    return candidates
|
|
|
|
|
|
def run(db_path: str):
    """Run the full deterministic resolution against the SQLite DB at ``db_path``.

    Steps, each committed separately:
      1. Ensure ``entity_merges`` exists and load it as the redirect map.
      2. Resolve investors (``resolve_organizations``).
      3. Resolve people (``resolve_people``).
      4. Soft-delete stale derived rows and write an audit row listing them.
      5. Collect fuzzy candidates, count live entities and write the per-run
         ``entity_resolution.run`` audit row.

    The connection is always closed, including when a step raises; work that
    was not yet committed at that point is not committed by this function.

    Returns:
        ``(counts, candidates)``: a dict of summary counts and the list
        produced by ``find_fuzzy_candidates``.
    """
    conn = sqlite3.connect(db_path)
    try:
        conn.row_factory = sqlite3.Row
        conn.execute("PRAGMA foreign_keys=ON")

        # Durable fuzzy-merge map (entity_merges) so deterministic re-runs respect
        # prior local-Qwen merges instead of recreating merged-away entities.
        conn.execute("""CREATE TABLE IF NOT EXISTS entity_merges (
            merged_id TEXT PRIMARY KEY,
            survivor_id TEXT NOT NULL,
            confidence REAL,
            reason TEXT,
            created_at TEXT DEFAULT (datetime('now'))
        )""")
        merge_map = {r["merged_id"]: r["survivor_id"]
                     for r in conn.execute("SELECT merged_id, survivor_id FROM entity_merges")}

        org_by_oid, org_by_inv = resolve_organizations(conn, merge_map)
        conn.commit()
        person_meta = resolve_people(conn, org_by_oid, org_by_inv, merge_map)
        conn.commit()

        live = "deleted_at IS NULL"

        # ── Clean up stale derived rows (soft-delete only; guardrail #3) ──
        # Two UNAMBIGUOUS classes of obsolete entity_resolution-owned rows, tombstoned
        # ONLY when they carry no human/enrichment data so nothing a partner entered is
        # ever dropped:
        #   (a) PERSON rows with no 'contacts' source link. Real people come from the
        #       contacts table (pass 1); a person linked only from the grid is a leftover
        #       "twin" minted by the pre-fix else-branch — the source of the 1406
        #       double-count. (Narrow + safe: a contact whose canonical id merely
        #       *changed* still keeps a 'contacts' link, so it is never caught here.)
        #   (b) Rows under the superseded pre-:48 kinds 'lp'/'organization' (the model
        #       is now investor | person), left live by old upsert-only runs.
        # We list the ids first and log them (guardrail #5: the soft-delete is
        # reviewable/undoable), then tombstone + audit in ONE transaction.
        nodata = ("thesis_fit IS NULL AND segment IS NULL AND accreditation_status IS NULL "
                  "AND qp_status IS NULL AND warmth_score IS NULL AND owner_id IS NULL "
                  "AND last_touch_at IS NULL AND notes IS NULL")
        # `live` and `nodata` are constants defined just above, never user input,
        # so interpolating them into the statement is safe.
        stale = [r["id"] for r in conn.execute(f"""
            SELECT id FROM canonical_entities c
            WHERE {live} AND source='entity_resolution' AND {nodata}
              AND ( (entity_kind='person' AND NOT EXISTS (
                        SELECT 1 FROM entity_links l
                        WHERE l.canonical_id = c.id AND l.source_model = 'contacts'))
                    OR entity_kind IN ('lp', 'organization') )
        """)]
        if stale:
            # One timestamp for the whole batch and its audit row, so the rows
            # tombstoned by this run can be identified (and undone) together.
            tombstoned_at = _now()
            conn.executemany(
                "UPDATE canonical_entities SET deleted_at=?, updated_at=? WHERE id=?",
                [(tombstoned_at, tombstoned_at, sid) for sid in stale],
            )
            conn.execute(
                """INSERT INTO interaction_log
                   (id, ts, actor_type, actor_id, action, target_type, target_id, payload, source, created_at)
                   VALUES (?, ?, 'system', 'entity_resolver', 'entity.stale_tombstoned', 'canonical_entities', NULL, ?, 'ingest', ?)""",
                (str(uuid.uuid4()), tombstoned_at, json.dumps({"count": len(stale), "ids": stale}), tombstoned_at),
            )
        pruned = len(stale)
        conn.commit()

        candidates = find_fuzzy_candidates(person_meta)

        # Counts report LIVE entities (deleted_at IS NULL); fuzzy-merged losers and
        # tombstoned stale rows are soft-deleted (guardrail #3) and excluded.
        counts = {
            "canonical_total": conn.execute(f"SELECT COUNT(*) FROM canonical_entities WHERE {live}").fetchone()[0],
            "investor": conn.execute(f"SELECT COUNT(*) FROM canonical_entities WHERE entity_kind='investor' AND {live}").fetchone()[0],
            "person": conn.execute(f"SELECT COUNT(*) FROM canonical_entities WHERE entity_kind='person' AND {live}").fetchone()[0],
            "links": conn.execute("SELECT COUNT(*) FROM entity_links").fetchone()[0],
            "fuzzy_candidates": len(candidates),
            "pruned_stale": pruned,
        }

        conn.execute(
            """
            INSERT INTO interaction_log (id, ts, actor_type, actor_id, action, target_type, payload, source, created_at)
            VALUES (?, ?, 'system', 'entity_resolver', 'entity_resolution.run', 'canonical_entities', ?, 'ingest', ?)
            """,
            (str(uuid.uuid4()), _now(), json.dumps(counts), _now()),
        )
        conn.commit()
    finally:
        # Release the DB handle even when a pass raises.
        conn.close()
    return counts, candidates
|
|
|
|
|
|
def main():
    """Command-line entry point: run the resolver and print its summary.

    Flags:
        --db               path to the CRM SQLite DB (default data/crm_dev.db)
        --show-candidates  also print the fuzzy merge candidates, if any
    """
    parser = argparse.ArgumentParser(description="Deterministic entity resolution into the canonical layer.")
    parser.add_argument("--db", default="data/crm_dev.db", help="path to the CRM SQLite DB")
    parser.add_argument("--show-candidates", action="store_true", help="print fuzzy merge candidates")
    args = parser.parse_args()

    counts, candidates = run(args.db)

    print(f"Entity resolution on {args.db}:")
    for label, value in counts.items():
        print(f" {label:<18} {value}")

    if not (args.show_candidates and candidates):
        return

    print("\nFuzzy candidates (same org + surname, different person — for the local-Qwen tier):")
    for group in candidates:
        rendered = []
        for _cid, name, email in group["members"]:
            # Append the email in angle brackets only when the member has one.
            suffix = (' <' + email + '>') if email else ''
            rendered.append(f"{name!r}{suffix}")
        names = ", ".join(rendered)
        print(f" [{group['surname']}] {names}")


if __name__ == "__main__":
    main()
|