#!/usr/bin/env python3
"""Offline regression test for the grid-contact double-count fix (safe version).

Reproduces the real-data condition that made PEOPLE (RESOLVED) ≈ contacts +
grid-contacts (the ~1406 bug): grid contacts (fundraising_contacts) with no
email and contacts with no organization, so the (name, investor) keys never
coincide and every grid row used to mint its own duplicate person.

Asserts the SAFE fix:
  1. a grid contact with a matching email links back to its contact (no new
     person),
  2. a grid contact whose contact shares the same investor links back by name,
  3. a grid contact that can't be PROVABLY matched mints NOTHING (no duplicate
     person, no cross-firm name guess) — the count stays correct,
  4. targeted cleanup soft-deletes a stale grid-only "twin" (person with no
     contacts link) and a superseded 'lp'/'organization' row, with no
     enrichment,
  5. cleanup PRESERVES a grid-only person that carries enrichment
     (guardrail #3),
  6. a re-emitted id is UN-tombstoned (no permanent burial),
  7. re-running is idempotent.

Pure stdlib + SQLite; never calls Spark/Qwen (er.run is deterministic-only).

Run: cd backend/ingest && python3 test_entity_resolution.py
"""
import os
import sqlite3
import sys
import tempfile
from contextlib import closing

import entity_resolution as er

# Minimal copy of the tables this test seeds and inspects. Only the columns
# used below (plus the ones declared in the original fixture) are present.
SCHEMA = """
CREATE TABLE canonical_entities (
  id TEXT PRIMARY KEY, entity_kind TEXT NOT NULL, display_name TEXT NOT NULL,
  primary_email TEXT, thesis_fit TEXT, segment TEXT, accreditation_status TEXT,
  qp_status TEXT, warmth_score REAL, source TEXT, owner_id TEXT,
  last_touch_at TEXT, notes TEXT,
  created_at TEXT DEFAULT (datetime('now')),
  updated_at TEXT DEFAULT (datetime('now')),
  deleted_at TEXT
);
CREATE TABLE entity_links (
  id TEXT PRIMARY KEY, canonical_id TEXT, source_model TEXT, source_id TEXT,
  match_value TEXT, match_kind TEXT, confidence REAL, created_at TEXT,
  UNIQUE(source_model, source_id, match_value)
);
CREATE TABLE relationship_edges (
  id TEXT PRIMARY KEY, src_id TEXT, dst_id TEXT, edge_type TEXT, source TEXT,
  strength REAL, directed INTEGER, first_seen_at TEXT, last_seen_at TEXT,
  created_at TEXT, updated_at TEXT,
  UNIQUE(src_id, dst_id, edge_type, source)
);
CREATE TABLE interaction_log (
  id TEXT PRIMARY KEY, ts TEXT, actor_type TEXT, actor_id TEXT, action TEXT,
  target_type TEXT, target_id TEXT, payload TEXT, source TEXT, created_at TEXT
);
CREATE TABLE contacts (
  id TEXT PRIMARY KEY, first_name TEXT, last_name TEXT, email TEXT,
  organization_id TEXT, deleted_at TEXT
);
CREATE TABLE organizations (id TEXT PRIMARY KEY, name TEXT, email TEXT);
CREATE TABLE fundraising_investors (id TEXT PRIMARY KEY, investor_name TEXT);
CREATE TABLE fundraising_contacts (id TEXT PRIMARY KEY, full_name TEXT, email TEXT, investor_id TEXT, contact_id TEXT);
CREATE TABLE lp_profiles (id TEXT PRIMARY KEY, contact_id TEXT, deleted_at TEXT);
"""

# Canonical ids inserted directly by seed(); the counting helpers exclude them
# so that only entities produced by er.run are measured.
SEEDED = ("per_TWIN", "per_ENR", "lp_OLD")


