Phase 0 foundation: canonical schema, ingest pipeline, CRM MCP server

Workstream A–C substrate for the Ten31 agentic system:
- A1: docs/crm-overview.md; CLAUDE.md conventions + guardrail #9
- A2: additive/reversible core migration (canonical_entities, entity_links,
  interaction_log, relationship_edges, soft-delete) + ledgered runner
- B1/B3: chunking + deterministic entity resolution (backend/ingest)
- B2: dense (bge-m3) + BM25 sparse ingest to Qdrant crm_chunks
- C: CRM MCP server (reads, retrieval modes, logged writes) — no outbound tools
- docs: redaction/re-hydration, Gmail enablement runbook
- synthetic test data; .env.example; housekeeping (.gitignore, untrack crm.db,
  drop legacy files + start9/0.3.5)

Verified end-to-end on synthetic data + live Sparks (hybrid > dense on entity
queries). Real backfill runs on Ten31 infra; index holds synthetic data only.
Branch snapshot also captures pre-existing working-tree changes.

Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
This commit is contained in:
Keysat
2026-06-05 08:11:28 -05:00
parent 7027efd777
commit c7ce44d963
99 changed files with 10676 additions and 7817 deletions
+390
View File
@@ -0,0 +1,390 @@
"""
Sync orchestrator.
Top-level entry points:
sync_account(conn_factory, credential_provider, account_row, matcher)
Full sync pass for one mailbox. Decides backfill vs. incremental based
on email_accounts.backfill_complete. Writes a sync_runs row.
sync_all(conn_factory, credential_provider, matcher)
Iterates every sync-enabled account sequentially. Called from
scheduler.py every CRM_GMAIL_SYNC_INTERVAL_MIN minutes.
Design: match-only storage (see architecture doc §7). For each message:
1. Fetch metadata (cheap, 5 units).
2. Run matcher against participant addresses.
3. If matched → fetch full message, parse, persist body + register attachments.
4. If unmatched → persist header-only row.
5. In both cases, record the per-account sighting.
"""
import logging
import sqlite3
import traceback
from typing import Optional
from . import attachments as _attach
from . import config as _cfg
from . import db as _db
from . import errors as _errors
from . import gmail_client as _gmail
from . import parser as _parser
from . import threads as _threads
from .matcher import InvestorIndex, InvestorLink
log = logging.getLogger("email_integration.sync")
METADATA_HEADERS = [
"From", "To", "Cc", "Bcc", "Subject", "Date",
"Message-ID", "In-Reply-To", "References", "Reply-To",
]
# ---------------------------------------------------------------------------- public
def sync_all(conn_factory, credential_provider, index: InvestorIndex) -> dict:
"""Run one pass across all enabled accounts. Returns summary stats."""
index.rebuild_if_stale(conn_factory)
conn = conn_factory()
try:
accounts = _db.list_sync_ready_accounts(conn)
finally:
conn.close()
totals = {"accounts": 0, "messages_stored": 0, "errors": 0}
for acc in accounts:
totals["accounts"] += 1
try:
stats = sync_account(conn_factory, credential_provider, acc, index)
totals["messages_stored"] += stats.get("messages_stored", 0)
except Exception:
totals["errors"] += 1
log.exception("sync failed for account %s", acc["email_address"])
return totals
def sync_account(conn_factory, credential_provider, account,
index: InvestorIndex) -> dict:
"""Sync a single mailbox. Returns stats dict."""
email_addr = account["email_address"]
stats = _gmail.CallStats()
client = _gmail.GmailClient(credential_provider, email_addr, stats=stats)
# Mark running
conn = conn_factory()
try:
run_id = _db.start_sync_run(conn,
account_id=account["id"],
kind="backfill" if not account["backfill_complete"] else "incremental")
_db.set_account_status(conn, account["id"], status="active", error=None)
conn.commit()
finally:
conn.close()
run_stats = {"messages_seen": 0, "messages_stored": 0, "attachments_saved": 0}
error_str: Optional[str] = None
status = "ok"
try:
if not account["backfill_complete"]:
_run_backfill(conn_factory, client, account, index, run_stats)
else:
_run_incremental(conn_factory, client, account, index, run_stats)
# Drain attachments for this account.
conn = conn_factory()
try:
# Limit to a few cycles' worth of attachments per pass.
batched = _attach.drain_pending(conn_factory, client, account["id"], limit=100)
run_stats["attachments_saved"] = batched
finally:
conn.close()
except _errors.AuthError as e:
error_str = f"auth: {e}"
status = "error"
except _errors.HistoryExpiredError:
# Recover: reset to date-based backfill from last_synced_at.
error_str = "history expired; fallback to date backfill"
status = "partial"
_fallback_date_backfill(conn_factory, client, account, index, run_stats)
except Exception as e:
error_str = f"unexpected: {type(e).__name__}: {e}"
status = "error"
log.exception("unexpected during sync of %s", email_addr)
finally:
run_stats["api_calls"] = stats.api_calls
run_stats["retries"] = stats.retries
conn = conn_factory()
try:
_db.finish_sync_run(conn, run_id, status=status, stats=run_stats, error=error_str)
_db.set_account_status(conn, account["id"],
status="active" if status == "ok" else status,
error=error_str)
_db.set_account_checkpoint(conn, account["id"],
last_synced_at=_db._now_iso())
conn.commit()
finally:
conn.close()
return run_stats
# ---------------------------------------------------------------------------- backfill
def _run_backfill(conn_factory, client, account, index: InvestorIndex,
run_stats: dict) -> None:
"""Initial full-mailbox backfill, resumable via backfill_cursor."""
page_token = account["backfill_cursor"]
while True:
resp = client.list_messages(page_token=page_token,
max_results=_cfg.CONFIG.backfill_page_size)
messages = resp.get("messages") or []
for m in messages:
run_stats["messages_seen"] += 1
try:
_process_one_message(conn_factory, client, account, index,
gmail_message_id=m["id"], run_stats=run_stats)
except _errors.GmailError as e:
log.warning("skip msg %s on %s: %s", m["id"], account["email_address"], e)
continue
page_token = resp.get("nextPageToken")
conn = conn_factory()
try:
_db.set_account_checkpoint(conn, account["id"],
backfill_cursor=page_token,
backfill_complete=(not page_token))
conn.commit()
finally:
conn.close()
if not page_token:
# Capture current historyId as checkpoint for future incrementals.
prof = client.get_profile()
hid = prof.get("historyId")
if hid:
conn = conn_factory()
try:
_db.set_account_checkpoint(conn, account["id"], history_id=str(hid))
conn.commit()
finally:
conn.close()
return
# ---------------------------------------------------------------------------- incremental
def _run_incremental(conn_factory, client, account, index: InvestorIndex,
run_stats: dict) -> None:
start_hid = account["last_history_id"]
if not start_hid:
# Safety: if checkpoint is missing, re-enter backfill.
_run_backfill(conn_factory, client, account, index, run_stats)
return
# history_types filter limits bandwidth to what we care about.
new_hid: Optional[str] = None
try:
for h in client.iter_history(
start_history_id=start_hid,
history_types=["messageAdded", "messageDeleted", "labelAdded", "labelRemoved"],
):
for ma in h.get("messagesAdded") or []:
msg = ma.get("message") or {}
run_stats["messages_seen"] += 1
try:
_process_one_message(conn_factory, client, account, index,
gmail_message_id=msg.get("id"),
run_stats=run_stats)
except _errors.GmailError as e:
log.warning("skip msg %s on %s: %s", msg.get("id"), account["email_address"], e)
for md in h.get("messagesDeleted") or []:
msg = md.get("message") or {}
conn = conn_factory()
try:
_db.tombstone_sighting(
conn,
account_id=account["id"],
gmail_message_id=msg.get("id"),
)
conn.commit()
finally:
conn.close()
for la in (h.get("labelsAdded") or []) + (h.get("labelsRemoved") or []):
msg = la.get("message") or {}
# labels are the resulting label set in Gmail's payload after
# the change. We refresh them wholesale.
labels = msg.get("labelIds") or []
conn = conn_factory()
try:
_db.update_sighting_labels(
conn,
account_id=account["id"],
gmail_message_id=msg.get("id"),
labels=labels,
)
conn.commit()
finally:
conn.close()
new_hid = client.last_history_id
except _errors.HistoryExpiredError:
raise
if new_hid:
conn = conn_factory()
try:
_db.set_account_checkpoint(conn, account["id"], history_id=str(new_hid))
conn.commit()
finally:
conn.close()
def _fallback_date_backfill(conn_factory, client, account, index, run_stats):
"""Used when startHistoryId has been pruned by Gmail.
Pulls everything since last_synced_at (or 14d if unknown), which will
hit a large overlap with existing data but upserts are idempotent.
"""
from datetime import datetime, timedelta, timezone
since = account["last_synced_at"] or (
datetime.now(tz=timezone.utc) - timedelta(days=14)
).strftime("%Y-%m-%dT%H:%M:%SZ")
q = f"after:{since.replace('-', '/').split('T')[0]}"
for m in client.iter_messages(q=q):
run_stats["messages_seen"] += 1
try:
_process_one_message(conn_factory, client, account, index,
gmail_message_id=m["id"], run_stats=run_stats)
except _errors.GmailError as e:
log.warning("skip during date-backfill msg %s: %s", m["id"], e)
prof = client.get_profile()
hid = prof.get("historyId")
if hid:
conn = conn_factory()
try:
_db.set_account_checkpoint(conn, account["id"], history_id=str(hid))
conn.commit()
finally:
conn.close()
# ---------------------------------------------------------------------------- per-message
def _process_one_message(conn_factory, client, account, index: InvestorIndex,
*, gmail_message_id: str, run_stats: dict) -> None:
"""Fetch, match, persist one message. Idempotent."""
if not gmail_message_id:
return
# Skip if we've already sighted this message for this account.
conn = conn_factory()
try:
cur = conn.cursor()
cur.execute(
"SELECT email_id FROM email_account_messages "
"WHERE account_id = ? AND gmail_message_id = ?",
(account["id"], gmail_message_id),
)
if cur.fetchone():
return
finally:
conn.close()
# 1. Metadata fetch (cheap).
meta = client.get_message(gmail_message_id, format="metadata",
metadata_headers=METADATA_HEADERS)
meta_parsed = _parser.parse(meta, owning_account_address=account["email_address"])
participants = set()
if meta_parsed.get("from_email"):
participants.add(meta_parsed["from_email"])
for kind in ("to", "cc", "bcc"):
for a in meta_parsed.get(kind, []):
if isinstance(a, dict) and a.get("email"):
participants.add(a["email"])
# Exclude owning account's own address so we don't try to "match" ourselves.
own = {account["email_address"].lower()}
links = index.match(participants, exclude_addresses=own)
is_matched = bool(links)
# 2. If matched, fetch full and parse for body + attachments.
if is_matched:
full = client.get_message(gmail_message_id, format="full")
parsed = _parser.parse(full, owning_account_address=account["email_address"])
else:
parsed = meta_parsed
# Strip any body fields (metadata fetch shouldn't have them but be safe).
parsed["body_text"] = None
parsed["body_html"] = None
parsed["attachments"] = []
# 3. Persist (idempotent on rfc_message_id).
conn = conn_factory()
try:
existing = _db.find_email_by_rfc_id(conn, parsed["rfc_message_id"])
if existing:
email_id = existing["id"]
# If the email was previously unmatched but now matches (e.g. user
# added the investor after first sight), upgrade the row.
if is_matched and existing["match_status"] == "unmatched":
conn.execute(
"UPDATE emails SET match_status = 'matched', is_matched = 1, "
"body_text = ?, body_html = ?, updated_at = datetime('now') "
"WHERE id = ?",
(parsed.get("body_text"), parsed.get("body_html"), email_id),
)
_attach.register_stubs(conn,
email_id=email_id,
parsed_attachments=parsed.get("attachments") or [])
for link in links:
_db.insert_investor_link(conn, email_id=email_id, link=_flatten_link(link))
else:
match_status = "matched" if is_matched else "unmatched"
email_id = _db.insert_email(conn, parsed=parsed, match_status=match_status)
thread_id = _threads.resolve_thread_id(conn, parsed)
_db.set_email_thread(conn, email_id, thread_id)
if is_matched:
_attach.register_stubs(conn,
email_id=email_id,
parsed_attachments=parsed.get("attachments") or [])
for link in links:
_db.insert_investor_link(conn, email_id=email_id, link=_flatten_link(link))
_db.rollup_thread(conn, thread_id)
run_stats["messages_stored"] += 1
# Record sighting (always, even if email row was pre-existing).
_db.upsert_sighting(
conn,
email_id=email_id,
account_id=account["id"],
gmail_message_id=gmail_message_id,
gmail_thread_id=parsed.get("gmail_thread_id") or "",
labels=parsed.get("labels", []),
is_sent=parsed.get("is_sent", False),
)
conn.commit()
except sqlite3.IntegrityError:
# Concurrent insert race — re-read and proceed.
pass
finally:
conn.close()
def _flatten_link(link: InvestorLink) -> dict:
return {
"matched_address": link.matched_address,
"match_kind": link.match_kind,
"match_confidence": link.match_confidence,
"fundraising_investor_id": link.target.fundraising_investor_id,
"fundraising_contact_id": link.target.fundraising_contact_id,
"contact_id": link.target.contact_id,
"organization_id": link.target.organization_id,
}