c7ce44d963
Workstream A–C substrate for the Ten31 agentic system: - A1: docs/crm-overview.md; CLAUDE.md conventions + guardrail #9 - A2: additive/reversible core migration (canonical_entities, entity_links, interaction_log, relationship_edges, soft-delete) + ledgered runner - B1/B3: chunking + deterministic entity resolution (backend/ingest) - B2: dense (bge-m3) + BM25 sparse ingest to Qdrant crm_chunks - C: CRM MCP server (reads, retrieval modes, logged writes) — no outbound tools - docs: redaction/re-hydration, Gmail enablement runbook - synthetic test data; .env.example; housekeeping (.gitignore, untrack crm.db, drop legacy files + start9/0.3.5) Verified end-to-end on synthetic data + live Sparks (hybrid > dense on entity queries). Real backfill runs on Ten31 infra; index holds synthetic data only. Branch snapshot also captures pre-existing working-tree changes. Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
391 lines
15 KiB
Python
391 lines
15 KiB
Python
"""
|
|
Sync orchestrator.
|
|
|
|
Top-level entry points:
|
|
|
|
sync_account(conn_factory, credential_provider, account_row, matcher)
|
|
Full sync pass for one mailbox. Decides backfill vs. incremental based
|
|
on email_accounts.backfill_complete. Writes a sync_runs row.
|
|
|
|
sync_all(conn_factory, credential_provider, matcher)
|
|
Iterates every sync-enabled account sequentially. Called from
|
|
scheduler.py every CRM_GMAIL_SYNC_INTERVAL_MIN minutes.
|
|
|
|
Design: match-only storage (see architecture doc §7). For each message:
|
|
1. Fetch metadata (cheap, 5 units).
|
|
2. Run matcher against participant addresses.
|
|
3. If matched → fetch full message, parse, persist body + register attachments.
|
|
4. If unmatched → persist header-only row.
|
|
5. In both cases, record the per-account sighting.
|
|
"""
|
|
|
|
import logging
|
|
import sqlite3
|
|
import traceback
|
|
from typing import Optional
|
|
|
|
from . import attachments as _attach
|
|
from . import config as _cfg
|
|
from . import db as _db
|
|
from . import errors as _errors
|
|
from . import gmail_client as _gmail
|
|
from . import parser as _parser
|
|
from . import threads as _threads
|
|
from .matcher import InvestorIndex, InvestorLink
|
|
|
|
|
|
# Module logger. The name is a fixed string rather than __name__.
log = logging.getLogger("email_integration.sync")


# Headers requested on the metadata-only message fetch (passed as
# ``metadata_headers`` to the Gmail client in _process_one_message).
METADATA_HEADERS = [
    "From",
    "To",
    "Cc",
    "Bcc",
    "Subject",
    "Date",
    "Message-ID",
    "In-Reply-To",
    "References",
    "Reply-To",
]
|
|
|
|
|
|
# ---------------------------------------------------------------------------- public
|
|
|
|
def sync_all(conn_factory, credential_provider, index: InvestorIndex) -> dict:
    """Run one sync pass over every sync-ready account, one at a time.

    The investor index is refreshed first (``rebuild_if_stale``), then the
    account list is read once via ``_db.list_sync_ready_accounts``. A failure
    in one account is logged and counted; it never stops the remaining
    accounts from being synced.

    Args:
        conn_factory: Zero-argument callable returning a new DB connection.
        credential_provider: Passed through to ``sync_account``.
        index: Investor matcher index shared by all accounts in this pass.

    Returns:
        ``{"accounts": int, "messages_stored": int, "errors": int}`` where
        ``errors`` counts accounts whose ``sync_account`` call raised.
    """
    index.rebuild_if_stale(conn_factory)

    db_conn = conn_factory()
    try:
        ready_accounts = _db.list_sync_ready_accounts(db_conn)
    finally:
        db_conn.close()

    n_accounts = 0
    n_stored = 0
    n_errors = 0
    for account in ready_accounts:
        n_accounts += 1
        try:
            result = sync_account(conn_factory, credential_provider, account, index)
            n_stored += result.get("messages_stored", 0)
        except Exception:
            n_errors += 1
            log.exception("sync failed for account %s", account["email_address"])

    return {"accounts": n_accounts, "messages_stored": n_stored, "errors": n_errors}
