

Gmail Integration — Technical Architecture

Status: Design / Phase 1 scaffolding
Target: Venture CRM (self-hosted, StartOS 0.4 beta)
Author: Grant + Claude
Last updated: 2026-04-21


1. Goals and non-goals

In scope for Phase 1

  1. Authorize Gmail access for all 5 team members.
  2. Continuous, incremental sync of sent and received mail.
  3. Parse sender, recipients, subject, body, timestamp, attachments.
  4. Store locally in the CRM SQLite DB.
  5. Match each email to an investor record by sender/recipient email.
  6. Thread related messages using Gmail's native threadId plus RFC 2822 Message-ID / In-Reply-To / References.
  7. Store attachments on local disk alongside email records.

Non-goals for Phase 1 (future phases)

  • Sending email from the CRM.
  • Full-text search (we'll store bodies; search UI comes later).
  • Real-time push via Pub/Sub watch (we'll poll every few hours).
  • Contact auto-creation from unknown senders.
  • Shared team inbox features.

Guiding constraints

  • Do not break the running CRM. All new code lives in an isolated module. Exactly two small additions are made to the existing server.py: a startup call and a route-dispatch hook (both behind a feature flag, both safe to roll back).
  • Match the existing codebase's style. Pure Python stdlib + sqlite3, no new hard dependencies beyond cryptography (for encrypting stored OAuth refresh tokens) and the google-auth / google-api-python-client libs for the Gmail SDK. Everything else is stdlib.
  • Single-process, single-sqlite-writer discipline. The sync worker writes through the same connection pattern as the rest of the CRM (WAL mode, short transactions) so we don't introduce lock contention.

2. Decision summary

| # | Decision | Choice | Rationale |
| --- | --- | --- | --- |
| D1 | Auth method | Domain-wide delegation (DWD) primary, per-user OAuth2 as pluggable fallback | You're Workspace super-admin; one-time setup, no per-user consent flow, centralized revocation. OAuth path preserved behind a common CredentialProvider interface so the CRM isn't locked in. |
| D2 | Schema | New dedicated email_* tables plus emails (not reuse communications) | Gmail-specific fields (gmail_message_id, thread_id, history_id, labels, RFC headers) don't belong in the generic communications table. communications can later project email rows via a read-only view if desired. |
| D3 | Sync scope | Entire mailbox, match-only full storage | Fetch cheap metadata for every message. Persist the full body + attachments only for messages that match a known investor email. Minimizes disk use and privacy surface. |
| D4 | Incremental strategy | Gmail history.list driven by per-account historyId checkpoint | Canonical Gmail-supported mechanism for "what changed since last sync": cheap, reliable, dedupe-free. messages.list only on initial backfill. |
| D5 | Delivery | Architecture doc + scaffolded modules in backend/email_integration/, no edits to server.py yet | Lets you review the shape before any wiring happens. Activation is a 3-line patch to server.py when you're ready. |

3. Google Cloud / Workspace setup (one-time, ~20 minutes)

This is the runbook you'll follow once the code is ready. It's in the doc so the setup steps and the code that relies on them are never out of sync.

3a. Create a GCP project

  1. console.cloud.google.com → new project, name it ten31-crm-gmail (or similar). Note the Project ID.
  2. Enable the Gmail API: APIs & Services → Library → "Gmail API" → Enable.

3b. Create a service account (for DWD)

  1. IAM & Admin → Service Accounts → Create service account.
    • Name: crm-gmail-sync
    • Role: leave blank (it doesn't need any GCP IAM roles; Gmail scope comes from DWD).
  2. On the service account's detail page → Keys → Add Key → Create new key → JSON. Download the file.
  3. Advanced settings → enable domain-wide delegation. Copy the numeric client ID (a ~21-digit number).

3c. Authorize the service account in Google Workspace

  1. admin.google.com → Security → Access and data control → API controls → Manage domain-wide delegation → Add new.
  2. Client ID: paste the numeric client ID from §3b, step 3.
  3. OAuth scopes (comma-separated, read-only — no send):
    https://www.googleapis.com/auth/gmail.readonly,
    https://www.googleapis.com/auth/gmail.metadata
    
    (Phase 1 uses gmail.readonly only. gmail.metadata is listed for a possible future "metadata-only" sync mode.)
  4. Authorize.

3d. Install the service account key on the CRM server

  1. Copy the JSON key file to <CRM_ROOT>/data/secrets/gmail-service-account.json.
  2. chmod 600 the file. chown to the CRM process owner.
  3. Set env var in .env.beta:
    CRM_GMAIL_SA_KEY_PATH=/path/to/CRM/data/secrets/gmail-service-account.json
    CRM_GMAIL_WORKSPACE_DOMAIN=ten31.xyz
    CRM_GMAIL_INTEGRATION_ENABLED=true
    CRM_GMAIL_SYNC_INTERVAL_MIN=180   # 3 hours
    CRM_GMAIL_SECRET_KEY=<32+ random bytes base64> # for encrypting stored OAuth refresh tokens (unused in DWD mode but required if fallback ever activates)
    

3e. Configure per-user mailboxes

  1. In the CRM, Admin → Email Accounts → "Enroll all Workspace users via DWD". This calls POST /api/email/accounts/enroll-all, which creates an email_accounts row for each existing CRM user with an @ten31.xyz email, bound to that address.
  2. Each teammate sees a "Gmail sync: active" chip on their profile.

4. Database schema

All new tables share the email_ prefix (the canonical emails table aside) so they're grouped in tooling. Migration script: backend/email_integration/migrations/0001_email_tables.sql. It is idempotent (uses CREATE TABLE IF NOT EXISTS + targeted ALTER TABLE guards) so re-running is safe.
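SQLite has no ADD COLUMN IF NOT EXISTS, so a "targeted ALTER TABLE guard" has to check the column list first. The following is a minimal sketch of one way to do that with PRAGMA table_info; the helper name add_column_if_missing is hypothetical and not part of the scaffolded modules.

```python
import sqlite3


def add_column_if_missing(conn: sqlite3.Connection, table: str, column: str, decl: str) -> bool:
    """Add `column` to `table` unless it already exists. Returns True if it was added.

    `table`, `column` and `decl` are assumed to come from the migration itself,
    never from user input, because identifiers cannot be bound as SQL parameters.
    """
    # PRAGMA table_info yields (cid, name, type, notnull, dflt_value, pk) per column.
    existing = {row[1] for row in conn.execute(f"PRAGMA table_info({table})")}
    if column in existing:
        return False
    conn.execute(f"ALTER TABLE {table} ADD COLUMN {column} {decl}")
    return True


conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE IF NOT EXISTS emails (id TEXT PRIMARY KEY)")
first_run = add_column_if_missing(conn, "emails", "thread_id", "TEXT")
second_run = add_column_if_missing(conn, "emails", "thread_id", "TEXT")  # re-run is a no-op
```

Running the guard twice leaves the schema unchanged the second time, which is the property the migration relies on.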

4a. email_accounts — one row per enrolled mailbox

CREATE TABLE IF NOT EXISTS email_accounts (
    id                TEXT PRIMARY KEY,              -- uuid
    user_id           TEXT NOT NULL,                 -- FK users.id
    email_address     TEXT NOT NULL UNIQUE,          -- e.g. grant@ten31.xyz
    auth_method       TEXT NOT NULL,                 -- 'dwd' | 'oauth'
    oauth_refresh_enc BLOB,                          -- NULL for DWD; AES-GCM ciphertext for oauth
    oauth_token_exp   TEXT,                          -- ISO8601, cached access token expiry
    oauth_token_enc   BLOB,                          -- short-lived access token cache (encrypted)
    sync_enabled      INTEGER NOT NULL DEFAULT 1,
    sync_status       TEXT NOT NULL DEFAULT 'pending', -- pending|active|paused|error
    sync_error        TEXT,                          -- last error message, if any
    last_history_id   TEXT,                          -- Gmail historyId checkpoint
    last_synced_at    TEXT,
    backfill_complete INTEGER NOT NULL DEFAULT 0,    -- 0 = initial backfill still running
    backfill_cursor   TEXT,                          -- nextPageToken during backfill
    created_at        TEXT DEFAULT (datetime('now')),
    updated_at        TEXT DEFAULT (datetime('now')),
    FOREIGN KEY(user_id) REFERENCES users(id)
);
CREATE INDEX IF NOT EXISTS idx_email_accounts_user ON email_accounts(user_id);
CREATE INDEX IF NOT EXISTS idx_email_accounts_sync ON email_accounts(sync_enabled, sync_status);

4b. emails — canonical email record (dedup'd across accounts)

One row per distinct RFC Message-ID. If two teammates are both on a thread, we store one emails row and link both via email_account_messages.

CREATE TABLE IF NOT EXISTS emails (
    id                 TEXT PRIMARY KEY,              -- uuid
    rfc_message_id     TEXT NOT NULL UNIQUE,          -- RFC 2822 Message-ID, dedup key
    gmail_thread_id    TEXT,                          -- primary Gmail thread id (first account that saw it)
    rfc_thread_root_id TEXT,                          -- root of References chain (for cross-account threading)
    subject            TEXT,
    from_email         TEXT NOT NULL,
    from_name          TEXT,
    to_emails_json     TEXT NOT NULL DEFAULT '[]',    -- JSON array
    cc_emails_json     TEXT NOT NULL DEFAULT '[]',
    bcc_emails_json    TEXT NOT NULL DEFAULT '[]',
    reply_to           TEXT,
    sent_at            TEXT NOT NULL,                 -- from Date: header, ISO8601
    body_text          TEXT,                          -- plain/text part
    body_html          TEXT,                          -- text/html part
    snippet            TEXT,                          -- Gmail-provided, useful for unmatched rows
    in_reply_to        TEXT,                          -- RFC Message-ID of parent
    references_json    TEXT DEFAULT '[]',             -- parsed References: header
    has_attachments    INTEGER NOT NULL DEFAULT 0,
    size_estimate      INTEGER,                       -- bytes reported by Gmail
    is_matched         INTEGER NOT NULL DEFAULT 0,    -- 1 if any investor link exists
    match_status       TEXT NOT NULL DEFAULT 'unmatched', -- unmatched|matched|skipped
    raw_headers_json   TEXT,                          -- full header dump for debugging/forensics
    created_at         TEXT DEFAULT (datetime('now')),
    updated_at         TEXT DEFAULT (datetime('now'))
);
CREATE INDEX IF NOT EXISTS idx_emails_thread        ON emails(gmail_thread_id);
CREATE INDEX IF NOT EXISTS idx_emails_rfc_thread    ON emails(rfc_thread_root_id);
CREATE INDEX IF NOT EXISTS idx_emails_from          ON emails(from_email);
CREATE INDEX IF NOT EXISTS idx_emails_sent_at       ON emails(sent_at);
CREATE INDEX IF NOT EXISTS idx_emails_matched       ON emails(is_matched, sent_at);

Match-only storage note: For unmatched emails we still write the emails row, but body_text / body_html / raw_headers_json are left NULL (only headers and snippet are persisted). A nightly job can prune unmatched rows older than 90 days if storage becomes a concern. See §12.

4c. email_recipients — exploded recipients for fast investor matching

Denormalized so we can index and JOIN directly.

CREATE TABLE IF NOT EXISTS email_recipients (
    id           TEXT PRIMARY KEY,
    email_id     TEXT NOT NULL,
    address      TEXT NOT NULL,                -- lowercased, trimmed
    display_name TEXT,
    kind         TEXT NOT NULL,                -- 'from'|'to'|'cc'|'bcc'|'reply_to'
    FOREIGN KEY(email_id) REFERENCES emails(id) ON DELETE CASCADE
);
CREATE INDEX IF NOT EXISTS idx_email_recipients_addr ON email_recipients(address);
CREATE INDEX IF NOT EXISTS idx_email_recipients_email ON email_recipients(email_id);

4d. email_account_messages — per-account sighting of an email

The same emails row may be visible to multiple teammates with different Gmail message IDs (each user's mailbox has its own copy). This join table records each sighting.

CREATE TABLE IF NOT EXISTS email_account_messages (
    id                TEXT PRIMARY KEY,
    email_id          TEXT NOT NULL,
    account_id        TEXT NOT NULL,
    gmail_message_id  TEXT NOT NULL,                  -- per-mailbox unique
    gmail_thread_id   TEXT NOT NULL,
    labels_json       TEXT DEFAULT '[]',              -- Gmail labels from this account's view
    is_sent           INTEGER NOT NULL DEFAULT 0,     -- 1 if SENT label present
    first_seen_at     TEXT DEFAULT (datetime('now')),
    deleted_at        TEXT,                           -- tombstone set when Gmail reports the message deleted (see §7b)
    FOREIGN KEY(email_id) REFERENCES emails(id) ON DELETE CASCADE,
    FOREIGN KEY(account_id) REFERENCES email_accounts(id) ON DELETE CASCADE,
    UNIQUE(account_id, gmail_message_id)
);
CREATE INDEX IF NOT EXISTS idx_eam_email      ON email_account_messages(email_id);
CREATE INDEX IF NOT EXISTS idx_eam_account    ON email_account_messages(account_id);
CREATE INDEX IF NOT EXISTS idx_eam_gmail_msg  ON email_account_messages(gmail_message_id);

4e. email_attachments — attachments stored on disk

Attachment bytes live at data/email_attachments/<email_id[0:2]>/<email_id>/<attachment_id>-<sanitized_filename> (see §11a for the sharded layout). DB stores metadata + content hash.

CREATE TABLE IF NOT EXISTS email_attachments (
    id                    TEXT PRIMARY KEY,
    email_id              TEXT NOT NULL,
    gmail_attachment_id   TEXT NOT NULL,         -- opaque Gmail handle
    filename              TEXT NOT NULL,
    sanitized_filename    TEXT NOT NULL,         -- what's actually on disk
    mime_type             TEXT,
    size_bytes            INTEGER,
    sha256_hex            TEXT,                  -- integrity + dedup across emails
    storage_path          TEXT NOT NULL,         -- relative to CRM_DATA_DIR
    download_status       TEXT NOT NULL DEFAULT 'pending', -- pending|downloaded|failed|skipped
    download_attempts     INTEGER NOT NULL DEFAULT 0,
    download_error        TEXT,
    downloaded_at         TEXT,
    created_at            TEXT DEFAULT (datetime('now')),
    FOREIGN KEY(email_id) REFERENCES emails(id) ON DELETE CASCADE
);
CREATE INDEX IF NOT EXISTS idx_attach_email ON email_attachments(email_id);
CREATE INDEX IF NOT EXISTS idx_attach_sha   ON email_attachments(sha256_hex);

4f. email_threads — thread roll-up (materialized for UI)

CREATE TABLE IF NOT EXISTS email_threads (
    id                 TEXT PRIMARY KEY,          -- uuid
    gmail_thread_id    TEXT,                      -- NULL if we only have RFC threading
    rfc_thread_root_id TEXT,                      -- fallback / cross-account key
    subject_normalized TEXT,                      -- lowercased + stripped of Re:/Fwd: prefixes
    first_message_at   TEXT,
    last_message_at    TEXT,
    message_count      INTEGER NOT NULL DEFAULT 0,
    participant_count  INTEGER NOT NULL DEFAULT 0,
    participants_json  TEXT DEFAULT '[]',
    is_matched         INTEGER NOT NULL DEFAULT 0,
    created_at         TEXT DEFAULT (datetime('now')),
    updated_at         TEXT DEFAULT (datetime('now'))
);
CREATE UNIQUE INDEX IF NOT EXISTS idx_threads_gmail ON email_threads(gmail_thread_id) WHERE gmail_thread_id IS NOT NULL;
CREATE INDEX IF NOT EXISTS idx_threads_rfc_root ON email_threads(rfc_thread_root_id);
CREATE INDEX IF NOT EXISTS idx_threads_last_msg ON email_threads(last_message_at);

-- Back-link emails → threads
ALTER TABLE emails ADD COLUMN thread_id TEXT;  -- wrapped in PRAGMA-based guard at migration time
CREATE INDEX IF NOT EXISTS idx_emails_thread_fk ON emails(thread_id);

4g. email_investor_links

CREATE TABLE IF NOT EXISTS email_investor_links (
    id                      TEXT PRIMARY KEY,
    email_id                TEXT NOT NULL,
    -- Match target: exactly one of the following is non-null
    fundraising_investor_id TEXT,                  -- fundraising_investors.id
    fundraising_contact_id  TEXT,                  -- fundraising_contacts.id
    contact_id              TEXT,                  -- contacts.id
    organization_id         TEXT,                  -- organizations.id (if matched by domain only)
    matched_address         TEXT NOT NULL,         -- which email address produced the match
    match_kind              TEXT NOT NULL,         -- 'exact_email'|'domain_match'|'manual'
    match_confidence        REAL NOT NULL DEFAULT 1.0, -- 0..1
    created_at              TEXT DEFAULT (datetime('now')),
    FOREIGN KEY(email_id) REFERENCES emails(id) ON DELETE CASCADE
);
CREATE INDEX IF NOT EXISTS idx_eil_email        ON email_investor_links(email_id);
CREATE INDEX IF NOT EXISTS idx_eil_investor     ON email_investor_links(fundraising_investor_id);
CREATE INDEX IF NOT EXISTS idx_eil_fr_contact   ON email_investor_links(fundraising_contact_id);
CREATE INDEX IF NOT EXISTS idx_eil_contact      ON email_investor_links(contact_id);

4h. email_sync_runs — observability

CREATE TABLE IF NOT EXISTS email_sync_runs (
    id                TEXT PRIMARY KEY,
    account_id        TEXT NOT NULL,
    kind              TEXT NOT NULL,              -- 'backfill'|'incremental'
    started_at        TEXT NOT NULL,
    finished_at       TEXT,
    status            TEXT NOT NULL,              -- 'running'|'ok'|'error'|'partial'
    messages_seen     INTEGER NOT NULL DEFAULT 0,
    messages_stored   INTEGER NOT NULL DEFAULT 0,
    attachments_saved INTEGER NOT NULL DEFAULT 0,
    api_calls         INTEGER NOT NULL DEFAULT 0,
    retries           INTEGER NOT NULL DEFAULT 0,
    error             TEXT,
    FOREIGN KEY(account_id) REFERENCES email_accounts(id) ON DELETE CASCADE
);
CREATE INDEX IF NOT EXISTS idx_sync_runs_account ON email_sync_runs(account_id, started_at);

5. Module layout

backend/
├── server.py                           # unchanged in Phase 1 (hooks added at end of Phase 1)
└── email_integration/
    ├── __init__.py                     # re-exports public API
    ├── README.md                       # setup instructions (copy of §3 in code-local form)
    ├── config.py                       # env-var loader + singleton Config()
    ├── credentials.py                  # CredentialProvider interface + DWD/OAuth impls
    ├── db.py                           # thin data-access layer; all SQL lives here
    ├── gmail_client.py                 # Gmail API wrapper w/ retry + rate-limit
    ├── parser.py                       # MIME/payload parser
    ├── matcher.py                      # investor matching logic
    ├── threads.py                      # thread roll-up logic
    ├── attachments.py                  # attachment download + disk storage
    ├── sync.py                         # orchestrator (backfill + incremental)
    ├── scheduler.py                    # background thread running periodic sync
    ├── routes.py                       # HTTP handlers (plug into CRMHandler)
    ├── errors.py                       # exception types + classification
    ├── crypto.py                       # AES-GCM wrapper for encrypting OAuth tokens
    └── migrations/
        └── 0001_email_tables.sql

6. Auth flow

6a. Credential abstraction

from collections import namedtuple
from typing import Protocol

AccessToken = namedtuple("AccessToken", ["token", "expires_at"])

class CredentialProvider(Protocol):
    def access_token_for(self, email_address: str) -> AccessToken: ...

class DWDCredentialProvider:
    """Service-account impersonation. No per-user state."""

class OAuthCredentialProvider:
    """Per-user refresh tokens stored encrypted in email_accounts.oauth_refresh_enc."""

sync.py and gmail_client.py only depend on CredentialProvider. Switching modes is a config flag.

6b. DWD flow (primary)

  1. On startup: load service-account JSON from CRM_GMAIL_SA_KEY_PATH.
  2. For each sync request, build a signed JWT claim set:
    iss: <service account email>
    sub: <target user email>          ← impersonation
    scope: https://www.googleapis.com/auth/gmail.readonly
    aud: https://oauth2.googleapis.com/token
    exp: now + 3600
    iat: now
    
  3. Sign RS256 with the service account's private key.
  4. POST to https://oauth2.googleapis.com/token with grant_type=urn:ietf:params:oauth:grant-type:jwt-bearer&assertion=<jwt>.
  5. Cache the returned access token (typ. 1h lifetime) in email_accounts.oauth_token_enc + oauth_token_exp. If expired/missing, re-mint.
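As an illustration of steps 2 and 3, here is a hedged sketch that builds the JWT signing input (base64url header and claim set) using only the stdlib. The RS256 signature itself is deliberately left out, since it needs the service account's private key and the cryptography package. The function name build_signing_input is hypothetical.

```python
import base64
import json
import time
from typing import Optional

GMAIL_READONLY = "https://www.googleapis.com/auth/gmail.readonly"
TOKEN_URL = "https://oauth2.googleapis.com/token"


def b64url(data: bytes) -> str:
    """Base64url without '=' padding, as used for JWT segments."""
    return base64.urlsafe_b64encode(data).rstrip(b"=").decode("ascii")


def build_signing_input(sa_email: str, user_email: str, now: Optional[int] = None) -> str:
    """Return '<header>.<claims>', the string that would be signed with RS256."""
    now = int(time.time()) if now is None else now
    header = {"alg": "RS256", "typ": "JWT"}
    claims = {
        "iss": sa_email,       # service account email
        "sub": user_email,     # mailbox to impersonate
        "scope": GMAIL_READONLY,
        "aud": TOKEN_URL,
        "iat": now,
        "exp": now + 3600,     # one hour, matching the flow above
    }

    def encode(obj: dict) -> str:
        return b64url(json.dumps(obj, separators=(",", ":")).encode("utf-8"))

    return f"{encode(header)}.{encode(claims)}"
```

The signed assertion is this string plus "." plus the base64url-encoded signature; that value is what gets posted as assertion= in step 4.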

We implement this ourselves with cryptography rather than pulling google-auth as a hard dependency — keeps the vendor footprint small. If you want to use google-auth instead, swap _mint_dwd_access_token() for google.oauth2.service_account.Credentials.with_subject(...) — one-line change.

6c. OAuth fallback flow

Endpoints scaffolded but inactive by default:

  • GET /api/email/oauth/start?account_email=<email> → redirects to Google consent screen with state CSRF token, access_type=offline, prompt=consent.
  • GET /api/email/oauth/callback?code=...&state=... → exchanges code for tokens, encrypts refresh token with AES-GCM using CRM_GMAIL_SECRET_KEY, stores in email_accounts.oauth_refresh_enc.
  • Access tokens are refreshed on-demand just before any Gmail call if oauth_token_exp is within 60s of expiring.

Refresh tokens never leave the DB in plaintext; decryption happens in memory for the duration of a single API call and is then discarded.

6d. Token storage security

  • At rest: oauth_refresh_enc is AES-256-GCM with a random per-record nonce. Key material is CRM_GMAIL_SECRET_KEY (separate from the CRM's JWT CRM_SECRET_KEY so you can rotate them independently).
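  • At rest (DWD): no refresh tokens to store. The only token persisted is the short-lived access-token cache in oauth_token_enc (stored encrypted). The service account JSON is the long-lived secret, and it lives at 0600 on the host.
  • In transit: all Gmail API calls use HTTPS. Our HTTP client requires TLS 1.2 or later.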
  • Revocation:
    • DWD: remove client ID from Workspace Admin console → revokes all 5 accounts in one step. Access tokens already minted stay valid until they expire (up to 1h).
    • OAuth: per-user DELETE /api/email/accounts/:id/revoke calls https://oauth2.googleapis.com/revoke then zeroes the DB row.

7. Sync algorithm

7a. Initial backfill (runs once per account)

When a new email_accounts row is created:

# Snapshot the checkpoint before the first page so that mail arriving
# mid-backfill is replayed by the first incremental sync (upserts are idempotent).
if last_history_id is None:
    last_history_id = GET /users/me/profile → historyId
while backfill_complete == 0:
    resp = GET /gmail/v1/users/me/messages
           ?pageToken=<backfill_cursor>&maxResults=500&q=
    for msg_id in resp.messages:
        fetch metadata (format=metadata, headers=[From,To,Cc,Subject,Date,
                                                  Message-ID,In-Reply-To,References])
        decide match via matcher.is_match(addresses)
        if matched:
            fetch full (format=full)
            parser.extract(...)
            db.upsert_email(...), db.upsert_links(...)
            enqueue attachment downloads
        else:
            db.upsert_email(headers_only=True)
        db.upsert_account_sighting(account_id, msg_id, thread_id, labels)
    backfill_cursor = resp.nextPageToken
    if not backfill_cursor:
        backfill_complete = 1

Batch the metadata fetches using Gmail's batch endpoint (up to 100 per batch) to stay well under quota.
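A small sketch of how one 500-ID page might be split into batches of at most 100. The helper name chunked and the sample IDs are illustrative only; building the batch HTTP request itself is out of scope here.

```python
from typing import Iterator, List, Sequence

BATCH_LIMIT = 100  # upper bound per batch stated above


def chunked(ids: Sequence[str], size: int = BATCH_LIMIT) -> Iterator[List[str]]:
    """Yield consecutive slices of `ids`, each at most `size` long."""
    for start in range(0, len(ids), size):
        yield list(ids[start:start + size])


page_ids = [f"msg{i}" for i in range(250)]  # stand-in for one messages.list page
batches = list(chunked(page_ids))
```

A smaller batch size can be passed if rate-limit errors show up; the slicing logic does not change.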

7b. Incremental sync (runs every CRM_GMAIL_SYNC_INTERVAL_MIN minutes)

for account where sync_enabled AND backfill_complete:
    try:
        resp = GET /users/me/history?startHistoryId=<last_history_id>
        for record in resp.history:
            for msg_added in record.messagesAdded:
                process_message(msg_added.message.id)   # same as backfill
            for label_change in record.labelsAdded/labelsRemoved:
                update labels on email_account_messages (cheap)
            for msg_deleted in record.messagesDeleted:
                # Gmail delete ≠ CRM delete. We tombstone the sighting, keep email.
                mark email_account_messages.deleted_at
        last_history_id = resp.historyId
    except HistoryExpiredError (HTTP 404):
        # Gmail only keeps history for ~7 days. If we fall behind, trigger
        # a partial re-backfill using q=after:<last_synced_at>.
        reset_to_date_backfill(account)

Idempotency guarantee: email_account_messages (account_id, gmail_message_id) is unique. emails.rfc_message_id is unique. A retry can't double-insert. The INSERT OR IGNORE patterns in db.py make this explicit.
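The idempotency guarantee can be demonstrated with an in-memory database. This is a sketch against a cut-down email_account_messages table, not the real db.py helper; record_sighting is a hypothetical name.

```python
import sqlite3
import uuid

conn = sqlite3.connect(":memory:")
conn.execute(
    """
    CREATE TABLE email_account_messages (
        id               TEXT PRIMARY KEY,
        account_id       TEXT NOT NULL,
        gmail_message_id TEXT NOT NULL,
        UNIQUE(account_id, gmail_message_id)
    )
    """
)


def record_sighting(conn: sqlite3.Connection, account_id: str, gmail_message_id: str) -> bool:
    """Insert a sighting; return False if this (account, message) pair already exists."""
    cur = conn.execute(
        "INSERT OR IGNORE INTO email_account_messages (id, account_id, gmail_message_id) "
        "VALUES (?, ?, ?)",
        (str(uuid.uuid4()), account_id, gmail_message_id),
    )
    # rowcount is 0 when the UNIQUE constraint caused the row to be ignored.
    return cur.rowcount == 1


first = record_sighting(conn, "acct-1", "18c0ffee")
retry = record_sighting(conn, "acct-1", "18c0ffee")  # simulated retry of the same message
total = conn.execute("SELECT COUNT(*) FROM email_account_messages").fetchone()[0]
```

The retry is silently dropped by the UNIQUE constraint, so a crashed and restarted sync cycle can replay the same history records safely.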

7c. Scheduler

A single daemon thread started from server.py startup:

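# scheduler.py
import logging
import threading

log = logging.getLogger(__name__)
_stop = threading.Event()  # set() to request shutdown

def run_forever():
    while not _stop.is_set():
        try:
            run_all_accounts()
        except Exception:
            log.exception("sync loop failed")
        # Event.wait doubles as an interruptible sleep
        _stop.wait(CONFIG.sync_interval_sec)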

Within run_all_accounts(), accounts are processed sequentially (not in parallel) because:

  • Our sqlite setup has one writer.
  • It keeps the per-user-quota-per-second math simple (250 units/user/sec per account, no cross-account contention).
  • Serial processing at ~30s/account × 5 = 2.5 min per cycle, which is well within a 3-hour interval.

Manual trigger: POST /api/email/sync/run-now (admin-only) skips the sleep and runs one cycle immediately.


8. Parsing

Gmail returns messages as a recursive tree of MIME parts. parser.py walks the tree and produces a ParsedEmail dataclass.

Headers we extract:

  • From (single address, parsed with email.utils.getaddresses)
  • To, Cc, Bcc, Reply-To (address lists)
  • Subject
  • Date (parsed with email.utils.parsedate_to_datetime, stored UTC ISO8601)
  • Message-ID
  • In-Reply-To
  • References (space-separated → JSON array)

Body extraction rules:

  1. Prefer text/plain part; fall back to HTML-stripped text from text/html using stdlib html.parser.HTMLParser.
  2. Keep text/html alongside text/plain when both exist (both columns stored).
  3. Ignore inline images for body purposes; they get stored as attachments with content_disposition='inline'.
  4. Decode base64url bodies correctly (Gmail uses -_ alphabet with no padding).
  5. Normalize line endings to \n.
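Rules 1, 4 and 5 could be sketched as below. This assumes the part is UTF-8 (a real parser would read the charset from the part's Content-Type), and the HTML fallback is deliberately naive: it also keeps text inside script and style elements. The names decode_gmail_body and html_to_text are hypothetical.

```python
import base64
from html.parser import HTMLParser
from typing import List


def decode_gmail_body(data: str) -> str:
    """Decode a base64url body.data string that may lack '=' padding."""
    padded = data + "=" * (-len(data) % 4)  # restore padding to a multiple of 4
    raw = base64.urlsafe_b64decode(padded)
    text = raw.decode("utf-8", errors="replace")  # assumption: UTF-8 part
    return text.replace("\r\n", "\n").replace("\r", "\n")


class _TextExtractor(HTMLParser):
    """Collects text nodes and drops all tags."""

    def __init__(self) -> None:
        super().__init__()
        self.chunks: List[str] = []

    def handle_data(self, data: str) -> None:
        self.chunks.append(data)


def html_to_text(html: str) -> str:
    """Fallback used when a message has no text/plain part."""
    parser = _TextExtractor()
    parser.feed(html)
    parser.close()  # flush any buffered trailing text
    return "".join(parser.chunks)
```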

Attachments are identified by any MIME part with a non-empty filename or Content-Disposition: attachment. Large attachment bodies are not inlined in the messages.get response: Gmail returns an opaque attachmentId, and we fetch the bytes separately via messages.attachments.get.

Edge cases explicitly handled:

  • Malformed Date: headers (fall back to internalDate from Gmail response, always present).
  • Missing Message-ID header (generate synthetic: synthetic-<gmail_message_id>@ten31.local).
  • Duplicate Message-ID across mailboxes (expected — primary dedup path).
  • RFC 2047 encoded subjects (=?UTF-8?B?...?=) — email.header.decode_header.
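The edge cases above map onto a few small stdlib helpers. The sketch below is one possible shape; all four function names are hypothetical, and internalDate is assumed to arrive as a string of epoch milliseconds, as the Gmail API returns it.

```python
from datetime import datetime, timezone
from email.header import decode_header, make_header
from email.utils import parsedate_to_datetime
from typing import List, Optional


def parse_sent_at(date_header: Optional[str], internal_date_ms: str) -> str:
    """Return UTC ISO8601, falling back to Gmail's internalDate on a bad Date: header."""
    dt = None
    if date_header:
        try:
            dt = parsedate_to_datetime(date_header)
        except (TypeError, ValueError):  # exception type varies by Python version
            dt = None
    if dt is None:
        dt = datetime.fromtimestamp(int(internal_date_ms) / 1000, tz=timezone.utc)
    elif dt.tzinfo is None:
        dt = dt.replace(tzinfo=timezone.utc)  # "-0000" parses as naive; treat as UTC
    return dt.astimezone(timezone.utc).isoformat()


def decode_subject(raw: str) -> str:
    """Decode RFC 2047 encoded-words such as =?UTF-8?B?...?=."""
    return str(make_header(decode_header(raw)))


def message_id_or_synthetic(header: Optional[str], gmail_message_id: str) -> str:
    """Use the real Message-ID when present, else the synthetic form described above."""
    if header and header.strip():
        return header.strip()
    return f"synthetic-{gmail_message_id}@ten31.local"


def parse_references(header: Optional[str]) -> List[str]:
    """Split a whitespace-separated References: header into a list."""
    return header.split() if header else []
```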

9. Matching

9a. Source of truth for investor addresses

At matcher startup and periodically (every 15 min), we build an in-memory index:

INVESTOR_EMAIL_INDEX = {
    "alice@acme.vc": {
        "kind": "fundraising_contact",
        "id": "fc_abc",
        "investor_id": "fi_xyz",
        "investor_name": "Acme Capital"
    },
    ...
}

INVESTOR_DOMAIN_INDEX = {
    "acme.vc": [("fi_xyz", "Acme Capital"), ...]
}

Pulled from:

  1. fundraising_contacts.email (primary LP list)
  2. contacts.email (general CRM contacts)
  3. organizations.email and domain of organizations.website (weak domain match only)

9b. Match algorithm

For each parsed email, collect the set of participant addresses (from + to + cc + bcc), excluding the owning account's own address (we don't match our own team).

def match(participants: set[str]) -> list[InvestorLink]:
    links = []
    for addr in participants:
        if addr in INVESTOR_EMAIL_INDEX:
            entry = INVESTOR_EMAIL_INDEX[addr]
            links.append(InvestorLink(
                match_kind="exact_email",
                confidence=1.0,
                matched_address=addr,
                **entry.targets
            ))
    if not links:  # only fall back to domain if no exact match
        for addr in participants:
            domain = addr.split("@", 1)[-1].lower()
            if domain in INVESTOR_DOMAIN_INDEX and domain not in COMMON_PERSONAL_DOMAINS:
                for inv_id, inv_name in INVESTOR_DOMAIN_INDEX[domain]:
                    links.append(InvestorLink(
                        match_kind="domain_match",
                        confidence=0.6,
                        matched_address=addr,
                        fundraising_investor_id=inv_id
                    ))
    return dedupe(links)

COMMON_PERSONAL_DOMAINS = {"gmail.com", "outlook.com", "yahoo.com", "icloud.com", ...} — we don't domain-match on these.
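For reference, a self-contained variant of the matcher that runs as written. It makes several assumptions not settled by the text: the indexes are passed in as arguments, only fundraising_contact entries are handled, an exact match sets just fundraising_contact_id (following the "exactly one target" rule in the email_investor_links schema), and dedupe is an order-preserving set operation.

```python
from dataclasses import dataclass
from typing import Dict, List, Optional, Set, Tuple

COMMON_PERSONAL_DOMAINS = {"gmail.com", "outlook.com", "yahoo.com", "icloud.com"}


@dataclass(frozen=True)  # frozen makes links hashable, which the dedupe relies on
class InvestorLink:
    match_kind: str
    confidence: float
    matched_address: str
    fundraising_investor_id: Optional[str] = None
    fundraising_contact_id: Optional[str] = None


def match(
    participants: Set[str],
    email_index: Dict[str, dict],
    domain_index: Dict[str, List[Tuple[str, str]]],
) -> List[InvestorLink]:
    links: List[InvestorLink] = []
    for addr in sorted(participants):  # sorted for deterministic output
        entry = email_index.get(addr)
        if entry and entry["kind"] == "fundraising_contact":
            links.append(
                InvestorLink("exact_email", 1.0, addr, fundraising_contact_id=entry["id"])
            )
    if not links:  # domain fallback only when nothing matched exactly
        for addr in sorted(participants):
            domain = addr.rsplit("@", 1)[-1].lower()
            if domain in COMMON_PERSONAL_DOMAINS:
                continue
            for inv_id, _name in domain_index.get(domain, []):
                links.append(
                    InvestorLink("domain_match", 0.6, addr, fundraising_investor_id=inv_id)
                )
    return list(dict.fromkeys(links))  # order-preserving dedupe
```

Passing the indexes as parameters rather than reading module globals keeps the function easy to test and lets the 15-minute rebuild swap in a new index atomically.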

Match status flows:

  • 0 links → emails.match_status = 'unmatched', body not stored.
  • ≥1 exact_email link → match_status = 'matched', full body + attachments stored.
  • Only domain_match links → match_status = 'matched' with confidence 0.6, full body stored but flagged for review in UI.

9c. Re-matching

If a new investor contact is added to the CRM, existing unmatched emails can be retroactively matched. POST /api/email/rematch?since=<iso> rebuilds the index and scans emails where match_status='unmatched' in the window. Full bodies are fetched from Gmail on demand for the newly matched ones (we still have the gmail_message_id via the sightings table).


10. Threading

Two-level strategy:

  1. Primary: Gmail threadId. Within one mailbox, Gmail's grouping is authoritative. We trust it.
  2. Cross-account reconciliation: RFC References / In-Reply-To. If Alice sees thread thr_A and Bob sees thread thr_B and they share RFC Message-IDs, they're really one conversation.

Algorithm:

def resolve_thread(email_row):
    # Step 1: normalize subject (strip "Re:", "Fwd:", whitespace)
    subj_norm = normalize_subject(email_row.subject)

    # Step 2: collect candidate thread keys
    rfc_refs = email_row.references + [email_row.in_reply_to]
    rfc_refs = [r for r in rfc_refs if r]

    # Step 3: find any existing email that shares a References link
    parent = db.find_email_by_any_rfc_id(rfc_refs + [email_row.rfc_message_id])

    if parent and parent.thread_id:
        return parent.thread_id

    # Step 4: fall back to gmail_thread_id (first sighting wins)
    if email_row.gmail_thread_id:
        existing = db.find_thread_by_gmail_id(email_row.gmail_thread_id)
        if existing:
            return existing.id

    # Step 5: create new thread
    return db.create_thread(
        gmail_thread_id=email_row.gmail_thread_id,
        rfc_thread_root_id=rfc_refs[0] if rfc_refs else email_row.rfc_message_id,
        subject_normalized=subj_norm,
        first_message_at=email_row.sent_at
    )

Thread roll-up (email_threads.message_count, participants_json, last_message_at) is recomputed opportunistically on each insert using a single UPDATE ... FROM (SELECT ...) — cheap at the volumes a 5-person team produces.
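resolve_thread calls normalize_subject without defining it. A minimal sketch follows, assuming only English "Re:", "Fw:" and "Fwd:" prefixes need stripping; localized prefixes such as "AW:" or "SV:" would need extra alternatives in the pattern.

```python
import re
from typing import Optional

# One or more leading reply/forward markers, e.g. "Re: FWD: ".
_PREFIX_RE = re.compile(r"^\s*(?:(?:re|fwd?)\s*:\s*)+", re.IGNORECASE)


def normalize_subject(subject: Optional[str]) -> str:
    """Lowercase, collapse whitespace, and strip leading Re:/Fwd: prefixes."""
    if not subject:
        return ""
    stripped = _PREFIX_RE.sub("", subject)
    return re.sub(r"\s+", " ", stripped).strip().lower()
```

Because the pattern requires a colon after the marker, a subject like "Regarding: terms" is left intact apart from lowercasing.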


11. Attachments

11a. Storage layout

data/
├── crm.db
├── email_attachments/
│   └── <email_id[0:2]>/               # shard by first 2 chars of email_id to keep dirs small
│       └── <email_id>/
│           ├── <attachment_id>-pitch-deck.pdf
│           └── <attachment_id>-financials.xlsx

11b. Download flow

Run asynchronously from message parsing (so a giant attachment doesn't block the sync loop):

  1. Parser records attachment stubs in email_attachments with download_status='pending'.
  2. An attachment-download worker pulls up to N at a time (default 4), calls messages.attachments.get, streams to a temp file, computes SHA-256, renames to final path.
  3. On success: download_status='downloaded', downloaded_at, sha256_hex, size_bytes filled.
  4. On failure: download_status='failed', download_attempts incremented, download_error set. Retry with backoff up to 5 attempts.
  5. Sanitize filenames aggressively: strip path separators, null bytes, control chars, truncate to 200 chars. Keep extension.
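Step 5 and the sharded layout from 11a could be sketched as follows. The fallback name "attachment", the 20-character extension cap, and the function names are assumptions made for illustration; the real helper is referred to elsewhere in this doc as _sanitize_filename.

```python
import os
import re

MAX_FILENAME_LEN = 200


def sanitize_filename(name: str, max_len: int = MAX_FILENAME_LEN) -> str:
    """Strip directories and control characters, then truncate while keeping the extension."""
    # Keep only the last path component, whichever separator the sender used.
    name = name.replace("\\", "/").split("/")[-1]
    # Remove NUL bytes and other ASCII control characters.
    name = re.sub(r"[\x00-\x1f\x7f]", "", name).strip()
    if name in ("", ".", ".."):
        return "attachment"  # assumed placeholder for unusable names
    if len(name) <= max_len:
        return name
    stem, ext = os.path.splitext(name)
    ext = ext[:20]  # guard against absurdly long "extensions"
    return stem[: max_len - len(ext)] + ext


def attachment_path(email_id: str, attachment_row_id: str, original_name: str) -> str:
    """Relative storage path: email_attachments/<email_id[0:2]>/<email_id>/<id>-<name>."""
    safe = sanitize_filename(original_name)
    return "/".join(["email_attachments", email_id[:2], email_id, f"{attachment_row_id}-{safe}"])
```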

11c. Dedup and size limits

  • Content-addressed by sha256_hex. The hash is only known once the bytes have been downloaded, so if the same file already exists on disk (same hash) we discard the new copy and link the second email_attachments row to the existing storage_path.
  • Default per-file cap: 50 MB. Above that, we record metadata only with download_status='skipped' and a note. Configurable via CRM_GMAIL_MAX_ATTACHMENT_MB.
  • Default total storage cap: 10 GB (logged warning at 80%).
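A hedged sketch of the temp-file, hash, dedup and rename sequence. Here known_hashes is an in-memory stand-in for a lookup on email_attachments.sha256_hex, chunks stands in for the decoded attachment bytes, and store_attachment is a hypothetical name. In practice the size cap could be checked against Gmail's reported size before downloading at all.

```python
import hashlib
import os
import tempfile
from typing import Dict, Iterable, Optional, Tuple


def store_attachment(
    chunks: Iterable[bytes],
    final_path: str,
    known_hashes: Dict[str, str],
    max_bytes: int = 50 * 1024 * 1024,
) -> Tuple[str, Optional[str], Optional[str]]:
    """Return (download_status, sha256_hex, storage_path)."""
    target_dir = os.path.dirname(final_path)
    os.makedirs(target_dir, exist_ok=True)
    digest = hashlib.sha256()
    size = 0
    # Temp file in the same directory so the final rename stays on one filesystem.
    fd, tmp_path = tempfile.mkstemp(dir=target_dir)
    try:
        with os.fdopen(fd, "wb") as tmp:
            for chunk in chunks:
                size += len(chunk)
                if size > max_bytes:
                    return ("skipped", None, None)
                digest.update(chunk)
                tmp.write(chunk)
        sha = digest.hexdigest()
        if sha in known_hashes:
            # Duplicate content: reuse the existing file, drop the new bytes.
            return ("downloaded", sha, known_hashes[sha])
        os.replace(tmp_path, final_path)  # atomic on the same filesystem
        known_hashes[sha] = final_path
        return ("downloaded", sha, final_path)
    finally:
        if os.path.exists(tmp_path):
            os.remove(tmp_path)  # clean up on skip, duplicate, or error
```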

11d. Retrieval

  • GET /api/email/attachments/:id/download — authenticated, streams the file. 404 if status != downloaded.
  • No direct filesystem exposure.

12. Errors, rate limits, backoff

12a. Error taxonomy (errors.py)

class GmailError(Exception): ...
class AuthError(GmailError): ...              # 401/403 — broken creds, needs operator attention
class RateLimitError(GmailError): ...         # 429 or 403 rate_limit — backoff + retry
class TransientError(GmailError): ...         # 500, 502, 503, 504, network — backoff + retry
class NotFoundError(GmailError): ...          # 404 — message deleted, history expired
class HistoryExpiredError(NotFoundError): ... # specifically for expired startHistoryId
class PermanentError(GmailError): ...         # 400 bad request etc — don't retry, log+skip

Every HTTP call is routed through gmail_client._call() which maps status codes → exception types → retry decision.
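
A minimal sketch of the status-to-exception mapping described for gmail_client._call(). The `classify()` and `should_retry()` names and the `history_call` flag are assumptions for illustration, not the scaffolded interface. The reason strings follow Google's documented rateLimitExceeded / userRateLimitExceeded values, but how the real client extracts them from the response body is assumed.

```python
from typing import Optional


class GmailError(Exception): ...
class AuthError(GmailError): ...
class RateLimitError(GmailError): ...
class TransientError(GmailError): ...
class NotFoundError(GmailError): ...
class HistoryExpiredError(NotFoundError): ...
class PermanentError(GmailError): ...


RATE_LIMIT_REASONS = {"rateLimitExceeded", "userRateLimitExceeded"}


def classify(status: int, reason: Optional[str] = None,
             history_call: bool = False) -> GmailError:
    """Map an HTTP status (plus optional error reason) to an exception instance."""
    # Check rate limiting first: a 403 can mean either quota or broken creds.
    if status == 429 or (status == 403 and reason in RATE_LIMIT_REASONS):
        return RateLimitError(f"{status} {reason or ''}".strip())
    if status in (401, 403):
        return AuthError(f"{status} {reason or ''}".strip())
    if status == 404:
        # A 404 on a history call is treated as an expired startHistoryId.
        return HistoryExpiredError("404") if history_call else NotFoundError("404")
    if status in (500, 502, 503, 504):
        return TransientError(str(status))
    return PermanentError(str(status))


def should_retry(err: GmailError) -> bool:
    """Only rate-limit and transient failures are worth retrying."""
    return isinstance(err, (RateLimitError, TransientError))
```

The ordering matters: testing for the rate-limit reasons before the generic 401/403 branch keeps a quota-related 403 from being escalated to the operator as a credential problem.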

12b. Retry policy

Exponential backoff with full jitter:

  • Initial delay: 1s
  • Max delay: 60s
  • Max attempts: 5 for transient/rate-limit errors; no retries for auth/permanent errors
  • On the 6th rate-limit error in a row for an account, mark account sync_status='paused' and alert via audit log.
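
Full jitter means each delay is drawn uniformly from zero up to the capped exponential ceiling. The sketch below uses the parameters listed above. `Retryable`, `backoff_delay`, and `call_with_retry` are hypothetical names, and "max attempts: 5" is read here as five calls in total (an assumption; the account-pause rule is not modeled).

```python
import random
import time
from typing import Callable, TypeVar

T = TypeVar("T")


class Retryable(Exception):
    """Stand-in for RateLimitError / TransientError in this sketch."""


def backoff_delay(attempt: int, base: float = 1.0, cap: float = 60.0,
                  rng: Callable[[], float] = random.random) -> float:
    """Delay before the retry that follows failed attempt number `attempt` (1-based)."""
    ceiling = min(cap, base * (2 ** (attempt - 1)))
    return rng() * ceiling  # full jitter: uniform in [0, ceiling)


def call_with_retry(fn: Callable[[], T], max_attempts: int = 5,
                    sleep: Callable[[float], None] = time.sleep,
                    rng: Callable[[], float] = random.random) -> T:
    """Call fn, retrying on Retryable until max_attempts calls have been made."""
    for attempt in range(1, max_attempts + 1):
        try:
            return fn()
        except Retryable:
            if attempt == max_attempts:
                raise  # out of attempts: surface the last error
            sleep(backoff_delay(attempt, rng=rng))
    raise AssertionError("unreachable")
```

Injecting `sleep` and `rng` keeps the unit tests fast and deterministic.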

12c. Gmail quota budget

  • Per-project: 1,000,000,000 units/day (enormous; we won't get near it).
  • Per-user: 250 units/second. We pace to ~150 units/sec/account via a token bucket in gmail_client.py. With 5 accounts processed serially, that's naturally below the per-project burst limit of 1,200 units/sec.
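
A token bucket for this pacing could look like the sketch below. The `TokenBucket` class and `acquire` helper are illustrative, not the contents of gmail_client.py, and the bucket capacity (burst size) is an assumption since the document only gives the refill rate.

```python
import time
from typing import Callable


class TokenBucket:
    """Refills at `rate` units per second, holding at most `capacity` units."""

    def __init__(self, rate: float, capacity: float,
                 clock: Callable[[], float] = time.monotonic) -> None:
        self.rate = rate
        self.capacity = capacity
        self._clock = clock
        self._tokens = capacity
        self._last = clock()

    def try_consume(self, cost: float) -> float:
        """Spend `cost` units if available and return 0.0.

        Otherwise spend nothing and return the seconds to wait before retrying.
        """
        if cost > self.capacity:
            raise ValueError("cost exceeds bucket capacity")
        now = self._clock()
        self._tokens = min(self.capacity,
                           self._tokens + (now - self._last) * self.rate)
        self._last = now
        if self._tokens >= cost:
            self._tokens -= cost
            return 0.0
        return (cost - self._tokens) / self.rate


def acquire(bucket: TokenBucket, cost: float,
            sleep: Callable[[float], None] = time.sleep) -> None:
    """Block until `cost` units have been spent from the bucket."""
    while True:
        wait = bucket.try_consume(cost)
        if wait == 0.0:
            return
        sleep(wait)


# One bucket per account, paced at ~150 units/sec; messages.get costs 5 units.
bucket = TokenBucket(rate=150, capacity=150)
acquire(bucket, 5)
```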

Call costs:

  • messages.list = 5
  • messages.get = 5
  • messages.attachments.get = 5
  • history.list = 2

Typical incremental cycle for one active user (~50 new messages in 3h): ~260 units. Negligible.
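
The estimate can be checked with simple arithmetic over the call costs listed above. The breakdown is an assumption, since the document gives only the total: one messages.get per new message plus five history.list pages is one combination that reproduces it.

```python
from typing import Mapping

# Unit costs as listed in this section.
UNIT_COST = {
    "messages.list": 5,
    "messages.get": 5,
    "messages.attachments.get": 5,
    "history.list": 2,
}


def cycle_cost(calls: Mapping[str, int]) -> int:
    """Total quota units for a mapping of method name to call count."""
    return sum(UNIT_COST[method] * count for method, count in calls.items())


# Assumed breakdown: 50 new messages fetched once each, 5 history.list pages.
print(cycle_cost({"history.list": 5, "messages.get": 50}))  # 260
```

At the ~150 units/sec pacing target, 260 units is under two seconds of budget. Messages that get both a metadata and a full fetch, or that carry attachments, cost 5 more units per extra call.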

12d. Data validation

All DB inserts go through db.py helpers that:

  • Enforce required fields (from_email, sent_at, rfc_message_id).
  • Cap TEXT field lengths (subject at 998 bytes, matching the RFC 5322 maximum line length; body_text at a 10 MB soft cap, with truncation logged).
  • Coerce timestamps to UTC ISO8601.
  • Wrap everything in a single transaction per email.
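
The first three checks might look like the sketch below. The helper names are hypothetical (the db.py helpers themselves are not shown in this document), and treating naive datetimes as UTC is an assumption.

```python
from datetime import datetime, timezone
from typing import Any, Dict

REQUIRED_FIELDS = ("from_email", "sent_at", "rfc_message_id")
SUBJECT_MAX_BYTES = 998


def to_utc_iso(ts: datetime) -> str:
    """Coerce a datetime to a UTC ISO 8601 string."""
    if ts.tzinfo is None:
        ts = ts.replace(tzinfo=timezone.utc)  # assumption: naive values are UTC
    return ts.astimezone(timezone.utc).isoformat()


def cap_utf8(text: str, max_bytes: int) -> str:
    """Truncate to at most max_bytes of UTF-8 without splitting a character."""
    raw = text.encode("utf-8")
    if len(raw) <= max_bytes:
        return text
    # errors="ignore" drops a trailing partial multi-byte sequence.
    return raw[:max_bytes].decode("utf-8", errors="ignore")


def validate_email_row(row: Dict[str, Any]) -> Dict[str, Any]:
    """Return a cleaned copy of the row, or raise ValueError if fields are missing."""
    missing = [field for field in REQUIRED_FIELDS if not row.get(field)]
    if missing:
        raise ValueError("missing required fields: " + ", ".join(missing))
    clean = dict(row)
    clean["sent_at"] = to_utc_iso(row["sent_at"])
    clean["subject"] = cap_utf8(row.get("subject") or "", SUBJECT_MAX_BYTES)
    return clean
```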

13. Security considerations

| Concern | Mitigation |
|---|---|
| Service account key theft | File at 0600, not in git, .gitignore covers data/secrets/, key usable only for Gmail scope, restricted to Workspace domain by DWD. |
| SQL injection | All SQL uses parameterized queries; strict ? placeholders. |
| Path traversal in attachment filenames | Filenames are sanitized (_sanitize_filename) and written to a hash-based path; original name is stored in DB only. |
| Rate-limit DoS on sync endpoint | POST /api/email/sync/run-now is admin-only and subject to existing WRITE_RATE_LIMIT_PER_MIN. |
| Exfiltration via rendered email HTML | UI must render in a sandboxed iframe with CSP; no remote image/script loads. (Frontend change, not in scope for this scaffolding.) |
| Secret exposure in logs | Access tokens and refresh tokens never logged. Error messages are redacted by a central redact_secrets() filter. |
| Backup includes email bodies | Existing backup system writes JSON of fundraising_state only; it does not include the new emails_* tables. Future decision: include encrypted? See §17. |

14. Integration plan — zero-breakage activation

The scaffolded code is inert until three very small changes are made to server.py. Each is individually reversible.

Patch 1: Table creation (append to init_db())

# server.py, near the end of init_db()
try:
    from email_integration.db import apply_migrations
    apply_migrations(cursor)
except ImportError:
    pass  # module not deployed yet, skip silently

Patch 2: Scheduler startup (append to main() before serve_forever())

# server.py, in main() after start_backup_scheduler()
if os.environ.get("CRM_GMAIL_INTEGRATION_ENABLED", "").lower() in ("1","true","yes"):
    from email_integration.scheduler import start_sync_scheduler
    start_sync_scheduler()

Patch 3: Route dispatch hook (in CRMHandler.do_GET / do_POST, before the 404)

# server.py, before the 404 fallthrough in CRMHandler
from email_integration.routes import try_handle
if try_handle(self):   # returns True if it claimed the request
    return

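All three are meant to be guarded by the env var CRM_GMAIL_INTEGRATION_ENABLED. Patch 2 checks it directly; Patches 1 and 3 as written run whenever the module is importable, so apply_migrations() and try_handle() must check the flag themselves (or the patches need the same if wrapper). Default is off. Turning the flag off disables the integration at the next restart. Migrations are idempotent, so there is no "partially-migrated" state.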
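
To illustrate the two properties this plan relies on, the sketch below shows a shared flag check and a migration runner that is safe to re-run. The table definitions are deliberately minimal placeholders, not the real schema, and `integration_enabled` is a hypothetical helper.

```python
import sqlite3
from typing import Mapping

TRUTHY = ("1", "true", "yes")  # same values Patch 2 accepts


def integration_enabled(env: Mapping[str, str]) -> bool:
    """Single place to evaluate CRM_GMAIL_INTEGRATION_ENABLED."""
    return env.get("CRM_GMAIL_INTEGRATION_ENABLED", "").lower() in TRUTHY


# Placeholder DDL: IF NOT EXISTS is what makes a re-run a no-op.
MIGRATIONS = (
    "CREATE TABLE IF NOT EXISTS emails ("
    " id TEXT PRIMARY KEY,"
    " rfc_message_id TEXT NOT NULL UNIQUE)",
    "CREATE TABLE IF NOT EXISTS email_account_messages ("
    " account_id TEXT NOT NULL,"
    " gmail_message_id TEXT NOT NULL,"
    " email_id TEXT NOT NULL REFERENCES emails(id),"
    " PRIMARY KEY (account_id, gmail_message_id))",
)


def apply_migrations(cursor: sqlite3.Cursor) -> None:
    """Run every statement; safe to call on an already-migrated database."""
    for statement in MIGRATIONS:
        cursor.execute(statement)


conn = sqlite3.connect(":memory:")
cur = conn.cursor()
apply_migrations(cur)
apply_migrations(cur)  # second run changes nothing
tables = [row[0] for row in cur.execute(
    "SELECT name FROM sqlite_master WHERE type='table' ORDER BY name")]
print(tables)  # ['email_account_messages', 'emails']
```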


15. Testing strategy

15a. Unit tests (pure Python, no network)

  • test_parser.py — fixture Gmail payloads covering: plain-only, html-only, multipart/alternative, multipart/mixed with attachments, RFC 2047 subjects, malformed dates, missing Message-ID.
  • test_matcher.py — exact match, domain match, common-personal-domain exclusion, team-own-address exclusion, dedup.
  • test_threads.py — new thread, Gmail-only thread, RFC-only thread, cross-account reconciliation.
  • test_db.py — migrations run on empty DB, re-run is no-op, all upserts idempotent.
  • test_crypto.py — encrypt/decrypt roundtrip, wrong key fails, nonce uniqueness.

15b. Integration tests (mocked HTTP)

Use responses lib or a tiny in-proc mock server to simulate Gmail API responses:

  • Full backfill of 200 messages across 5 pages.
  • Incremental history with messagesAdded + labelsAdded + messagesDeleted.
  • Expired startHistoryId triggers date-backfill fallback.
  • 429 triggers retry; 6 consecutive 429s pauses account.
  • Attachment download happy path + large-file skip.

15c. Manual smoke test before production

  1. Enable feature flag for a single test account first (grant@ten31.xyz only).
  2. Run initial backfill, confirm no errors in email_sync_runs.
  3. Check sample of matched emails in UI against Gmail source.
  4. Verify attachments download and hash matches.
  5. After 24h of incremental sync, enroll remaining 4 teammates.

16. Rollout plan

| Week | Milestone |
|---|---|
| 1 | Review this doc. Adjust schema / decisions as needed. |
| 2 | Implement scaffolded modules (code is already stubbed; fill in bodies). Unit tests. |
| 3 | Integration tests against a GCP sandbox project. |
| 4 | Apply server.py patches behind flag, enable for Grant's account only. Monitor for 1 week. |
| 5 | Enroll remaining 4 teammates. Document ops runbook. |
| 6+ | Phase 2: UI for threads, search, manual match override. |

17. Open questions / deferred decisions

  1. Backup inclusion. Should the existing nightly backup cover emails_* tables? Pro: completeness. Con: bodies contain sensitive content; backups currently written unencrypted. Recommend adding a separate encrypted email backup stream rather than merging into fundraising_backup.
  2. Retention policy for unmatched emails. Current plan: keep metadata indefinitely, never store body. Alternative: delete metadata after 90 days to reduce DB size. User preference needed.
  3. Real-time push. Gmail supports users.watch + Pub/Sub to get sub-second notifications. Out of scope for Phase 1 (requires ingress URL, Pub/Sub, harder on a self-hosted box). Revisit if latency becomes an issue.
  4. Contact auto-creation. Should an email from a new @foo.vc address automatically create a contacts row? Currently no — matching is passive. Easy to add later.
  5. Sent email enrichment. When a teammate sends email to a known investor, should it create/update a CRM task (e.g., "awaiting reply")? Phase 3 roadmap item.
  6. Multi-domain Workspace. If ten31.xyz ever adds a second domain (ten31.io etc), DWD needs updating. Document for future.

Appendix A: Example end-to-end flow

A concrete walkthrough to ground the design.

Scenario: Alice from Acme Capital emails Grant with a fundraising question, cc'ing her partner Bob.

  1. T+0. Alice sends Subject: "Re: Q2 allocation" to grant@ten31.xyz, cc bob@acme.vc.
  2. T+1 min. Gmail delivers. Without this integration, the message would be invisible to the CRM.
  3. T+1-180 min. Sync scheduler wakes up. For Grant's account, history.list(startHistoryId=<checkpoint>) returns this new message.
  4. Metadata fetch. messages.get(id=..., format=metadata) returns headers. Participant set = {alice@acme.vc, grant@ten31.xyz, bob@acme.vc}. Excluding Grant's own address → {alice@acme.vc, bob@acme.vc}.
  5. Match. INVESTOR_EMAIL_INDEX[alice@acme.vc] → Acme Capital (fundraising_investor_id=fi_xyz). Exact match, confidence 1.0.
  6. Full fetch. messages.get(id=..., format=full) returns body + attachment stubs.
  7. Parse. parser.extract produces ParsedEmail(from=alice, to=[grant], cc=[bob], subject="Re: Q2 allocation", body_text="...", attachments=[{name:"term_sheet.pdf", gmail_attachment_id:"..."}]).
  8. DB writes (one transaction):
    • INSERT INTO emails with rfc_message_id=<CAA...@mail.gmail.com>, match_status='matched'.
    • INSERT INTO email_recipients × 3.
    • INSERT INTO email_account_messages with Grant's account + Gmail message id.
    • INSERT INTO email_investor_links × 1 (to fi_xyz).
    • INSERT INTO email_attachments with download_status='pending'.
    • resolve_thread() finds existing thread via References, updates email_threads.last_message_at.
  9. Attachment download. Worker picks up the pending attachment, calls messages.attachments.get, writes data/email_attachments/<email_id[0:2]>/<email_id>/<attachment_id>-term_sheet.pdf, updates row with hash + status.
  10. Sightings. If another Ten31 teammate with an enrolled mailbox was also a recipient (Bob, at acme.vc, is external and is not synced), that teammate's sync run picks up their copy of the same email. Because rfc_message_id already exists, the run only does INSERT INTO email_account_messages (no new emails row), recording a second sighting of the same message.
  11. Result in UI. Acme Capital's investor page shows a new email in the conversation feed, threaded with prior Q2 allocation messages. Term sheet attachment is one click away, served from local disk.
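
Steps 4 and 5 of the walkthrough can be sketched as below. The function names, the `domain_index` fallback structure, the personal-domain list, and the 0.5 confidence for a domain match are all assumptions for illustration; only the exact-match confidence of 1.0 comes from the walkthrough.

```python
from typing import Dict, Iterable, Optional, Set, Tuple

PERSONAL_DOMAINS = {"gmail.com", "yahoo.com", "outlook.com"}  # illustrative list


def external_participants(addresses: Iterable[str],
                          own_addresses: Iterable[str]) -> Set[str]:
    """Participant set minus the team's own mailboxes, compared case-insensitively."""
    own = {a.lower() for a in own_addresses}
    return {a.lower() for a in addresses} - own


def match_investor(address: str, email_index: Dict[str, str],
                   domain_index: Dict[str, str]) -> Optional[Tuple[str, float]]:
    """Return (investor_id, confidence), or None when nothing matches."""
    address = address.lower()
    if address in email_index:
        return email_index[address], 1.0  # exact address match
    domain = address.rsplit("@", 1)[-1]
    if domain not in PERSONAL_DOMAINS and domain in domain_index:
        return domain_index[domain], 0.5  # assumed confidence for a domain match
    return None


participants = external_participants(
    ["alice@acme.vc", "grant@ten31.xyz", "bob@acme.vc"],
    own_addresses=["grant@ten31.xyz"],
)
email_index = {"alice@acme.vc": "fi_xyz"}
domain_index = {"acme.vc": "fi_xyz"}
matches = [match_investor(p, email_index, domain_index) for p in participants]
# Deduplicate by investor so the email gets a single link row.
investor_ids = {m[0] for m in matches if m is not None}
print(investor_ids)  # {'fi_xyz'}
```

Collapsing to a set of investor ids is why the walkthrough writes one email_investor_links row even though two Acme addresses appear on the message.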

Appendix B: What gets scaffolded now vs. filled in later

The scaffolded modules (backend/email_integration/*.py) include:

  • All module-level docstrings explaining purpose.
  • Complete type signatures and dataclasses.
  • Full SQL for migrations.
  • Working implementations for: config loading, credential abstraction, DB migration application, HTTP route dispatch.
  • Skeleton implementations with TODO markers for: full Gmail API call bodies, MIME tree walking, retry loops.

Lines of code delivered in scaffolding: ~1,500. Remaining to implement: ~1,000 (mostly the Gmail client and parser edge cases). Nothing in server.py or crm.db is changed.