Entity model: investors (grid) vs people (contacts); fix double-count (0.1.0:48)

Per Grant's clarification of the real data model:
- Investor entities come from the fundraising grid, one per row, all labeled
  "investor" (drops the confusing lp/organization split). Grid is source of truth.
- People come ONLY from the contacts table. The grid's contacts
  (fundraising_contacts) are matched to a contact-person and recorded as
  member_of links to their investor, instead of creating duplicate person
  entities. This fixes the ~doubled people count (people now ≈ contacts, not
  contacts + grid contacts).
- System Status cards: Investors / People (resolved) / Contacts in CRM / Grid
  contacts, so resolved-vs-source is visible at a glance.

Verified on synthetic data: people count == contacts count (no double-count);
multi-contact investors are preserved via member_of.

Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
This commit is contained in:
Keysat
2026-06-05 13:05:58 -05:00
parent 3c31b1e8a5
commit 91361042e7
6 changed files with 102 additions and 62 deletions
+67 -47
View File
@@ -143,11 +143,12 @@ def resolve_organizations(conn, merge_map=None):
org_canon_by_orgid, org_canon_by_fundinv = {}, {}
for key, g in groups.items():
# An org we are actively raising from (has a fundraising row) is an 'lp';
# otherwise a plain 'organization'.
kind = "lp" if g["investors"] else "organization"
cid = _redirect(merge_map, _eid("lp" if kind == "lp" else "org", key))
_upsert_entity(conn, cid, kind, g["name"], g["email"])
# Every firm group is one INVESTOR entity. The fundraising grid is the
# source of truth for investor entities (each row = one investor, whether
# an institution/family-office or an individual); the organizations table
# mirrors those names. So we no longer split into lp/organization.
cid = _redirect(merge_map, _eid("inv", key))
_upsert_entity(conn, cid, "investor", g["name"], g["email"])
for oid in g["orgs"]:
_link(conn, cid, "organizations", oid, key, "exact_name", 1.0)
org_canon_by_orgid[oid] = cid
@@ -158,57 +159,77 @@ def resolve_organizations(conn, merge_map=None):
return org_canon_by_orgid, org_canon_by_fundinv
def _member_of(conn, person_id, investor_id):
    """Record that a person (contact) belongs to an investor entity.

    Upserts a directed ``member_of`` row into ``relationship_edges`` with
    ``src_id=person_id``, ``dst_id=investor_id``, source
    ``'entity_resolution'`` and strength 1.0. If a row with the same
    (src_id, dst_id, edge_type, source) already exists, only its
    ``last_seen_at`` and ``updated_at`` are refreshed.

    Does nothing when ``investor_id`` is falsy or when the person and the
    investor are the same entity (no self-membership edge).

    Args:
        conn: DB connection exposing ``execute(sql, params)`` with ``?``
            placeholders (presumably sqlite3 -- confirm against caller).
        person_id: canonical id of the person entity (edge source).
        investor_id: canonical id of the investor entity (edge target),
            or a falsy value when the person has no known investor.
    """
    if not investor_id or person_id == investor_id:
        return
    # Take the timestamp once so all four time columns of a freshly inserted
    # edge agree exactly, instead of drifting between separate _now() calls.
    stamp = _now()
    conn.execute("""
        INSERT INTO relationship_edges (id, src_id, dst_id, edge_type, source, strength, directed,
        first_seen_at, last_seen_at, created_at, updated_at)
        VALUES (?,?,?, 'member_of', 'entity_resolution', 1.0, 1, ?, ?, ?, ?)
        ON CONFLICT(src_id, dst_id, edge_type, source)
        DO UPDATE SET last_seen_at=excluded.last_seen_at, updated_at=excluded.updated_at
    """, (str(uuid.uuid4()), person_id, investor_id, stamp, stamp, stamp, stamp))
