email_integration — Gmail capture for the Venture CRM

Scaffolded Phase 1 of the Gmail integration described in GMAIL_INTEGRATION_ARCHITECTURE.md (repo root). Everything in this module is isolated from server.py until you wire it in explicitly.

Contents

| File | Purpose |
| --- | --- |
| config.py | Env-var loader; exposes CONFIG singleton. |
| errors.py | Exception taxonomy used by the retry loop. |
| crypto.py | AES-GCM wrapper for OAuth refresh-token encryption (only used in OAuth mode). |
| credentials.py | CredentialProvider protocol + DWDCredentialProvider / OAuthCredentialProvider. |
| gmail_client.py | Gmail API HTTP wrapper (rate limit, retry, pagination). |
| db.py | All SQL touching emails_* tables. Migrations live under migrations/. |
| parser.py | Gmail payload → canonical dict (headers, body, attachments). |
| matcher.py | Investor address index + match logic. |
| threads.py | Thread resolution using Gmail threadId + RFC References. |
| attachments.py | Stub rows + on-disk storage + download worker. |
| sync.py | Orchestrator for backfill + incremental sync of one account. |
| scheduler.py | Background thread that runs sync.sync_all on an interval. |
| routes.py | HTTP handlers under /api/email/* compatible with CRMHandler. |
| migrations/0001_email_tables.sql | Table DDL. |

Wiring it in

All changes are in backend/server.py, all guarded by an env flag. Each is independently revertible. None run unless CRM_GMAIL_INTEGRATION_ENABLED=true.

Patch 1 — migrations (append to init_db() after all existing cursor.executescript(...) calls, before conn.commit()):

try:
    from email_integration.db import apply_migrations
    apply_migrations(cursor)
except ImportError:
    pass

Patch 2 — scheduler (in main(), after start_backup_scheduler()):

if os.environ.get("CRM_GMAIL_INTEGRATION_ENABLED", "").lower() in ("1", "true", "yes", "on"):
    from email_integration.scheduler import start_sync_scheduler
    start_sync_scheduler()

Patch 3 — routes (add near the top of CRMHandler.do_GET and CRMHandler.do_POST, after auth/rate-limit pre-checks, before API routing):

try:
    from email_integration.routes import try_handle
    if try_handle(self):
        return
except ImportError:
    pass
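
Patch 3 relies on try_handle returning a truthy value only when it has already written the response, so that server.py can fall through to its own routing otherwise. The sketch below illustrates that contract under stated assumptions: it is not the code in routes.py, FakeHandler is a hypothetical stand-in for CRMHandler, and the 200 branch is a placeholder for the real dispatch.

```python
import os

EMAIL_PREFIX = "/api/email/"  # route prefix named in this README


def integration_enabled() -> bool:
    """Accept the same truthy values as the Patch 2 check."""
    value = os.environ.get("CRM_GMAIL_INTEGRATION_ENABLED", "")
    return value.lower() in ("1", "true", "yes", "on")


def try_handle(handler) -> bool:
    """Return True if the request was answered here, False to fall through."""
    if not handler.path.startswith(EMAIL_PREFIX):
        return False  # not an email route; server.py keeps routing
    if not integration_enabled():
        handler.send_response(503)  # integration disabled: answer, do no work
        handler.end_headers()
        return True
    # Placeholder (assumption): dispatch to status/accounts/threads handlers.
    handler.send_response(200)
    handler.end_headers()
    return True


class FakeHandler:
    """Hypothetical stand-in with the handler methods the sketch calls."""

    def __init__(self, path):
        self.path = path
        self.status = None

    def send_response(self, code):
        self.status = code

    def end_headers(self):
        pass
```

Returning a boolean rather than raising keeps the patch in server.py to a two-line check and makes it safe to remove.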

Environment variables

# Master on/off. Default off; scheduler won't start, routes return 503.
CRM_GMAIL_INTEGRATION_ENABLED=true

# Auth method: "dwd" (default, recommended) or "oauth"
CRM_GMAIL_AUTH_METHOD=dwd

# DWD mode
CRM_GMAIL_SA_KEY_PATH=/path/to/CRM/data/secrets/gmail-service-account.json
CRM_GMAIL_WORKSPACE_DOMAIN=ten31.xyz

# OAuth mode (fallback; not required for DWD)
CRM_GMAIL_OAUTH_CLIENT_ID=...
CRM_GMAIL_OAUTH_CLIENT_SECRET=...
CRM_GMAIL_OAUTH_REDIRECT_URI=https://crm.ten31.xyz/api/email/oauth/callback
CRM_GMAIL_SECRET_KEY=<base64-32-random-bytes>   # for encrypting refresh tokens

# Sync
CRM_GMAIL_SYNC_INTERVAL_MIN=180          # default 3h
CRM_GMAIL_BACKFILL_PAGE_SIZE=500
CRM_GMAIL_MAX_ATTACHMENT_MB=50
CRM_GMAIL_ATTACH_CONCURRENCY=4
CRM_GMAIL_RATE_UNITS_SEC=150             # per account, leaves 40% headroom
CRM_GMAIL_RETRY_MAX=5
CRM_GMAIL_HISTORY_STALE_DAYS=5
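
As an illustration of how these variables could be parsed, here is a minimal loader sketch. It is not the contents of config.py: GmailConfig, env_flag, env_int, and load_config are hypothetical names, and using the values listed above as fallbacks is an assumption (the README only states defaults for the master flag, the auth method, and the sync interval).

```python
import os
from dataclasses import dataclass

TRUTHY = ("1", "true", "yes", "on")  # same set the Patch 2 check accepts


def env_flag(name, default=False):
    """Read a boolean flag; unset means `default`."""
    raw = os.environ.get(name)
    return default if raw is None else raw.strip().lower() in TRUTHY


def env_int(name, default):
    """Read an integer; unset or empty means `default`."""
    raw = os.environ.get(name, "").strip()
    return int(raw) if raw else default


@dataclass(frozen=True)
class GmailConfig:
    enabled: bool
    auth_method: str
    sync_interval_min: int
    backfill_page_size: int
    max_attachment_mb: int
    attach_concurrency: int
    rate_units_sec: int
    retry_max: int
    history_stale_days: int


def load_config():
    method = os.environ.get("CRM_GMAIL_AUTH_METHOD", "dwd").strip().lower()
    if method not in ("dwd", "oauth"):
        raise ValueError("CRM_GMAIL_AUTH_METHOD must be 'dwd' or 'oauth'")
    return GmailConfig(
        enabled=env_flag("CRM_GMAIL_INTEGRATION_ENABLED"),
        auth_method=method,
        # Fallback values below mirror the example block (assumption).
        sync_interval_min=env_int("CRM_GMAIL_SYNC_INTERVAL_MIN", 180),
        backfill_page_size=env_int("CRM_GMAIL_BACKFILL_PAGE_SIZE", 500),
        max_attachment_mb=env_int("CRM_GMAIL_MAX_ATTACHMENT_MB", 50),
        attach_concurrency=env_int("CRM_GMAIL_ATTACH_CONCURRENCY", 4),
        rate_units_sec=env_int("CRM_GMAIL_RATE_UNITS_SEC", 150),
        retry_max=env_int("CRM_GMAIL_RETRY_MAX", 5),
        history_stale_days=env_int("CRM_GMAIL_HISTORY_STALE_DAYS", 5),
    )
```

Failing fast on an unknown auth method surfaces a typo at startup instead of at the first sync.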

Google Cloud / Workspace setup (DWD)

See GMAIL_INTEGRATION_ARCHITECTURE.md §3 for the full runbook. Short form:

  1. Create GCP project, enable Gmail API.
  2. Create service account, download JSON key, enable domain-wide delegation.
  3. In Google Admin console → Security → API controls → Manage domain-wide delegation, authorize the service account's client ID with scope https://www.googleapis.com/auth/gmail.readonly.
  4. Copy the JSON key to data/secrets/gmail-service-account.json, chmod 600.
  5. Set env vars in .env.beta, restart CRM.
  6. As admin, POST /api/email/accounts/enroll-all to create email_accounts rows for every active user whose email ends in the Workspace domain.
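
For orientation, domain-wide delegation works by having the service account sign a JWT that names the mailbox owner in the sub claim, then exchanging it at Google's token endpoint using the jwt-bearer grant. The sketch below builds only the claim set and the unsigned signing input with the standard library; the RS256 signature and the HTTP exchange are left out. The function names and the example addresses are hypothetical, and this is not the code in credentials.py.

```python
import base64
import json
import time
from typing import Optional

TOKEN_URL = "https://oauth2.googleapis.com/token"
SCOPE = "https://www.googleapis.com/auth/gmail.readonly"  # scope from step 3


def _b64url(data: bytes) -> str:
    """Base64url without padding, as JWTs require."""
    return base64.urlsafe_b64encode(data).rstrip(b"=").decode("ascii")


def build_claims(sa_email: str, user_email: str, now: Optional[int] = None) -> dict:
    """Claim set for impersonating `user_email` via the service account."""
    issued = int(time.time()) if now is None else now
    return {
        "iss": sa_email,         # the service account's client_email
        "sub": user_email,       # the Workspace user being impersonated
        "scope": SCOPE,
        "aud": TOKEN_URL,
        "iat": issued,
        "exp": issued + 3600,    # Google accepts at most one hour
    }


def signing_input(claims: dict) -> str:
    """Return 'header.claims'; the RS256 signature would be appended to this."""
    header = {"alg": "RS256", "typ": "JWT"}
    parts = (header, claims)
    return ".".join(
        _b64url(json.dumps(p, separators=(",", ":")).encode("utf-8")) for p in parts
    )


if __name__ == "__main__":
    # Hypothetical addresses, for illustration only.
    demo = build_claims("crm-sync@example.iam.gserviceaccount.com", "alice@ten31.xyz")
    print(signing_input(demo))
```

If step 3 was skipped or the scope does not match exactly, the token exchange is rejected even though the JWT itself is well formed.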

Adding the crypto dependency (both modes)

Both auth modes need the cryptography package: OAuth mode uses it for AES-GCM encryption of refresh tokens, and DWD mode uses it for RSA signing of the JWT bearer token. If you enable the integration in either mode, append this line to backend/requirements.txt:

cryptography==42.0.5

Rollback

To disable instantly: set CRM_GMAIL_INTEGRATION_ENABLED=false and restart. The scheduler won't start, routes return 503, DB tables remain (unused).

To remove completely: unset the env var, delete data/email_attachments/, and drop all emails_* and email_* tables. The migration is idempotent and create-only, so a separate drop script is required; none is provided in Phase 1.
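
Since no drop script ships with Phase 1, the following is a rough sketch of what one might look like, assuming the CRM database is SQLite. It discovers tables by name pattern rather than hard-coding them, and defaults to a dry run. email_tables and drop_email_tables are hypothetical helpers; review the dry-run output and back up the database before running it for real.

```python
import sqlite3


def email_tables(conn):
    """List tables whose names start with 'email_' or 'emails'."""
    # GLOB treats '_' literally (unlike LIKE), so 'email_*' cannot match 'emailx'.
    rows = conn.execute(
        "SELECT name FROM sqlite_master "
        "WHERE type = 'table' "
        "AND (name GLOB 'email_*' OR name GLOB 'emails*') "
        "ORDER BY name"
    ).fetchall()
    return [row[0] for row in rows]


def drop_email_tables(conn, dry_run=True):
    """Return the matching table names; drop them only when dry_run is False."""
    names = email_tables(conn)
    if not dry_run:
        for name in names:
            # Names come from sqlite_master, not user input; quote them anyway.
            conn.execute('DROP TABLE IF EXISTS "{}"'.format(name))
        conn.commit()
    return names
```

If foreign keys are enforced, tables may need to be dropped in dependency order; the alphabetical order used here is only for readable output.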

Local development

Importing the module makes no network calls as long as the scheduler is not started, so individual pieces can be exercised offline. For example, to run the parser against a saved Gmail payload:

python3 -c "from email_integration.parser import parse; \
    import json; \
    print(parse(json.load(open('fixture.json'))))"
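
If you have no saved payload, a small fixture can be built by hand. The sketch below follows the shape of a Gmail API message resource (a payload with a headers list and a base64url-encoded body.data); make_fixture and the sample values are hypothetical, and whether parser.parse accepts exactly this minimal shape is an assumption to verify against the parser.

```python
import base64
import json


def make_fixture(subject, sender, body_text):
    """Build a minimal single-part text/plain Gmail message resource."""
    raw = body_text.encode("utf-8")
    return {
        "id": "msg-0001",            # placeholder ids, not real Gmail ids
        "threadId": "thread-0001",
        "payload": {
            "mimeType": "text/plain",
            "headers": [
                {"name": "From", "value": sender},
                {"name": "Subject", "value": subject},
            ],
            # Gmail encodes body data as URL-safe base64.
            "body": {
                "size": len(raw),
                "data": base64.urlsafe_b64encode(raw).decode("ascii"),
            },
        },
    }


if __name__ == "__main__":
    fixture = make_fixture("Q3 update", "lp@example.com", "Hello from a fixture.")
    with open("fixture.json", "w", encoding="utf-8") as fh:
        json.dump(fixture, fh, indent=2)
```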

Testing checklist (before enabling in production)

  • Enable CRM_GMAIL_INTEGRATION_ENABLED=true on a staging copy of the DB only.
  • Verify the migrations applied: the emails, email_accounts, and related tables are present.
  • Enroll one account (yours) via /api/email/accounts/enroll.
  • Trigger POST /api/email/sync/run-now.
  • Check email_sync_runs for status='ok'.
  • Spot-check emails rows against Gmail.
  • Verify an attachment downloaded correctly (hash and size).
  • Let the scheduler run for 24 hours; monitor /api/email/status.
  • Enroll remaining 4 teammates.
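
The attachment check in the list above (hash and size) can be done with a few lines of standard-library Python. This is a sketch: file_digest and matches are hypothetical helpers, and it assumes you have the expected SHA-256 and byte count to compare against, for example from the attachment's row in the database.

```python
import hashlib


def file_digest(path, chunk_size=1 << 20):
    """Return (sha256 hex digest, size in bytes), reading in 1 MiB chunks."""
    hasher = hashlib.sha256()
    size = 0
    with open(path, "rb") as fh:
        for chunk in iter(lambda: fh.read(chunk_size), b""):
            hasher.update(chunk)
            size += len(chunk)
    return hasher.hexdigest(), size


def matches(path, expected_sha256, expected_size):
    """True when the file on disk has the expected hash and length."""
    digest, size = file_digest(path)
    return digest == expected_sha256.lower() and size == expected_size
```

Reading in chunks keeps memory flat even for files near the CRM_GMAIL_MAX_ATTACHMENT_MB limit.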

What's scaffolded vs. TODO

Scaffolded and complete:

  • Schema (migration 0001)
  • Config and env parsing
  • Error taxonomy + retry classifier
  • AES-GCM crypto helpers
  • DWD JWT minting + access token caching
  • OAuth refresh + consent flow endpoints
  • Gmail client (list/get/history/attachments/profile) with rate limit + retry
  • Full DB data-access layer
  • MIME parser including RFC 2047 subjects and HTML→text fallback
  • Investor matcher with exact + domain strategies
  • Thread resolution (Gmail threadId + RFC References cross-account)
  • Attachment storage with SHA-256 dedup
  • Sync orchestrator (backfill + incremental with history-expired fallback)
  • Scheduler with manual-trigger hook
  • HTTP routes (status, accounts, threads, enroll, run-now, rematch, oauth)
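
To make the rate-limit item concrete, here is one common way to enforce a per-account budget such as CRM_GMAIL_RATE_UNITS_SEC: a token bucket. This is a generic sketch of the technique and not necessarily how gmail_client.py does it; the TokenBucket class is hypothetical, the per-call cost in quota units is left to the caller, and the clock and sleep functions are injectable so the logic can be tested without waiting.

```python
import time


class TokenBucket:
    """Allow up to `rate_per_sec` quota units per second, with bursts up to `capacity`."""

    def __init__(self, rate_per_sec, capacity=None,
                 clock=time.monotonic, sleep=time.sleep):
        self.rate = float(rate_per_sec)
        self.capacity = float(rate_per_sec if capacity is None else capacity)
        self._tokens = self.capacity      # start full
        self._clock = clock
        self._sleep = sleep
        self._last = clock()

    def _refill(self):
        now = self._clock()
        earned = (now - self._last) * self.rate
        self._tokens = min(self.capacity, self._tokens + earned)
        self._last = now

    def acquire(self, units):
        """Block until `units` are available; return the seconds spent waiting."""
        if units > self.capacity:
            raise ValueError("request costs more than the bucket can ever hold")
        waited = 0.0
        while True:
            self._refill()
            if self._tokens >= units:
                self._tokens -= units
                return waited
            # Sleep just long enough to earn the missing tokens.
            delay = (units - self._tokens) / self.rate
            self._sleep(delay)
            waited += delay
```

The class is not thread-safe as written; with several attachment workers sharing one account's bucket, acquire would need a lock around the refill and deduct steps.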

TODO before production (see architecture doc §15):

  • Multipart batch metadata fetch in gmail_client.batch_get_metadata (currently serial fallback).
  • Unit tests (fixtures for parser, matcher, threads; integration tests with responses-style HTTP mock).
  • Frontend UI: a thread list + detail pane in frontend/index.html.
  • Sandboxed HTML rendering for email bodies (out of scope here).