Files
Keysat c7ce44d963 Phase 0 foundation: canonical schema, ingest pipeline, CRM MCP server
Workstream A–C substrate for the Ten31 agentic system:
- A1: docs/crm-overview.md; CLAUDE.md conventions + guardrail #9
- A2: additive/reversible core migration (canonical_entities, entity_links,
  interaction_log, relationship_edges, soft-delete) + ledgered runner
- B1/B3: chunking + deterministic entity resolution (backend/ingest)
- B2: dense (bge-m3) + BM25 sparse ingest to Qdrant crm_chunks
- C: CRM MCP server (reads, retrieval modes, logged writes) — no outbound tools
- docs: redaction/re-hydration, Gmail enablement runbook
- synthetic test data; .env.example; housekeeping (.gitignore, untrack crm.db,
  drop legacy files + start9/0.3.5)

Verified end-to-end on synthetic data + live Sparks (hybrid > dense on entity
queries). Real backfill runs on Ten31 infra; index holds synthetic data only.
Branch snapshot also captures pre-existing working-tree changes.

Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
2026-06-05 08:13:35 -05:00

216 lines
7.3 KiB
Python

"""
Investor matching.
Builds an in-memory index of investor email addresses from:
- fundraising_contacts.email
- contacts.email
- organizations.email + organizations.website (domain only)
For each synced email, returns a list of investor links. Exact-email matches
beat domain matches; if any exact match exists, domain matches are suppressed.
The index is rebuilt lazily by rebuild_if_stale() once it is older than
`REFRESH_INTERVAL_SEC` seconds, or on demand via rebuild().
"""
import re
import threading
import time
from dataclasses import dataclass
from typing import Optional
# Maximum age, in seconds, of a built index before it counts as stale.
REFRESH_INTERVAL_SEC = 15 * 60  # 15 minutes

# Personal-mailbox providers. A shared domain here says nothing about which
# organization a sender belongs to, so these are never used for domain matching.
COMMON_PERSONAL_DOMAINS = {
    "gmail.com",
    "googlemail.com",
    "outlook.com",
    "hotmail.com",
    "live.com",
    "msn.com",
    "yahoo.com",
    "yahoo.co.uk",
    "ymail.com",
    "icloud.com",
    "me.com",
    "mac.com",
    "aol.com",
    "proton.me",
    "protonmail.com",
    "pm.me",
    "fastmail.com",
    "tuta.io",
    "hey.com",
    "duck.com",
    "zoho.com",
}
# The team's own domain is skipped as well (teammates email each other). It is
# not part of the set above: it is passed to InvestorIndex(own_domain=...),
# presumably from CONFIG.workspace_domain by the caller.
@dataclass
class MatchTarget:
    """The CRM record(s) an email address or domain resolves to.

    Every field is optional; which ones are filled depends on the table the
    entry was built from (fundraising contact, plain contact, or organization).
    """

    # Set for entries built from fundraising_contacts.
    fundraising_investor_id: Optional[str] = None
    fundraising_contact_id: Optional[str] = None
    # Set for entries built from contacts.
    contact_id: Optional[str] = None
    # Set for contacts (their organization) and for organization domain entries.
    organization_id: Optional[str] = None
    # Display name: the investor name or the organization name.
    investor_name: Optional[str] = None
@dataclass
class InvestorLink:
    """One match between an email address and an investor-side record."""

    # The normalised address that produced the match.
    matched_address: str
    # One of: exact_email | domain_match | manual
    match_kind: str
    # Strength of the match; InvestorIndex.match() uses 1.0 for exact-address
    # hits and 0.6 for domain hits.
    match_confidence: float
    # The record(s) the address resolved to.
    target: MatchTarget
