c7ce44d963
Workstream A–C substrate for the Ten31 agentic system: - A1: docs/crm-overview.md; CLAUDE.md conventions + guardrail #9 - A2: additive/reversible core migration (canonical_entities, entity_links, interaction_log, relationship_edges, soft-delete) + ledgered runner - B1/B3: chunking + deterministic entity resolution (backend/ingest) - B2: dense (bge-m3) + BM25 sparse ingest to Qdrant crm_chunks - C: CRM MCP server (reads, retrieval modes, logged writes) — no outbound tools - docs: redaction/re-hydration, Gmail enablement runbook - synthetic test data; .env.example; housekeeping (.gitignore, untrack crm.db, drop legacy files + start9/0.3.5) Verified end-to-end on synthetic data + live Sparks (hybrid > dense on entity queries). Real backfill runs on Ten31 infra; index holds synthetic data only. Branch snapshot also captures pre-existing working-tree changes. Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
216 lines
7.3 KiB
Python
216 lines
7.3 KiB
Python
"""
|
|
Investor matching.

Builds an in-memory index of investor email addresses from:
  - fundraising_contacts.email
  - contacts.email
  - organizations.email + organizations.website (domain only)

For each synced email, returns a list of investor links. Exact-email matches
beat domain matches; if any exact match exists, domain matches are suppressed.

The index is rebuilt by rebuild_if_stale() once it is older than
`REFRESH_INTERVAL_SEC` seconds, or on demand via rebuild().
|
|
"""
|
|
|
|
import re
|
|
import threading
|
|
import time
|
|
from dataclasses import dataclass
|
|
from typing import Optional
|
|
|
|
|
|
# Maximum age, in seconds, of a built index before rebuild_if_stale() rebuilds it.
REFRESH_INTERVAL_SEC = 15 * 60  # 15 minutes

# Personal-mailbox providers. Sharing one of these domains says nothing about
# which organization a sender belongs to, so they are never domain-matched.
COMMON_PERSONAL_DOMAINS = {
    # Google
    "gmail.com",
    "googlemail.com",
    # Microsoft
    "outlook.com",
    "hotmail.com",
    "live.com",
    "msn.com",
    # Yahoo
    "yahoo.com",
    "yahoo.co.uk",
    "ymail.com",
    # Apple
    "icloud.com",
    "me.com",
    "mac.com",
    # AOL
    "aol.com",
    # Proton
    "proton.me",
    "protonmail.com",
    "pm.me",
    # Other hosted mailboxes
    "fastmail.com",
    "tuta.io",
    "hey.com",
    "duck.com",
    "zoho.com",
}
|
|
|
|
|
|
# Matching is also skipped on the team's own domain (teammates email each other).
# That domain is passed to InvestorIndex(own_domain=...) by the caller
# (presumably from CONFIG.workspace_domain -- confirm at the call site).
|
|
|
|
|
|
@dataclass
class MatchTarget:
    """The CRM record(s) an email address or domain resolves to.

    Every field is optional; which ones are filled depends on the table the
    index entry was built from (see ``InvestorIndex.rebuild``).
    """

    # fundraising_contacts.investor_id of the matched fundraising contact.
    fundraising_investor_id: Optional[str] = None
    # fundraising_contacts.id, set for exact matches on a fundraising contact.
    fundraising_contact_id: Optional[str] = None
    # contacts.id, set for exact matches on a plain contact.
    contact_id: Optional[str] = None
    # contacts.organization_id (exact match) or organizations.id (domain match).
    organization_id: Optional[str] = None
    # fundraising_investors.investor_name or organizations.name, when known.
    investor_name: Optional[str] = None
|
|
|
|
|
|
@dataclass
class InvestorLink:
    """One match between an address seen on an email and a CRM record."""

    # The normalized (lower-cased, stripped) address that produced the match.
    matched_address: str
    # How the match was made: exact_email | domain_match | manual
    # ("manual" is not produced by InvestorIndex.match in this file).
    match_kind: str
    # InvestorIndex.match uses 1.0 for exact_email and 0.6 for domain_match.
    match_confidence: float
    # The record(s) the address resolved to.
    target: MatchTarget
