diff --git a/backend/ingest/entity_resolution.py b/backend/ingest/entity_resolution.py index 4690bee..920e154 100644 --- a/backend/ingest/entity_resolution.py +++ b/backend/ingest/entity_resolution.py @@ -143,11 +143,12 @@ def resolve_organizations(conn, merge_map=None): org_canon_by_orgid, org_canon_by_fundinv = {}, {} for key, g in groups.items(): - # An org we are actively raising from (has a fundraising row) is an 'lp'; - # otherwise a plain 'organization'. - kind = "lp" if g["investors"] else "organization" - cid = _redirect(merge_map, _eid("lp" if kind == "lp" else "org", key)) - _upsert_entity(conn, cid, kind, g["name"], g["email"]) + # Every firm group is one INVESTOR entity. The fundraising grid is the + # source of truth for investor entities (each row = one investor, whether + # an institution/family-office or an individual); the organizations table + # mirrors those names. So we no longer split into lp/organization. + cid = _redirect(merge_map, _eid("inv", key)) + _upsert_entity(conn, cid, "investor", g["name"], g["email"]) for oid in g["orgs"]: _link(conn, cid, "organizations", oid, key, "exact_name", 1.0) org_canon_by_orgid[oid] = cid @@ -158,57 +159,77 @@ def resolve_organizations(conn, merge_map=None): return org_canon_by_orgid, org_canon_by_fundinv +def _member_of(conn, person_id, investor_id): + """Record that a person (contact) belongs to an investor entity.""" + if not investor_id or person_id == investor_id: + return + conn.execute(""" + INSERT INTO relationship_edges (id, src_id, dst_id, edge_type, source, strength, directed, + first_seen_at, last_seen_at, created_at, updated_at) + VALUES (?,?,?, 'member_of', 'entity_resolution', 1.0, 1, ?, ?, ?, ?) + ON CONFLICT(src_id, dst_id, edge_type, source) + DO UPDATE SET last_seen_at=excluded.last_seen_at, updated_at=excluded.updated_at + """, (str(uuid.uuid4()), person_id, investor_id, _now(), _now(), _now(), _now())) + + def resolve_people(conn, org_canon_by_orgid, org_canon_by_fundinv, merge_map=None): - """Merge contacts + fundraising_contacts by exact email, else exact name within - the same canonical org. Returns contact_id -> person canonical id (for lp_profiles).""" + """People come from the CONTACTS table (one person per contact, where the + emails/LinkedIn live). The fundraising grid's contacts are NOT a second set of + people — each is matched to a contact-person and recorded only as a member_of + edge to its investor entity (the grid's 'Contacts' column says who belongs to + which investor). This is what stops the double-count. + Returns contact_id -> person canonical id (for lp_profiles).""" merge_map = merge_map or {} - # gather (model, source_id, full_name, email, org_canon) - people = [] - for r in conn.execute("SELECT id, first_name, last_name, email, organization_id FROM contacts"): - full = f"{r['first_name'] or ''} {r['last_name'] or ''}".strip() - people.append(("contacts", r["id"], full, norm_email(r["email"]), - org_canon_by_orgid.get(r["organization_id"]))) - for r in conn.execute("SELECT id, full_name, email, investor_id FROM fundraising_contacts"): - people.append(("fundraising_contacts", r["id"], r["full_name"] or "", norm_email(r["email"]), - org_canon_by_fundinv.get(r["investor_id"]))) - contact_to_person = {} - person_meta = {} # canonical_id -> {"org": org_canon, "last": last_norm, "name": display, "email": email} + person_meta = {} + by_email = {} # norm_email -> person cid + by_name_inv = {} # (name_norm, investor_canon) -> person cid - for model, sid, full, email, org_canon in people: + def _person(full, email, inv_canon, model, sid): name_norm = norm_text(full) if email: - key = f"e|{email}" - match_kind, conf, match_value = "exact_email", 1.0, email + key, mk, conf, mv = f"e|{email}", "exact_email", 1.0, email elif name_norm: - key = f"n|{name_norm}|{org_canon or ''}" - match_kind, conf, match_value = "name_org", 0.8, name_norm + key, mk, conf, mv = f"n|{name_norm}|{inv_canon or ''}", "name_org", 0.8, name_norm else: - continue + return None cid = _redirect(merge_map, _eid("per", key)) - display = full.strip() or email - _upsert_entity(conn, cid, "person", display, email) - _link(conn, cid, model, sid, match_value, match_kind, conf) - # Record that this contact (person) belongs to its investor/org entity, so - # one investor can own many contacts (e.g. a family office with several - # people) — and a 1-contact HNWI is just the N=1 case. - if org_canon and cid != org_canon: - conn.execute(""" - INSERT INTO relationship_edges (id, src_id, dst_id, edge_type, source, strength, directed, - first_seen_at, last_seen_at, created_at, updated_at) - VALUES (?,?,?, 'member_of', 'entity_resolution', 1.0, 1, ?, ?, ?, ?) - ON CONFLICT(src_id, dst_id, edge_type, source) - DO UPDATE SET last_seen_at=excluded.last_seen_at, updated_at=excluded.updated_at - """, (str(uuid.uuid4()), cid, org_canon, _now(), _now(), _now(), _now())) - if model == "contacts": - contact_to_person[sid] = cid - meta = person_meta.setdefault(cid, {"org": org_canon, "last": _split_name(full)[1], - "name": display, "email": email}) - if org_canon and not meta["org"]: - meta["org"] = org_canon + _upsert_entity(conn, cid, "person", full.strip() or email, email) + _link(conn, cid, model, sid, mv, mk, conf) + if email: + by_email[email] = cid + if name_norm: + by_name_inv[(name_norm, inv_canon or "")] = cid + _member_of(conn, cid, inv_canon) + m = person_meta.setdefault(cid, {"org": inv_canon, "last": _split_name(full)[1], + "name": full.strip() or email, "email": email}) + if inv_canon and not m["org"]: + m["org"] = inv_canon + return cid + + # 1. People = the contacts table. + for r in conn.execute("SELECT id, first_name, last_name, email, organization_id FROM contacts WHERE deleted_at IS NULL"): + full = f"{r['first_name'] or ''} {r['last_name'] or ''}".strip() + cid = _person(full, norm_email(r["email"]), org_canon_by_orgid.get(r["organization_id"]), "contacts", r["id"]) + if cid: + contact_to_person[r["id"]] = cid + + # 2. Grid contacts are associations, not new people: match to a contact-person + # (by email, else name within the same investor) and just add membership. + # Only create a person when there is genuinely no matching contact. + for r in conn.execute("SELECT id, full_name, email, investor_id FROM fundraising_contacts"): + email = norm_email(r["email"]) + name_norm = norm_text(r["full_name"] or "") + inv_canon = org_canon_by_fundinv.get(r["investor_id"]) + cid = (by_email.get(email) if email else None) or by_name_inv.get((name_norm, inv_canon or "")) + if cid: + _link(conn, cid, "fundraising_contacts", r["id"], email or name_norm, "grid_assoc", 0.9) + _member_of(conn, cid, inv_canon) + else: + _person(r["full_name"] or "", email, inv_canon, "fundraising_contacts", r["id"]) # lp_profiles -> the person entity of its contact - for r in conn.execute("SELECT id, contact_id FROM lp_profiles"): + for r in conn.execute("SELECT id, contact_id FROM lp_profiles WHERE deleted_at IS NULL"): cid = contact_to_person.get(r["contact_id"]) if cid: _link(conn, cid, "lp_profiles", r["id"], r["contact_id"], "contact_fk", 1.0) @@ -256,8 +277,7 @@ def run(db_path: str): live = "deleted_at IS NULL" counts = { "canonical_total": conn.execute(f"SELECT COUNT(*) FROM canonical_entities WHERE {live}").fetchone()[0], - "lp": conn.execute(f"SELECT COUNT(*) FROM canonical_entities WHERE entity_kind='lp' AND {live}").fetchone()[0], - "organization": conn.execute(f"SELECT COUNT(*) FROM canonical_entities WHERE entity_kind='organization' AND {live}").fetchone()[0], + "investor": conn.execute(f"SELECT COUNT(*) FROM canonical_entities WHERE entity_kind='investor' AND {live}").fetchone()[0], "person": conn.execute(f"SELECT COUNT(*) FROM canonical_entities WHERE entity_kind='person' AND {live}").fetchone()[0], "links": conn.execute("SELECT COUNT(*) FROM entity_links").fetchone()[0], "fuzzy_candidates": len(candidates), diff --git a/backend/server.py b/backend/server.py index 3cfe441..9dc3d75 100644 --- a/backend/server.py +++ b/backend/server.py @@ -3467,8 +3467,7 @@ class CRMHandler(BaseHTTPRequestHandler): try: live = "deleted_at IS NULL" out['canonical_entities'] = { - 'lp': conn.execute(f"SELECT COUNT(*) FROM canonical_entities WHERE entity_kind='lp' AND {live}").fetchone()[0], - 'organization': conn.execute(f"SELECT COUNT(*) FROM canonical_entities WHERE entity_kind='organization' AND {live}").fetchone()[0], + 'investor': conn.execute(f"SELECT COUNT(*) FROM canonical_entities WHERE entity_kind='investor' AND {live}").fetchone()[0], 'person': conn.execute(f"SELECT COUNT(*) FROM canonical_entities WHERE entity_kind='person' AND {live}").fetchone()[0], } out['entity_links'] = conn.execute("SELECT COUNT(*) FROM entity_links").fetchone()[0] diff --git a/frontend/index.html b/frontend/index.html index 9edf578..2d87a26 100644 --- a/frontend/index.html +++ b/frontend/index.html @@ -9382,20 +9382,20 @@