d16264f401
Root cause: grid contacts (fundraising_contacts) are the SAME people as the contacts table (the app syncs them by name/email), but resolution matched grid rows by (name + investor-canon) where the two sides derive the investor key from different tables that rarely line up — so nearly every grid contact minted a duplicate person (715 + ~692 ≈ 1406), and the duplicate finder then flagged each twin against its real self (~676 candidates). Fix (entity_resolution.py): - Grid pass matches a grid contact to its existing contacts-table person by PROVABLE keys only (exact email, else exact name within the same investor) and records membership; on a miss it MINTS NOTHING (the old else-branch mint was the double-count source, and guessing by name across firms risks binding two different same-named people). - Targeted, audited cleanup soft-deletes leftover grid-only "twins" (person rows with no 'contacts' link) and superseded pre-:48 'lp'/'organization' rows, guarded so any row carrying enrichment/human data is never dropped (guardrail #3); the tombstoned ids are logged to interaction_log (guardrail #5). - _upsert_entity clears deleted_at on conflict so a re-emitted id is un-tombstoned (no permanent burial); fuzzy-merge losers stay buried via _redirect. entity_merge.py / server.py: the duplicate queue + pending count now filter to candidates whose both sides are still live, so self-healed twins drop out. Verified: offline reproduction test (backend/ingest/test_entity_resolution.py, 10/10) reproduces the 1406-style doubling and proves it collapses; no regression on the synthetic dev set; two adversarial review passes. Known pre-existing identity-key weaknesses (same name+firm+no email collision; shared role inbox over-link) are unchanged by this fix and will be resolved structurally by the contact_id link in the grid/contacts unification. Run "Build search index" after upgrading to recompute the canonical layer. Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
88 lines
4.3 KiB
Python
88 lines
4.3 KiB
Python
"""Human-decided entity-merge candidates (Phase 1). Called by server.py routes —
|
|
NOT an agent tool. A partner approves (merge) or rejects (keep separate) each
|
|
candidate the fuzzy tier surfaced. Approvals apply the merge and record it in
|
|
entity_merges (durable, so deterministic resolution respects it); rejections are
|
|
remembered so the pair is not re-surfaced. Everything is logged.
|
|
"""
|
|
import json
|
|
import sqlite3
|
|
import uuid
|
|
from datetime import datetime, timezone
|
|
|
|
|
|
def _now():
|
|
return datetime.now(timezone.utc).replace(tzinfo=None).isoformat() + "Z"
|
|
|
|
|
|
def _conn(db):
|
|
c = sqlite3.connect(db)
|
|
c.row_factory = sqlite3.Row
|
|
c.execute("PRAGMA foreign_keys=ON")
|
|
return c
|
|
|
|
|
|
def _log(c, actor_id, action, target_id, payload):
    """Insert one row into interaction_log on connection `c`.

    The row is always written with actor_type "human", target_type
    "canonical_entity" and source "crm_ui". `payload` is stored as JSON text,
    or as NULL when it is None. The insert is not committed here; that is left
    to the caller.
    """
    event_id = str(uuid.uuid4())
    event_ts = _now()
    payload_json = None if payload is None else json.dumps(payload)
    created_at = _now()
    row = (
        event_id, event_ts, "human", actor_id, action,
        "canonical_entity", target_id, payload_json, "crm_ui", created_at,
    )
    c.execute("""INSERT INTO interaction_log
        (id, ts, actor_type, actor_id, action, target_type, target_id, payload, source, created_at)
        VALUES (?,?,?,?,?,?,?,?,?,?)""",
              row)
|
|
|
|
|
|
def list_candidates(db, status="pending"):
    """List merge candidates with the given status whose both entities are live.

    Args:
        db: Database path handed to `_conn`.
        status: Candidate status to filter on (defaults to "pending").

    Returns:
        ``{"candidates": [row dicts], "count": <number of rows>}``, ordered by
        confidence and then creation time, both descending.
    """
    c = _conn(db)
    try:
        # Only surface candidates whose BOTH sides are still live canonical
        # entities. When entity resolution self-heals (a grid "twin" matched
        # back, so its duplicate entity is soft-deleted), the candidate that
        # paired them becomes moot -- hide it instead of asking a human to
        # adjudicate a tombstone.
        rows = [dict(r) for r in c.execute(
            """SELECT mc.* FROM entity_merge_candidates mc
               JOIN canonical_entities a ON a.id = mc.entity_a AND a.deleted_at IS NULL
               JOIN canonical_entities b ON b.id = mc.entity_b AND b.deleted_at IS NULL
               WHERE mc.status=?
               ORDER BY mc.confidence DESC, mc.created_at DESC""", (status,))]
    finally:
        # Release the connection even when the query raises (e.g. a missing
        # table); previously an exception here leaked the open handle.
        c.close()
    return {"candidates": rows, "count": len(rows)}
|
|
|
|
|
|
def _apply_merge(c, survivor, loser, confidence, reason):
    """Fold entity `loser` into entity `survivor` on connection `c`.

    Steps, in order:
      1. Upsert the merge into entity_merges keyed on merged_id, so a repeat
         merge of the same loser overwrites survivor/confidence/reason.
      2. Repoint the loser's entity_links to the survivor, tagging them
         'fuzzy_merge'.
      3. Repoint relationship edges (member_of, etc.) on both ends, then
         delete whatever edges still reference the loser.
      4. Soft-delete the loser by stamping deleted_at; the row is never
         hard-deleted (guardrail #3).

    A falsy `confidence` is recorded as 0.8. Nothing is committed here.
    """
    recorded_confidence = confidence or 0.8
    merge_row = (loser, survivor, recorded_confidence, reason, _now())
    c.execute("""INSERT INTO entity_merges (merged_id, survivor_id, confidence, reason, created_at)
        VALUES (?,?,?,?,?)
        ON CONFLICT(merged_id) DO UPDATE SET survivor_id=excluded.survivor_id,
        confidence=excluded.confidence, reason=excluded.reason""",
              merge_row)

    repoint = (survivor, loser)
    c.execute("UPDATE entity_links SET canonical_id=?, match_kind='fuzzy_merge' WHERE canonical_id=?",
              repoint)

    # OR IGNORE skips any edge whose repointed form would clash with a unique
    # constraint (the survivor already has that edge); those skipped rows still
    # point at the loser and are removed by the DELETE that follows.
    c.execute("UPDATE OR IGNORE relationship_edges SET src_id=? WHERE src_id=?", repoint)
    c.execute("UPDATE OR IGNORE relationship_edges SET dst_id=? WHERE dst_id=?", repoint)
    c.execute("DELETE FROM relationship_edges WHERE src_id=? OR dst_id=?", (loser, loser))

    # Tombstone rather than remove the loser row.
    tombstone = (_now(), _now(), loser)
    c.execute("UPDATE canonical_entities SET deleted_at=?, updated_at=? WHERE id=?", tombstone)
|
|
|
|
|
|
def decide(db, candidate_id, decision, decided_by):
    """Record a human decision on one pending merge candidate.

    Args:
        db: Database path handed to `_conn`.
        candidate_id: Id of the entity_merge_candidates row being decided.
        decision: "approve" (merge entity_b into entity_a) or "reject".
        decided_by: Identifier of the deciding human; stored on the candidate
            and used as the actor of the interaction_log entry.

    Returns:
        On success ``{"id", "decision", "merged"}`` where "merged" is the
        loser entity id for an approval and None for a rejection. Otherwise an
        error dict: "bad_decision" (unknown decision), "not_found" (no such
        candidate) or "already_decided" (candidate is not pending).
    """
    if decision not in ("approve", "reject"):
        return {"error": "bad_decision", "allowed": ["approve", "reject"]}
    status = "approved" if decision == "approve" else "rejected"

    c = _conn(db)
    try:
        cand = c.execute("SELECT * FROM entity_merge_candidates WHERE id=?", (candidate_id,)).fetchone()
        if not cand:
            return {"error": "not_found", "candidate_id": candidate_id}
        if cand["status"] != "pending":
            return {"error": "already_decided", "status": cand["status"]}

        if decision == "approve":
            # entity_a survives; entity_b is folded into it and tombstoned.
            _apply_merge(c, cand["entity_a"], cand["entity_b"], cand["confidence"], cand["reason"])
        c.execute("UPDATE entity_merge_candidates SET status=?, decided_by=?, decided_at=? WHERE id=?",
                  (status, decided_by, _now(), candidate_id))
        # Build the action from the status word: appending "d" to the decision
        # produced the misspelled "entity.merge_rejectd" for rejections.
        # NOTE(review): rows logged before this fix may still carry the old
        # spelling -- check any consumer that filters on the action name.
        _log(c, decided_by, f"entity.merge_{status}", cand["entity_a"],
             {"survivor": cand["entity_a"], "loser": cand["entity_b"], "names": [cand["name_a"], cand["name_b"]]})
        # Merge, status change and log entry are committed together.
        c.commit()
    finally:
        # Always release the connection, including when a statement raises
        # before commit (previously the handle leaked on that path).
        c.close()
    return {"id": candidate_id, "decision": decision,
            "merged": (cand["entity_b"] if decision == "approve" else None)}
|