|
|
|
|
|
|
class InvestorIndex:
    """In-memory lookup from email addresses and domains to investor records.

    Two indexes are kept:

    * an exact-address index built from ``fundraising_contacts.email`` and
      ``contacts.email`` (a fundraising contact wins when both tables hold the
      same address), and
    * a domain index built from ``organizations.email`` and
      ``organizations.website``, skipping personal-mailbox domains and the
      team's own domain.

    A rebuild fills fresh dictionaries and swaps them in only after every
    query has succeeded, so a failed rebuild leaves the previous index intact.
    """

    def __init__(self, own_domain: Optional[str] = None):
        """Create an empty index.

        Args:
            own_domain: The team's own mail domain (case-insensitive).
                Addresses and organizations on this domain are never matched.
                ``None`` or an empty string disables the filter.
        """
        self._email_index: dict[str, MatchTarget] = {}
        self._domain_index: dict[str, list[MatchTarget]] = {}
        # Normalized the same way as every address handled by this class.
        self._own_domain = (own_domain or "").strip().lower() or None
        # 0.0 means "never built", so the first rebuild_if_stale() always builds.
        self._last_built = 0.0
        self._lock = threading.Lock()

    # ------------------------------------------------------------------ build

    def rebuild(self, db_conn_factory) -> None:
        """Unconditionally rebuild both indexes from the database.

        Args:
            db_conn_factory: Zero-argument callable returning a connection that
                offers ``cursor()`` and ``close()``. Rows are read by column
                name, so the connection is assumed to use a mapping-style row
                type (e.g. ``sqlite3.Row``) -- confirm against the caller.

        Raises:
            Whatever the connection raises; the previous index is kept then.
        """
        with self._lock:
            self._rebuild_locked(db_conn_factory)

    def rebuild_if_stale(self, db_conn_factory) -> None:
        """Rebuild only if the index is older than ``REFRESH_INTERVAL_SEC``."""
        if not self._is_stale():
            return
        with self._lock:
            # Re-check under the lock: another thread may have finished a
            # rebuild while this one was waiting, and a second full database
            # scan would be wasted work.
            if self._is_stale():
                self._rebuild_locked(db_conn_factory)

    def _is_stale(self) -> bool:
        """Return True when the last successful build is too old (or absent)."""
        return time.time() - self._last_built > REFRESH_INTERVAL_SEC

    def _rebuild_locked(self, db_conn_factory) -> None:
        """Build fresh indexes and swap them in. Caller must hold ``_lock``."""
        email_idx: dict[str, MatchTarget] = {}
        domain_idx: dict[str, list[MatchTarget]] = {}

        conn = db_conn_factory()
        try:
            cur = conn.cursor()

            # fundraising_contacts
            cur.execute(
                "SELECT fc.id, fc.email, fc.investor_id, fi.investor_name "
                "FROM fundraising_contacts fc "
                "LEFT JOIN fundraising_investors fi ON fi.id = fc.investor_id "
                "WHERE fc.email IS NOT NULL AND fc.email != ''"
            )
            for r in cur.fetchall():
                addr = self._normalize(r["email"])
                if not _valid_email(addr):
                    continue
                email_idx[addr] = MatchTarget(
                    fundraising_contact_id=r["id"],
                    fundraising_investor_id=r["investor_id"],
                    investor_name=r["investor_name"],
                )

            # contacts
            cur.execute(
                "SELECT id, email, organization_id FROM contacts "
                "WHERE email IS NOT NULL AND email != ''"
            )
            for r in cur.fetchall():
                addr = self._normalize(r["email"])
                if not _valid_email(addr):
                    continue
                # Don't overwrite a fundraising_contact match; they're higher signal.
                email_idx.setdefault(addr, MatchTarget(
                    contact_id=r["id"],
                    organization_id=r["organization_id"],
                ))

            # organizations -- domain-only match source
            cur.execute(
                "SELECT id, name, email, website FROM organizations "
                "WHERE (email IS NOT NULL AND email != '') OR (website IS NOT NULL AND website != '')"
            )
            for r in cur.fetchall():
                for d in _domains_for_org(r):
                    if d in COMMON_PERSONAL_DOMAINS:
                        continue
                    if self._own_domain and d == self._own_domain:
                        continue
                    domain_idx.setdefault(d, []).append(MatchTarget(
                        organization_id=r["id"],
                        investor_name=r["name"],
                    ))
        finally:
            conn.close()

        # Swap only after every query succeeded.
        self._email_index = email_idx
        self._domain_index = domain_idx
        self._last_built = time.time()

    # ------------------------------------------------------------------ query

    @staticmethod
    def _normalize(address) -> str:
        """Lower-case and strip an address; ``None`` becomes ``""``."""
        return (address or "").strip().lower()

    def match(self, addresses: set[str], *,
              exclude_addresses: Optional[set[str]] = None) -> "list[InvestorLink]":
        """Resolve the addresses on one email to investor links.

        Args:
            addresses: Addresses seen on the email (any case, may carry
                surrounding whitespace; falsy entries are ignored).
            exclude_addresses: Addresses to ignore, e.g. the mailbox owner.
                They are normalized exactly like ``addresses`` before the
                comparison.

        Returns:
            Exact-email links (confidence 1.0) if any address is indexed;
            otherwise domain links (confidence 0.6). Each target appears at
            most once. Links are ordered by matched address so the result is
            deterministic.
        """
        excluded = {self._normalize(a) for a in (exclude_addresses or ()) if a}
        candidates = {self._normalize(a) for a in (addresses or ()) if a} - excluded

        # Exclude own domain addresses (teammates emailing each other).
        if self._own_domain:
            own_suffix = "@" + self._own_domain
            candidates = {a for a in candidates if not a.endswith(own_suffix)}

        # Read each index once so a concurrent rebuild cannot swap it mid-call.
        email_index = self._email_index
        domain_index = self._domain_index

        # Sets iterate in arbitrary order; sort for reproducible output.
        ordered = sorted(candidates)

        links = []
        seen_targets: set[tuple] = set()

        # Exact email matches first.
        for addr in ordered:
            target = email_index.get(addr)
            if target is None:
                continue
            key = (target.fundraising_contact_id, target.contact_id)
            if key in seen_targets:
                continue
            seen_targets.add(key)
            links.append(InvestorLink(
                matched_address=addr,
                match_kind="exact_email",
                match_confidence=1.0,
                target=target,
            ))

        if links:  # exact hits short-circuit domain matching
            return links

        # Domain fallback.
        for addr in ordered:
            # The domain is whatever follows the LAST "@", which is also what
            # the own-domain suffix check above looks at.
            _, sep, domain = addr.rpartition("@")
            if not sep or not domain or domain in COMMON_PERSONAL_DOMAINS:
                continue
            for target in domain_index.get(domain, []):
                key = ("org", target.organization_id)
                if key in seen_targets:
                    continue
                seen_targets.add(key)
                links.append(InvestorLink(
                    matched_address=addr,
                    match_kind="domain_match",
                    match_confidence=0.6,
                    target=target,
                ))
        return links
|
|
|
|
|
|
# ---------------------------------------------------------------------------- helpers
|
|
|
|
_EMAIL_RE = re.compile(r"^[^@\s]+@[^@\s]+\.[^@\s]+$")
|
|
|
|
|
|
def _valid_email(s: str) -> bool:
|
|
return bool(_EMAIL_RE.match(s))
|
|
|
|
|
|
def _domains_for_org(row) -> list[str]:
|
|
out: list[str] = []
|
|
if row["email"]:
|
|
_, _, d = row["email"].lower().partition("@")
|
|
if d:
|
|
out.append(d)
|
|
if row["website"]:
|
|
d = _domain_from_url(row["website"])
|
|
if d:
|
|
out.append(d)
|
|
return list({d for d in out if d})
|
|
|
|
|
|
def _domain_from_url(url: str) -> Optional[str]:
|
|
if not url:
|
|
return None
|
|
m = re.match(r"^\s*(?:https?://)?(?:www\.)?([^/:?#\s]+)", url.strip(), re.IGNORECASE)
|
|
if not m:
|
|
return None
|
|
return m.group(1).lower()
|