def seed(db):
    """Create the schema in the SQLite file *db* and insert the fixture rows.

    The fixture holds four contacts, two investors, five grid contacts and
    three pre-existing canonical rows (the ids listed in SEEDED).
    """
    # closing() guarantees the handle is released even if an INSERT raises;
    # sqlite3's own context manager only scopes a transaction, it never closes.
    with closing(sqlite3.connect(db)) as c:
        c.executescript(SCHEMA)
        c.execute("INSERT INTO organizations (id, name, email) VALUES ('o1','Acme Capital',NULL)")
        c.executemany(
            "INSERT INTO contacts (id, first_name, last_name, email, organization_id) VALUES "
            "(?,?,?,?,?)",
            [
                ("c1", "Alice", "Anderson", "alice@x.com", None),  # email, no org
                ("c2", "Bob", "Brown", None, None),  # no email, no org
                ("c3", "Dave", "Davis", None, "o1"),  # no email, org = Acme
                ("c4", "Frank", "Foster", "frank@x.com", None),  # target of an explicit id link
            ],
        )
        c.executemany(
            "INSERT INTO fundraising_investors (id, investor_name) VALUES (?,?)",
            [
                ("i_acme", "Acme Capital"),
                ("i_beta", "Beta Family Office"),
            ],
        )
        c.executemany(
            "INSERT INTO fundraising_contacts (id, full_name, email, investor_id, contact_id) VALUES (?,?,?,?,?)",
            [
                ("g_alice", "Alice Anderson", "alice@x.com", "i_beta", None),  # -> email match to c1
                ("g_dave", "Dave Davis", None, "i_acme", None),  # -> name+investor match to c3
                ("g_bob", "Bob Brown", None, "i_beta", None),  # -> MISS (c2 has no org) -> mint NOTHING
                ("g_carol", "Carol Clark", None, "i_beta", None),  # -> MISS (no contact) -> mint NOTHING
                ("g_link", "Totally Mismatched", None, "i_beta", "c4"),  # -> explicit contact_id link wins over name/inv
            ],
        )
        # Stale grid-only "twin" (person, only a fundraising_contacts link, no enrichment) -> prune
        c.execute("INSERT INTO canonical_entities (id, entity_kind, display_name, source) VALUES "
                  "('per_TWIN','person','Ghost Twin','entity_resolution')")
        c.execute("INSERT INTO entity_links (id, canonical_id, source_model, source_id, match_value, match_kind, confidence, created_at) "
                  "VALUES ('l_twin','per_TWIN','fundraising_contacts','gx','ghost','name_org',0.8,'t')")
        # Grid-only person WITH enrichment -> preserved (guardrail #3)
        c.execute("INSERT INTO canonical_entities (id, entity_kind, display_name, source, segment) VALUES "
                  "('per_ENR','person','Enriched Orphan','entity_resolution','warm')")
        c.execute("INSERT INTO entity_links (id, canonical_id, source_model, source_id, match_value, match_kind, confidence, created_at) "
                  "VALUES ('l_enr','per_ENR','fundraising_contacts','gy','enr','name_org',0.8,'t')")
        # Superseded pre-:48 kind -> prune
        c.execute("INSERT INTO canonical_entities (id, entity_kind, display_name, source) VALUES "
                  "('lp_OLD','lp','Old LP Row','entity_resolution')")
        c.commit()


def resolved_persons(db):
    """Return the number of live 'person' entities, excluding the SEEDED ids."""
    q = "SELECT COUNT(*) FROM canonical_entities WHERE entity_kind='person' AND deleted_at IS NULL AND id NOT IN (?,?,?)"
    with closing(sqlite3.connect(db)) as c:
        return c.execute(q, SEEDED).fetchone()[0]


def deleted_at(db, eid):
    """Return the deleted_at value of canonical entity *eid*.

    The result is None for a live row and the string "MISSING" when no row
    with that id exists.
    """
    with closing(sqlite3.connect(db)) as c:
        r = c.execute("SELECT deleted_at FROM canonical_entities WHERE id=?", (eid,)).fetchone()
    return r[0] if r else "MISSING"


def grid_match_kinds(db):
    """Return {match_kind: link count} for fundraising_contacts links.

    Links whose match_kind is 'name_org' are left out of the result.
    """
    with closing(sqlite3.connect(db)) as c:
        return dict(c.execute("SELECT match_kind, COUNT(*) FROM entity_links "
                              "WHERE source_model='fundraising_contacts' AND match_kind!='name_org' GROUP BY match_kind").fetchall())