|
|
|
|
|
|
def sync_account(conn_factory, credential_provider, account,
                 index: InvestorIndex) -> dict:
    """Sync a single mailbox and record the outcome as a sync run.

    Runs a backfill when ``account["backfill_complete"]`` is falsy, otherwise
    an incremental sync, then drains up to 100 pending attachments. Errors
    are captured into the sync-run row instead of being raised:

    * ``AuthError``            -> status ``"error"``
    * ``HistoryExpiredError``  -> date-based fallback; status ``"partial"``,
      or ``"error"`` if the fallback itself fails
    * any other ``Exception``  -> status ``"error"`` (logged with traceback)

    Args:
        conn_factory: Zero-argument callable returning a new DB connection.
        credential_provider: Passed to ``_gmail.GmailClient``.
        account: Mapping-like account row; reads ``id``, ``email_address``
            and ``backfill_complete`` here.
        index: Investor matcher index.

    Returns:
        Stats dict with ``messages_seen``, ``messages_stored``,
        ``attachments_saved``, ``api_calls`` and ``retries``.
    """
    email_addr = account["email_address"]
    needs_backfill = not account["backfill_complete"]
    stats = _gmail.CallStats()
    client = _gmail.GmailClient(credential_provider, email_addr, stats=stats)

    # Mark running
    conn = conn_factory()
    try:
        run_id = _db.start_sync_run(
            conn,
            account_id=account["id"],
            kind="backfill" if needs_backfill else "incremental",
        )
        _db.set_account_status(conn, account["id"], status="active", error=None)
        conn.commit()
    finally:
        conn.close()

    run_stats = {"messages_seen": 0, "messages_stored": 0, "attachments_saved": 0}
    error_str: Optional[str] = None
    status = "ok"

    try:
        if needs_backfill:
            _run_backfill(conn_factory, client, account, index, run_stats)
        else:
            _run_incremental(conn_factory, client, account, index, run_stats)

        # Drain attachments for this account, limited to a few cycles' worth
        # per pass. drain_pending is handed conn_factory, so no connection
        # needs to be opened here.
        run_stats["attachments_saved"] = _attach.drain_pending(
            conn_factory, client, account["id"], limit=100)

    except _errors.AuthError as e:
        error_str = f"auth: {e}"
        status = "error"
    except _errors.HistoryExpiredError:
        # Recover: reset to date-based backfill from last_synced_at.
        error_str = "history expired; fallback to date backfill"
        status = "partial"
        try:
            _fallback_date_backfill(conn_factory, client, account, index, run_stats)
        except Exception as e:
            # Without this guard a failed fallback would be recorded as
            # "partial" while the exception escaped to the caller.
            error_str = f"history expired; fallback failed: {type(e).__name__}: {e}"
            status = "error"
            log.exception("date-backfill fallback failed for %s", email_addr)
    except Exception as e:
        error_str = f"unexpected: {type(e).__name__}: {e}"
        status = "error"
        log.exception("unexpected during sync of %s", email_addr)
    finally:
        run_stats["api_calls"] = stats.api_calls
        run_stats["retries"] = stats.retries
        conn = conn_factory()
        try:
            _db.finish_sync_run(conn, run_id, status=status, stats=run_stats, error=error_str)
            _db.set_account_status(conn, account["id"],
                                   status="active" if status == "ok" else status,
                                   error=error_str)
            # last_synced_at is the starting point of the date-based fallback,
            # so it must not move forward for a run that failed; otherwise a
            # later fallback would skip mail from the failed window.
            if status != "error":
                _db.set_account_checkpoint(conn, account["id"],
                                           last_synced_at=_db._now_iso())
            conn.commit()
        finally:
            conn.close()

    return run_stats