class InvestorIndex:
    """In-memory lookup from email addresses and domains to investor records.

    Two maps are kept: exact address -> MatchTarget, and domain -> list of
    organization MatchTargets. rebuild() constructs fresh maps and swaps them in
    as whole objects once all queries have succeeded.
    """

    def __init__(self, own_domain: Optional[str] = None):
        """Create an empty index.

        Args:
            own_domain: The team's own mail domain. Addresses on this domain are
                never matched and the domain is never indexed. Compared
                case-insensitively; ``None`` or an empty/blank string disables
                the filter.
        """
        self._email_index: dict[str, MatchTarget] = {}
        self._domain_index: dict[str, list[MatchTarget]] = {}
        # Normalise the same way addresses are normalised in match().
        self._own_domain = (own_domain or "").strip().lower() or None
        # 0.0 means "never built", so the first rebuild_if_stale() always rebuilds.
        self._last_built = 0.0
        self._lock = threading.Lock()

    # ------------------------------------------------------------------ build
    def rebuild(self, db_conn_factory) -> None:
        """Reload both indexes from the database.

        Args:
            db_conn_factory: Zero-argument callable returning a DB connection
                that offers ``cursor()`` and ``close()`` and whose rows can be
                read by column name (``row["email"]``). The connection is closed
                before this method returns.

        The new maps are only installed after all three queries succeed. If
        anything raises, the previous maps and build timestamp stay in place and
        the exception propagates.
        """
        with self._lock:
            email_idx: dict[str, MatchTarget] = {}
            domain_idx: dict[str, list[MatchTarget]] = {}
            conn = db_conn_factory()
            try:
                cur = conn.cursor()

                # fundraising_contacts
                cur.execute(
                    "SELECT fc.id, fc.email, fc.investor_id, fi.investor_name "
                    "FROM fundraising_contacts fc "
                    "LEFT JOIN fundraising_investors fi ON fi.id = fc.investor_id "
                    "WHERE fc.email IS NOT NULL AND fc.email != ''"
                )
                for r in cur.fetchall():
                    addr = (r["email"] or "").lower().strip()
                    if not _valid_email(addr):
                        continue
                    email_idx[addr] = MatchTarget(
                        fundraising_contact_id=r["id"],
                        fundraising_investor_id=r["investor_id"],
                        investor_name=r["investor_name"],
                    )

                # contacts
                cur.execute(
                    "SELECT id, email, organization_id FROM contacts "
                    "WHERE email IS NOT NULL AND email != ''"
                )
                for r in cur.fetchall():
                    addr = (r["email"] or "").lower().strip()
                    if not _valid_email(addr):
                        continue
                    # Don't overwrite a fundraising_contact match; they're higher signal.
                    email_idx.setdefault(addr, MatchTarget(
                        contact_id=r["id"],
                        organization_id=r["organization_id"],
                    ))

                # organizations — domain-only match source
                cur.execute(
                    "SELECT id, name, email, website FROM organizations "
                    "WHERE (email IS NOT NULL AND email != '') OR (website IS NOT NULL AND website != '')"
                )
                for r in cur.fetchall():
                    for d in _domains_for_org(r):
                        if d in COMMON_PERSONAL_DOMAINS:
                            continue
                        if self._own_domain and d == self._own_domain:
                            continue
                        domain_idx.setdefault(d, []).append(MatchTarget(
                            organization_id=r["id"],
                            investor_name=r["name"],
                        ))
            finally:
                conn.close()

            # Only reached when every query succeeded.
            self._email_index = email_idx
            self._domain_index = domain_idx
            self._last_built = time.time()

    def rebuild_if_stale(self, db_conn_factory) -> None:
        """Call rebuild() if the last successful build is older than REFRESH_INTERVAL_SEC."""
        if time.time() - self._last_built > REFRESH_INTERVAL_SEC:
            self.rebuild(db_conn_factory)

    # ------------------------------------------------------------------ query
    def match(self, addresses: set[str], *,
              exclude_addresses: Optional[set[str]] = None) -> "list[InvestorLink]":
        """Resolve a set of email addresses to investor links.

        Args:
            addresses: Addresses seen on a message. Each is lowercased and
                stripped; empty values are ignored.
            exclude_addresses: Addresses to drop before matching. They are
                normalised exactly like ``addresses``, so case and surrounding
                whitespace do not matter.

        Returns:
            Exact-address links (confidence 1.0) if any address is indexed;
            otherwise domain links (confidence 0.6). Each target appears at most
            once. Addresses are processed in sorted order, so the result order is
            the same on every run.
        """
        # Exclusions must be normalised like candidates, or " a@x.com" would
        # slip past an exclusion meant for "a@x.com".
        excluded = {a.lower().strip() for a in (exclude_addresses or ()) if a}
        candidates = {a.lower().strip() for a in addresses if a} - excluded
        candidates.discard("")  # whitespace-only input normalises to ""

        # Exclude own domain addresses (teammates emailing each other).
        if self._own_domain:
            own_suffix = "@" + self._own_domain
            candidates = {a for a in candidates if not a.endswith(own_suffix)}

        # Set iteration order for strings varies between runs; sort so the
        # returned links are stable.
        ordered = sorted(candidates)

        links: list[InvestorLink] = []
        seen_targets: set[tuple] = set()

        # Exact email matches first.
        for addr in ordered:
            t = self._email_index.get(addr)
            if not t:
                continue
            key = (t.fundraising_contact_id, t.contact_id)
            if key in seen_targets:
                continue
            seen_targets.add(key)
            links.append(InvestorLink(
                matched_address=addr,
                match_kind="exact_email",
                match_confidence=1.0,
                target=t,
            ))
        if links:  # exact hits short-circuit domain matching
            return links

        # Domain fallback.
        for addr in ordered:
            _, _, domain = addr.partition("@")
            if not domain or domain in COMMON_PERSONAL_DOMAINS:
                continue
            for t in self._domain_index.get(domain, []):
                key = ("org", t.organization_id)
                if key in seen_targets:
                    continue
                seen_targets.add(key)
                links.append(InvestorLink(
                    matched_address=addr,
                    match_kind="domain_match",
                    match_confidence=0.6,
                    target=t,
                ))
        return links
# ---------------------------------------------------------------------------- helpers
_EMAIL_RE = re.compile(r"^[^@\s]+@[^@\s]+\.[^@\s]+$")
def _valid_email(s: str) -> bool:
return bool(_EMAIL_RE.match(s))
def _domains_for_org(row) -> list[str]:
out: list[str] = []
if row["email"]:
_, _, d = row["email"].lower().partition("@")
if d:
out.append(d)
if row["website"]:
d = _domain_from_url(row["website"])
if d:
out.append(d)
return list({d for d in out if d})
def _domain_from_url(url: str) -> Optional[str]:
if not url:
return None
m = re.match(r"^\s*(?:https?://)?(?:www\.)?([^/:?#\s]+)", url.strip(), re.IGNORECASE)
if not m:
return None
return m.group(1).lower()