def minted_from_grid(db):
    """Persons minted directly from a grid row (the bug). Should be 0 after the fix.

    Counts distinct live canonical ids, other than the SEEDED ones, that are
    referenced by a fundraising_contacts link of kind 'name_org' or
    'exact_email'.
    """
    with closing(sqlite3.connect(db)) as c:
        return c.execute("""SELECT COUNT(DISTINCT l.canonical_id) FROM entity_links l
                            JOIN canonical_entities ce ON ce.id=l.canonical_id AND ce.deleted_at IS NULL
                            WHERE l.source_model='fundraising_contacts'
                              AND l.match_kind IN ('name_org','exact_email')
                              AND l.canonical_id NOT IN (?,?,?)""", SEEDED).fetchone()[0]


# Messages of every failed check, in the order they failed.
FAILS = []


def check(cond, msg):
    """Print a PASS/FAIL line for *msg* and record *msg* in FAILS on failure."""
    print((" PASS " if cond else " FAIL ") + msg)
    if not cond:
        FAILS.append(msg)


def main():
    """Seed a scratch database, run the resolver twice and verify the results.

    Exits with status 1 when any check failed; otherwise prints "ALL PASS".
    """
    # TemporaryDirectory removes the scratch database on exit, including on
    # error; the previous mkdtemp() call left one directory behind per run.
    # NOTE(review): cleanup assumes er.run does not keep the file open on
    # platforms that forbid deleting open files — confirm.
    with tempfile.TemporaryDirectory() as tmp:
        db = os.path.join(tmp, "repro.db")
        seed(db)

        counts1, _ = er.run(db)
        print(f"Run 1 counts: {counts1}")

        # 4 contacts; grid rows either link back (g_alice email, g_dave name+inv,
        # g_link explicit id) or are skipped (g_bob, g_carol). NO grid row mints a
        # person -> count stays 4, not 7-9.
        persons = resolved_persons(db)
        check(persons == 4,
              f"resolved persons == 4 (got {persons}); old double-count would be 7-9")
        minted = minted_from_grid(db)
        check(minted == 0, f"zero persons minted from grid rows (got {minted})")
        mk = grid_match_kinds(db)
        check(mk.get("grid_assoc", 0) == 2,
              f"two grid contacts matched back via grid_assoc (got {mk.get('grid_assoc',0)})")
        check(mk.get("grid_link", 0) == 1,
              f"one grid contact linked via explicit contact_id (grid_link==1, got {mk.get('grid_link',0)})")

        # Targeted cleanup: stale grid-only twin + superseded 'lp' row tombstoned...
        check(deleted_at(db, "per_TWIN") is not None, "stale grid-only twin 'per_TWIN' tombstoned")
        check(deleted_at(db, "lp_OLD") is not None, "superseded 'lp' row 'lp_OLD' tombstoned")
        # ...enriched grid-only person preserved.
        check(deleted_at(db, "per_ENR") is None,
              "enriched grid-only person 'per_ENR' PRESERVED (has segment)")
        check(counts1.get("pruned_stale", 0) == 2,
              f"exactly 2 stale rows pruned (got {counts1.get('pruned_stale')})")

        # Un-tombstone: soft-delete a real contact-person, then re-run -> it comes back.
        alice = er._eid("per", "e|alice@x.com")
        with closing(sqlite3.connect(db)) as cc:
            cc.execute("UPDATE canonical_entities SET deleted_at='2026-01-01' WHERE id=?", (alice,))
            cc.commit()

        counts2, _ = er.run(db)
        print(f"Run 2 counts: {counts2}")
        check(deleted_at(db, alice) is None,
              "re-emitted contact-person is UN-tombstoned (no permanent burial)")
        persons = resolved_persons(db)
        check(persons == 4, f"resolved persons stable at 4 on re-run (got {persons})")
        check(counts2.get("pruned_stale", 0) == 0,
              f"nothing re-pruned on idempotent re-run (got {counts2.get('pruned_stale')})")

    print()
    if FAILS:
        print(f"FAILED ({len(FAILS)}):")
        for f in FAILS:
            print(f" - {f}")
        sys.exit(1)
    print("ALL PASS")


if __name__ == "__main__":
    main()