def resolve_people(conn, org_canon_by_orgid, org_canon_by_fundinv, merge_map=None):
"""Merge contacts + fundraising_contacts by exact email, else exact name within
the same canonical org. Returns contact_id -> person canonical id (for lp_profiles)."""
"""People come from the CONTACTS table (one person per contact, where the
emails/LinkedIn live). The fundraising grid's contacts are NOT a second set of
people — each is matched to a contact-person and recorded only as a member_of
edge to its investor entity (the grid's 'Contacts' column says who belongs to
which investor). This is what stops the double-count.
Returns contact_id -> person canonical id (for lp_profiles)."""
merge_map = merge_map or {}
# gather (model, source_id, full_name, email, org_canon)
people = []
for r in conn.execute("SELECT id, first_name, last_name, email, organization_id FROM contacts"):
full = f"{r['first_name'] or ''} {r['last_name'] or ''}".strip()
people.append(("contacts", r["id"], full, norm_email(r["email"]),
org_canon_by_orgid.get(r["organization_id"])))
for r in conn.execute("SELECT id, full_name, email, investor_id FROM fundraising_contacts"):
people.append(("fundraising_contacts", r["id"], r["full_name"] or "", norm_email(r["email"]),
org_canon_by_fundinv.get(r["investor_id"])))
contact_to_person = {}
person_meta = {} # canonical_id -> {"org": org_canon, "last": last_norm, "name": display, "email": email}
person_meta = {}
by_email = {} # norm_email -> person cid
by_name_inv = {} # (name_norm, investor_canon) -> person cid
for model, sid, full, email, org_canon in people:
def _person(full, email, inv_canon, model, sid):
name_norm = norm_text(full)
if email:
key = f"e|{email}"
match_kind, conf, match_value = "exact_email", 1.0, email
key, mk, conf, mv = f"e|{email}", "exact_email", 1.0, email
elif name_norm:
key = f"n|{name_norm}|{org_canon or ''}"
match_kind, conf, match_value = "name_org", 0.8, name_norm
key, mk, conf, mv = f"n|{name_norm}|{inv_canon or ''}", "name_org", 0.8, name_norm
else:
continue
return None
cid = _redirect(merge_map, _eid("per", key))
display = full.strip() or email
_upsert_entity(conn, cid, "person", display, email)
_link(conn, cid, model, sid, match_value, match_kind, conf)
# Record that this contact (person) belongs to its investor/org entity, so
# one investor can own many contacts (e.g. a family office with several
# people) — and a 1-contact HNWI is just the N=1 case.
if org_canon and cid != org_canon:
conn.execute("""
INSERT INTO relationship_edges (id, src_id, dst_id, edge_type, source, strength, directed,
first_seen_at, last_seen_at, created_at, updated_at)
VALUES (?,?,?, 'member_of', 'entity_resolution', 1.0, 1, ?, ?, ?, ?)
ON CONFLICT(src_id, dst_id, edge_type, source)
DO UPDATE SET last_seen_at=excluded.last_seen_at, updated_at=excluded.updated_at
""", (str(uuid.uuid4()), cid, org_canon, _now(), _now(), _now(), _now()))
if model == "contacts":
contact_to_person[sid] = cid
meta = person_meta.setdefault(cid, {"org": org_canon, "last": _split_name(full)[1],
"name": display, "email": email})
if org_canon and not meta["org"]:
meta["org"] = org_canon
_upsert_entity(conn, cid, "person", full.strip() or email, email)
_link(conn, cid, model, sid, mv, mk, conf)
if email:
by_email[email] = cid
if name_norm:
by_name_inv[(name_norm, inv_canon or "")] = cid
_member_of(conn, cid, inv_canon)
m = person_meta.setdefault(cid, {"org": inv_canon, "last": _split_name(full)[1],
"name": full.strip() or email, "email": email})
if inv_canon and not m["org"]:
m["org"] = inv_canon
return cid
# 1. People = the contacts table.
for r in conn.execute("SELECT id, first_name, last_name, email, organization_id FROM contacts WHERE deleted_at IS NULL"):
full = f"{r['first_name'] or ''} {r['last_name'] or ''}".strip()
cid = _person(full, norm_email(r["email"]), org_canon_by_orgid.get(r["organization_id"]), "contacts", r["id"])
if cid:
contact_to_person[r["id"]] = cid
# 2. Grid contacts are associations, not new people: match to a contact-person
# (by email, else name within the same investor) and just add membership.
# Only create a person when there is genuinely no matching contact.
for r in conn.execute("SELECT id, full_name, email, investor_id FROM fundraising_contacts"):
email = norm_email(r["email"])
name_norm = norm_text(r["full_name"] or "")
inv_canon = org_canon_by_fundinv.get(r["investor_id"])
cid = (by_email.get(email) if email else None) or by_name_inv.get((name_norm, inv_canon or ""))
if cid:
_link(conn, cid, "fundraising_contacts", r["id"], email or name_norm, "grid_assoc", 0.9)
_member_of(conn, cid, inv_canon)
else:
_person(r["full_name"] or "", email, inv_canon, "fundraising_contacts", r["id"])
# lp_profiles -> the person entity of its contact
for r in conn.execute("SELECT id, contact_id FROM lp_profiles"):
for r in conn.execute("SELECT id, contact_id FROM lp_profiles WHERE deleted_at IS NULL"):
cid = contact_to_person.get(r["contact_id"])
if cid:
_link(conn, cid, "lp_profiles", r["id"], r["contact_id"], "contact_fk", 1.0)
@@ -256,8 +277,7 @@ def run(db_path: str):
live = "deleted_at IS NULL"
counts = {
"canonical_total": conn.execute(f"SELECT COUNT(*) FROM canonical_entities WHERE {live}").fetchone()[0],
"lp": conn.execute(f"SELECT COUNT(*) FROM canonical_entities WHERE entity_kind='lp' AND {live}").fetchone()[0],
"organization": conn.execute(f"SELECT COUNT(*) FROM canonical_entities WHERE entity_kind='organization' AND {live}").fetchone()[0],
"investor": conn.execute(f"SELECT COUNT(*) FROM canonical_entities WHERE entity_kind='investor' AND {live}").fetchone()[0],
"person": conn.execute(f"SELECT COUNT(*) FROM canonical_entities WHERE entity_kind='person' AND {live}").fetchone()[0],
"links": conn.execute("SELECT COUNT(*) FROM entity_links").fetchone()[0],
"fuzzy_candidates": len(candidates),