|
|
|
|
|
|
# ---------------------------------------------------------------------------- backfill
|
|
|
|
def _run_backfill(conn_factory, client, account, index: InvestorIndex,
                  run_stats: dict) -> None:
    """Walk the whole mailbox page by page, resuming from ``backfill_cursor``.

    After every page the next page token is checkpointed (and
    ``backfill_complete`` set once there is no next page), so an interrupted
    backfill continues from the last finished page. Messages that raise
    ``GmailError`` are logged and skipped. When the final page is done, the
    mailbox's current ``historyId`` is stored as the starting point for
    incremental syncs.

    Args:
        conn_factory: Zero-argument callable returning a new DB connection.
        client: Gmail client for this mailbox.
        account: Mapping-like account row (``id``, ``email_address``,
            ``backfill_cursor``).
        index: Investor matcher index.
        run_stats: Mutated in place; ``messages_seen`` is incremented here.
    """
    account_id = account["id"]
    cursor = account["backfill_cursor"]
    finished = False

    while not finished:
        page = client.list_messages(
            page_token=cursor,
            max_results=_cfg.CONFIG.backfill_page_size,
        )
        for entry in page.get("messages") or []:
            run_stats["messages_seen"] += 1
            try:
                _process_one_message(
                    conn_factory, client, account, index,
                    gmail_message_id=entry["id"], run_stats=run_stats,
                )
            except _errors.GmailError as exc:
                log.warning("skip msg %s on %s: %s",
                            entry["id"], account["email_address"], exc)

        # Persist progress after each page so a restart resumes here.
        cursor = page.get("nextPageToken")
        finished = not cursor
        db_conn = conn_factory()
        try:
            _db.set_account_checkpoint(
                db_conn, account_id,
                backfill_cursor=cursor,
                backfill_complete=finished,
            )
            db_conn.commit()
        finally:
            db_conn.close()

    # Capture current historyId as checkpoint for future incrementals.
    history_id = client.get_profile().get("historyId")
    if not history_id:
        return
    db_conn = conn_factory()
    try:
        _db.set_account_checkpoint(db_conn, account_id, history_id=str(history_id))
        db_conn.commit()
    finally:
        db_conn.close()
