ten31-database/backend/email_integration/README.md
Keysat c7ce44d963 Phase 0 foundation: canonical schema, ingest pipeline, CRM MCP server
Workstream A–C substrate for the Ten31 agentic system:
- A1: docs/crm-overview.md; CLAUDE.md conventions + guardrail #9
- A2: additive/reversible core migration (canonical_entities, entity_links,
  interaction_log, relationship_edges, soft-delete) + ledgered runner
- B1/B3: chunking + deterministic entity resolution (backend/ingest)
- B2: dense (bge-m3) + BM25 sparse ingest to Qdrant crm_chunks
- C: CRM MCP server (reads, retrieval modes, logged writes) — no outbound tools
- docs: redaction/re-hydration, Gmail enablement runbook
- synthetic test data; .env.example; housekeeping (.gitignore, untrack crm.db,
  drop legacy files + start9/0.3.5)

Verified end-to-end on synthetic data + live Sparks (hybrid > dense on entity
queries). Real backfill runs on Ten31 infra; index holds synthetic data only.
Branch snapshot also captures pre-existing working-tree changes.

Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
2026-06-05 08:13:35 -05:00


# `email_integration` — Gmail capture for the Venture CRM
Scaffolded Phase 1 of the Gmail integration described in
`GMAIL_INTEGRATION_ARCHITECTURE.md` (repo root). Everything in this module is
isolated from `server.py` until you wire it in explicitly.
## Contents
| File | Purpose |
|------|---------|
| `config.py` | Env-var loader; exposes `CONFIG` singleton. |
| `errors.py` | Exception taxonomy used by the retry loop. |
| `crypto.py` | AES-GCM wrapper for OAuth refresh-token encryption (only used in OAuth mode). |
| `credentials.py` | `CredentialProvider` protocol + `DWDCredentialProvider` / `OAuthCredentialProvider`. |
| `gmail_client.py` | Gmail API HTTP wrapper (rate limit, retry, pagination). |
| `db.py` | All SQL touching the `emails_*` and `email_*` tables. Migrations live under `migrations/`. |
| `parser.py` | Gmail payload → canonical dict (headers, body, attachments). |
| `matcher.py` | Investor address index + match logic. |
| `threads.py` | Thread resolution using Gmail threadId + RFC References. |
| `attachments.py` | Stub rows + on-disk storage + download worker. |
| `sync.py` | Orchestrator for backfill + incremental sync of one account. |
| `scheduler.py` | Background thread that runs `sync.sync_all` on an interval. |
| `routes.py` | HTTP handlers under `/api/email/*` compatible with `CRMHandler`. |
| `migrations/0001_email_tables.sql` | Table DDL. |
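As an illustration of how an exception taxonomy can drive a retry loop, here is a minimal sketch. The class names, `classify`, and `with_retry` are hypothetical and are not taken from `errors.py` or `gmail_client.py`; the status-code mapping is a deliberate simplification.

```python
import random
import time


class GmailError(Exception):
    """Base class for this sketch's hypothetical taxonomy."""


class RetryableError(GmailError):
    """Transient failure (HTTP 429 or 5xx here); safe to try again."""


class FatalError(GmailError):
    """Permanent failure (for example HTTP 401 or 404); retrying will not help."""


def classify(status: int) -> GmailError:
    """Map an HTTP status code to an exception instance (simplified)."""
    if status == 429 or 500 <= status < 600:
        return RetryableError(f"HTTP {status}")
    return FatalError(f"HTTP {status}")


def with_retry(call, retry_max: int = 5, base_delay: float = 0.5, sleep=time.sleep):
    """Run call(); on RetryableError, back off exponentially with jitter."""
    for attempt in range(retry_max + 1):
        try:
            return call()
        except RetryableError:
            if attempt == retry_max:
                raise  # retries exhausted; surface the last error
            sleep(base_delay * (2 ** attempt) + random.uniform(0, base_delay))
```

Only `RetryableError` is caught, so a `FatalError` propagates on the first attempt. The `sleep` parameter is injectable so the loop can be tested without waiting.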
## Wiring it in
All changes are in `backend/server.py`, all guarded by an env flag. Each is
independently revertible. None run unless `CRM_GMAIL_INTEGRATION_ENABLED=true`.
**Patch 1 — migrations** (append to `init_db()` after all existing
`cursor.executescript(...)` calls, before `conn.commit()`):
```python
try:
    from email_integration.db import apply_migrations
    apply_migrations(cursor)
except ImportError:
    # Module not installed; skip the email migrations.
    pass
```
**Patch 2 — scheduler** (in `main()`, after `start_backup_scheduler()`):
```python
if os.environ.get("CRM_GMAIL_INTEGRATION_ENABLED", "").lower() in ("1", "true", "yes", "on"):
    from email_integration.scheduler import start_sync_scheduler
    start_sync_scheduler()
```
**Patch 3 — routes** (add near the top of `CRMHandler.do_GET` and
`CRMHandler.do_POST`, after auth/rate-limit pre-checks, before API routing):
```python
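try:
    from email_integration.routes import try_handle
    if try_handle(self):
        return  # request was handled by the email routes
except ImportError:
    # Module not installed; fall through to the normal API routing.
    pass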
```
## Environment variables
```bash
# Master on/off. Default off; scheduler won't start, routes return 503.
CRM_GMAIL_INTEGRATION_ENABLED=true
# Auth method: "dwd" (default, recommended) or "oauth"
CRM_GMAIL_AUTH_METHOD=dwd
# DWD mode
CRM_GMAIL_SA_KEY_PATH=/path/to/CRM/data/secrets/gmail-service-account.json
CRM_GMAIL_WORKSPACE_DOMAIN=ten31.xyz
# OAuth mode (fallback; not required for DWD)
CRM_GMAIL_OAUTH_CLIENT_ID=...
CRM_GMAIL_OAUTH_CLIENT_SECRET=...
CRM_GMAIL_OAUTH_REDIRECT_URI=https://crm.ten31.xyz/api/email/oauth/callback
CRM_GMAIL_SECRET_KEY=<base64-32-random-bytes> # for encrypting refresh tokens
# Sync
CRM_GMAIL_SYNC_INTERVAL_MIN=180 # default 3h
CRM_GMAIL_BACKFILL_PAGE_SIZE=500
CRM_GMAIL_MAX_ATTACHMENT_MB=50
CRM_GMAIL_ATTACH_CONCURRENCY=4
CRM_GMAIL_RATE_UNITS_SEC=150 # per account, leaves 40% headroom
CRM_GMAIL_RETRY_MAX=5
CRM_GMAIL_HISTORY_STALE_DAYS=5
```
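The following is a minimal sketch of how a loader could turn a few of these variables into a typed, immutable object. `GmailConfig` and `load_config` are hypothetical names; this is not the contents of `config.py`. The defaults for the auth method and sync interval come from the comments above, while the retry default of 5 is an assumption based on the sample value.

```python
import os
from dataclasses import dataclass

# Same truthy spellings that Patch 2 accepts.
_TRUTHY = {"1", "true", "yes", "on"}


@dataclass(frozen=True)
class GmailConfig:
    enabled: bool
    auth_method: str
    sync_interval_min: int
    retry_max: int


def load_config(env=None) -> GmailConfig:
    """Build a config from a mapping; defaults to the process environment."""
    env = os.environ if env is None else env
    auth_method = env.get("CRM_GMAIL_AUTH_METHOD", "dwd").strip().lower()
    if auth_method not in ("dwd", "oauth"):
        raise ValueError(f"unsupported CRM_GMAIL_AUTH_METHOD: {auth_method!r}")
    return GmailConfig(
        enabled=env.get("CRM_GMAIL_INTEGRATION_ENABLED", "").strip().lower() in _TRUTHY,
        auth_method=auth_method,
        sync_interval_min=int(env.get("CRM_GMAIL_SYNC_INTERVAL_MIN", "180")),
        retry_max=int(env.get("CRM_GMAIL_RETRY_MAX", "5")),  # assumed default
    )
```

Accepting a mapping instead of reading `os.environ` directly keeps the loader easy to test: `load_config({})` yields a disabled DWD configuration with a 180-minute interval.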
## Google Cloud / Workspace setup (DWD)
See `GMAIL_INTEGRATION_ARCHITECTURE.md` §3 for the full runbook. Short form:
1. Create GCP project, enable Gmail API.
2. Create service account, download JSON key, enable domain-wide delegation.
3. In Google Admin console → Security → API controls → Manage domain-wide
delegation, authorize the service account's client ID with scope
`https://www.googleapis.com/auth/gmail.readonly`.
4. Copy the JSON key to `data/secrets/gmail-service-account.json`, `chmod 600`.
5. Set env vars in `.env.beta`, restart CRM.
6. As admin, POST `/api/email/accounts/enroll-all` to create `email_accounts`
rows for every active user whose email ends in the Workspace domain.
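To make the delegation step concrete, the sketch below builds the unsigned portion of the JWT that a service account presents when impersonating a Workspace user. It follows Google's documented server-to-server OAuth flow, where `sub` names the user to impersonate. `build_signing_input` is a hypothetical helper, not the code in `credentials.py`, and the RS256 signature and token exchange are left out.

```python
import base64
import json
import time
from typing import Optional

TOKEN_URL = "https://oauth2.googleapis.com/token"
SCOPE = "https://www.googleapis.com/auth/gmail.readonly"


def b64url(raw: bytes) -> str:
    """Base64url-encode without padding, as JWTs require."""
    return base64.urlsafe_b64encode(raw).rstrip(b"=").decode("ascii")


def build_signing_input(sa_email: str, user_email: str, now: Optional[int] = None) -> str:
    """Return '<header>.<claims>', the string that would be signed with RS256."""
    now = int(time.time()) if now is None else now
    header = {"alg": "RS256", "typ": "JWT"}
    claims = {
        "iss": sa_email,     # the service account's email address
        "sub": user_email,   # the Workspace user being impersonated
        "scope": SCOPE,      # must match the scope authorized in step 3
        "aud": TOKEN_URL,
        "iat": now,
        "exp": now + 3600,   # Google caps assertion lifetime at one hour
    }
    parts = (header, claims)
    return ".".join(b64url(json.dumps(p, separators=(",", ":")).encode()) for p in parts)
```

The signed assertion is then posted to the token endpoint with the grant type `urn:ietf:params:oauth:grant-type:jwt-bearer`. If the scope here differs from the one authorized in the Admin console, the exchange is rejected.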
## Adding the crypto dependency
Both auth modes need `cryptography`: OAuth mode uses it for AES-GCM encryption
of refresh tokens, and DWD mode uses it for the RSA signing of the JWT bearer
token. If you enable the integration in either mode, append this to
`backend/requirements.txt`:
```
cryptography==42.0.5
```
## Rollback
To disable instantly: set `CRM_GMAIL_INTEGRATION_ENABLED=false` and restart.
The scheduler won't start, routes return 503, DB tables remain (unused).
To remove completely: drop the env var, delete `data/email_attachments/`, and
drop all `emails_*` and `email_*` tables. The migration is idempotent and
create-only, so dropping the tables requires a separate script, which Phase 1
does not provide.
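A drop script could look roughly like the sketch below. It assumes the CRM database is SQLite (the wiring patches use `cursor.executescript`) and that every table named `emails`, `emails_*`, or `email_*` belongs to this module. `drop_email_tables` is a hypothetical helper, not part of the module.

```python
import sqlite3


def is_email_table(name: str) -> bool:
    """True for `emails`, `emails_*` and `email_*` table names."""
    return name == "emails" or name.startswith(("email_", "emails_"))


def drop_email_tables(conn: sqlite3.Connection, dry_run: bool = True) -> list:
    """Return the matching table names; drop them only when dry_run is False."""
    rows = conn.execute(
        "SELECT name FROM sqlite_master WHERE type = 'table'"
    ).fetchall()
    targets = sorted(name for (name,) in rows if is_email_table(name))
    if not dry_run:
        for name in targets:
            # Names come from sqlite_master, so quoting them is sufficient.
            conn.execute(f'DROP TABLE IF EXISTS "{name}"')
        conn.commit()
    return targets
```

The dry-run default makes it possible to review the list before anything is deleted. Take a backup first, since the drop is not reversible.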
## Local development
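Importing the module makes no network calls as long as the scheduler is not
started, so individual pieces can be exercised offline. For example, to run
the parser on a saved Gmail API message (here `fixture.json`):
```bash
python3 -c "from email_integration.parser import parse; \
import json; \
print(parse(json.load(open('fixture.json'))))"
```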
## Testing checklist (before enabling in production)
- [ ] Set `CRM_GMAIL_INTEGRATION_ENABLED=true` only on an instance that points at a staging copy of the DB.
- [ ] Verify migrations applied: `emails`, `email_accounts`, etc. present.
- [ ] Enroll one account (yours) via `/api/email/accounts/enroll`.
- [ ] Trigger `POST /api/email/sync/run-now`.
- [ ] Check `email_sync_runs` for `status='ok'`.
- [ ] Spot-check `emails` rows against Gmail.
- [ ] Verify an attachment downloaded correctly (hash and size).
- [ ] Let the scheduler run for 24 hours; monitor `/api/email/status`.
- [ ] Enroll remaining 4 teammates.
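For the attachment check in the list above, a small helper along these lines can compare a file on disk with the hash and size recorded for it. The function names are hypothetical, and where the expected values come from (for example a row in the attachments table) is left as an assumption.

```python
import hashlib
import os


def file_digest_and_size(path: str, chunk_size: int = 1 << 20):
    """Return (sha256 hex digest, size in bytes), reading the file in chunks."""
    sha = hashlib.sha256()
    with open(path, "rb") as fh:
        for chunk in iter(lambda: fh.read(chunk_size), b""):
            sha.update(chunk)
    return sha.hexdigest(), os.path.getsize(path)


def matches_expected(path: str, expected_sha256: str, expected_size: int) -> bool:
    """True when both the digest and the byte count match the recorded values."""
    digest, size = file_digest_and_size(path)
    return digest == expected_sha256.lower() and size == expected_size
```

Reading in chunks keeps memory use flat even for attachments near the `CRM_GMAIL_MAX_ATTACHMENT_MB` limit.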
## What's scaffolded vs. TODO
**Scaffolded and complete:**
- Schema (migration 0001)
- Config and env parsing
- Error taxonomy + retry classifier
- AES-GCM crypto helpers
- DWD JWT minting + access token caching
- OAuth refresh + consent flow endpoints
- Gmail client (list/get/history/attachments/profile) with rate limit + retry
- Full DB data-access layer
- MIME parser including RFC 2047 subjects and HTML→text fallback
- Investor matcher with exact + domain strategies
- Thread resolution (Gmail threadId + RFC References cross-account)
- Attachment storage with SHA-256 dedup
- Sync orchestrator (backfill + incremental with history-expired fallback)
- Scheduler with manual-trigger hook
- HTTP routes (status, accounts, threads, enroll, run-now, rematch, oauth)
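As a reference point for the RFC 2047 item above, the standard library can decode encoded-word subjects on its own. This is a minimal sketch using `email.header`; `decode_subject` is a hypothetical name and `parser.py` may take a different approach.

```python
from email.errors import HeaderParseError
from email.header import decode_header, make_header


def decode_subject(raw: str) -> str:
    """Decode RFC 2047 encoded words; return the raw value if decoding fails."""
    try:
        return str(make_header(decode_header(raw)))
    except (HeaderParseError, LookupError, UnicodeDecodeError):
        # Malformed encoding or unknown charset: keep the original text.
        return raw
```

For example, `decode_subject("=?utf-8?q?Caf=C3=A9?=")` returns `Café`, and a plain ASCII subject passes through unchanged.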
**TODO before production (see architecture doc §15):**
- Multipart batch metadata fetch in `gmail_client.batch_get_metadata`
(currently serial fallback).
- Unit tests (fixtures for parser, matcher, threads; integration tests with
responses-style HTTP mock).
- Frontend UI: a thread list + detail pane in `frontend/index.html`.
- Sandboxed HTML rendering for email bodies (out of scope here).
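One building block for the batch metadata TODO is splitting message IDs into groups small enough for a single batch request. The sketch below assumes Google's documented cap of 100 calls per batch and uses a more conservative default of 50; `chunked` is a hypothetical helper, and building the multipart request itself is not shown.

```python
from typing import Iterator, List, Sequence

MAX_BATCH_CALLS = 100  # assumed cap, per Google's batching guidance


def chunked(ids: Sequence[str], size: int = 50) -> Iterator[List[str]]:
    """Yield consecutive groups of at most `size` message IDs."""
    if not 1 <= size <= MAX_BATCH_CALLS:
        raise ValueError(f"size must be between 1 and {MAX_BATCH_CALLS}")
    for start in range(0, len(ids), size):
        yield list(ids[start:start + size])
```

Each group would become one batch request; smaller groups reduce the chance of tripping the per-account rate limit in a single burst.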