|
|
|
|
|
|
# ---------------------------------------------------------------------------- incremental
|
|
|
|
def _run_incremental(conn_factory, client, account, index: InvestorIndex,
|
|
run_stats: dict) -> None:
|
|
start_hid = account["last_history_id"]
|
|
if not start_hid:
|
|
# Safety: if checkpoint is missing, re-enter backfill.
|
|
_run_backfill(conn_factory, client, account, index, run_stats)
|
|
return
|
|
|
|
# history_types filter limits bandwidth to what we care about.
|
|
new_hid: Optional[str] = None
|
|
try:
|
|
for h in client.iter_history(
|
|
start_history_id=start_hid,
|
|
history_types=["messageAdded", "messageDeleted", "labelAdded", "labelRemoved"],
|
|
):
|
|
for ma in h.get("messagesAdded") or []:
|
|
msg = ma.get("message") or {}
|
|
run_stats["messages_seen"] += 1
|
|
try:
|
|
_process_one_message(conn_factory, client, account, index,
|
|
gmail_message_id=msg.get("id"),
|
|
run_stats=run_stats)
|
|
except _errors.GmailError as e:
|
|
log.warning("skip msg %s on %s: %s", msg.get("id"), account["email_address"], e)
|
|
|
|
for md in h.get("messagesDeleted") or []:
|
|
msg = md.get("message") or {}
|
|
conn = conn_factory()
|
|
try:
|
|
_db.tombstone_sighting(
|
|
conn,
|
|
account_id=account["id"],
|
|
gmail_message_id=msg.get("id"),
|
|
)
|
|
conn.commit()
|
|
finally:
|
|
conn.close()
|
|
|
|
for la in (h.get("labelsAdded") or []) + (h.get("labelsRemoved") or []):
|
|
msg = la.get("message") or {}
|
|
# labels are the resulting label set in Gmail's payload after
|
|
# the change. We refresh them wholesale.
|
|
labels = msg.get("labelIds") or []
|
|
conn = conn_factory()
|
|
try:
|
|
_db.update_sighting_labels(
|
|
conn,
|
|
account_id=account["id"],
|
|
gmail_message_id=msg.get("id"),
|
|
labels=labels,
|
|
)
|
|
conn.commit()
|
|
finally:
|
|
conn.close()
|
|
new_hid = client.last_history_id
|
|
except _errors.HistoryExpiredError:
|
|
raise
|
|
|
|
if new_hid:
|
|
conn = conn_factory()
|
|
try:
|
|
_db.set_account_checkpoint(conn, account["id"], history_id=str(new_hid))
|
|
conn.commit()
|
|
finally:
|
|
conn.close()
|
|
|
|
|
|
def _fallback_date_backfill(conn_factory, client, account, index, run_stats):
|
|
"""Used when startHistoryId has been pruned by Gmail.
|
|
|
|
Pulls everything since last_synced_at (or 14d if unknown), which will
|
|
hit a large overlap with existing data but upserts are idempotent.
|
|
"""
|
|
from datetime import datetime, timedelta, timezone
|
|
since = account["last_synced_at"] or (
|
|
datetime.now(tz=timezone.utc) - timedelta(days=14)
|
|
).strftime("%Y-%m-%dT%H:%M:%SZ")
|
|
q = f"after:{since.replace('-', '/').split('T')[0]}"
|
|
for m in client.iter_messages(q=q):
|
|
run_stats["messages_seen"] += 1
|
|
try:
|
|
_process_one_message(conn_factory, client, account, index,
|
|
gmail_message_id=m["id"], run_stats=run_stats)
|
|
except _errors.GmailError as e:
|
|
log.warning("skip during date-backfill msg %s: %s", m["id"], e)
|
|
prof = client.get_profile()
|
|
hid = prof.get("historyId")
|
|
if hid:
|
|
conn = conn_factory()
|
|
try:
|
|
_db.set_account_checkpoint(conn, account["id"], history_id=str(hid))
|
|
conn.commit()
|
|
finally:
|
|
conn.close()
|
|
|
|
|
|
# ---------------------------------------------------------------------------- per-message
|
|
|
|
def _process_one_message(conn_factory, client, account, index: InvestorIndex,
|
|
*, gmail_message_id: str, run_stats: dict) -> None:
|
|
"""Fetch, match, persist one message. Idempotent."""
|
|
if not gmail_message_id:
|
|
return
|
|
|
|
# Skip if we've already sighted this message for this account.
|
|
conn = conn_factory()
|
|
try:
|
|
cur = conn.cursor()
|
|
cur.execute(
|
|
"SELECT email_id FROM email_account_messages "
|
|
"WHERE account_id = ? AND gmail_message_id = ?",
|
|
(account["id"], gmail_message_id),
|
|
)
|
|
if cur.fetchone():
|
|
return
|
|
finally:
|
|
conn.close()
|
|
|
|
# 1. Metadata fetch (cheap).
|
|
meta = client.get_message(gmail_message_id, format="metadata",
|
|
metadata_headers=METADATA_HEADERS)
|
|
meta_parsed = _parser.parse(meta, owning_account_address=account["email_address"])
|
|
|
|
participants = set()
|
|
if meta_parsed.get("from_email"):
|
|
participants.add(meta_parsed["from_email"])
|
|
for kind in ("to", "cc", "bcc"):
|
|
for a in meta_parsed.get(kind, []):
|
|
if isinstance(a, dict) and a.get("email"):
|
|
participants.add(a["email"])
|
|
|
|
# Exclude owning account's own address so we don't try to "match" ourselves.
|
|
own = {account["email_address"].lower()}
|
|
links = index.match(participants, exclude_addresses=own)
|
|
is_matched = bool(links)
|
|
|
|
# 2. If matched, fetch full and parse for body + attachments.
|
|
if is_matched:
|
|
full = client.get_message(gmail_message_id, format="full")
|
|
parsed = _parser.parse(full, owning_account_address=account["email_address"])
|
|
else:
|
|
parsed = meta_parsed
|
|
# Strip any body fields (metadata fetch shouldn't have them but be safe).
|
|
parsed["body_text"] = None
|
|
parsed["body_html"] = None
|
|
parsed["attachments"] = []
|
|
|
|
# 3. Persist (idempotent on rfc_message_id).
|
|
conn = conn_factory()
|
|
try:
|
|
existing = _db.find_email_by_rfc_id(conn, parsed["rfc_message_id"])
|
|
if existing:
|
|
email_id = existing["id"]
|
|
# If the email was previously unmatched but now matches (e.g. user
|
|
# added the investor after first sight), upgrade the row.
|
|
if is_matched and existing["match_status"] == "unmatched":
|
|
conn.execute(
|
|
"UPDATE emails SET match_status = 'matched', is_matched = 1, "
|
|
"body_text = ?, body_html = ?, updated_at = datetime('now') "
|
|
"WHERE id = ?",
|
|
(parsed.get("body_text"), parsed.get("body_html"), email_id),
|
|
)
|
|
_attach.register_stubs(conn,
|
|
email_id=email_id,
|
|
parsed_attachments=parsed.get("attachments") or [])
|
|
for link in links:
|
|
_db.insert_investor_link(conn, email_id=email_id, link=_flatten_link(link))
|
|
else:
|
|
match_status = "matched" if is_matched else "unmatched"
|
|
email_id = _db.insert_email(conn, parsed=parsed, match_status=match_status)
|
|
thread_id = _threads.resolve_thread_id(conn, parsed)
|
|
_db.set_email_thread(conn, email_id, thread_id)
|
|
if is_matched:
|
|
_attach.register_stubs(conn,
|
|
email_id=email_id,
|
|
parsed_attachments=parsed.get("attachments") or [])
|
|
for link in links:
|
|
_db.insert_investor_link(conn, email_id=email_id, link=_flatten_link(link))
|
|
_db.rollup_thread(conn, thread_id)
|
|
run_stats["messages_stored"] += 1
|
|
|
|
# Record sighting (always, even if email row was pre-existing).
|
|
_db.upsert_sighting(
|
|
conn,
|
|
email_id=email_id,
|
|
account_id=account["id"],
|
|
gmail_message_id=gmail_message_id,
|
|
gmail_thread_id=parsed.get("gmail_thread_id") or "",
|
|
labels=parsed.get("labels", []),
|
|
is_sent=parsed.get("is_sent", False),
|
|
)
|
|
conn.commit()
|
|
except sqlite3.IntegrityError:
|
|
# Concurrent insert race — re-read and proceed.
|
|
pass
|
|
finally:
|
|
conn.close()
|
|
|
|
|
|
def _flatten_link(link: InvestorLink) -> dict:
|
|
return {
|
|
"matched_address": link.matched_address,
|
|
"match_kind": link.match_kind,
|
|
"match_confidence": link.match_confidence,
|
|
"fundraising_investor_id": link.target.fundraising_investor_id,
|
|
"fundraising_contact_id": link.target.fundraising_contact_id,
|
|
"contact_id": link.target.contact_id,
|
|
"organization_id": link.target.organization_id,
|
|
}
|