Files
Keysat c7ce44d963 Phase 0 foundation: canonical schema, ingest pipeline, CRM MCP server
Workstream A–C substrate for the Ten31 agentic system:
- A1: docs/crm-overview.md; CLAUDE.md conventions + guardrail #9
- A2: additive/reversible core migration (canonical_entities, entity_links,
  interaction_log, relationship_edges, soft-delete) + ledgered runner
- B1/B3: chunking + deterministic entity resolution (backend/ingest)
- B2: dense (bge-m3) + BM25 sparse ingest to Qdrant crm_chunks
- C: CRM MCP server (reads, retrieval modes, logged writes) — no outbound tools
- docs: redaction/re-hydration, Gmail enablement runbook
- synthetic test data; .env.example; housekeeping (.gitignore, untrack crm.db,
  drop legacy files + start9/0.3.5)

Verified end-to-end on synthetic data + live Sparks (hybrid > dense on entity
queries). Real backfill runs on Ten31 infra; index holds synthetic data only.
Branch snapshot also captures pre-existing working-tree changes.

Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
2026-06-05 08:13:35 -05:00

250 lines
9.6 KiB
Python

"""
Thin Gmail API wrapper.
Responsibilities:
- HTTPS calls to https://gmail.googleapis.com/gmail/v1/users/me/*
- Per-account access-token injection via CredentialProvider
- Rate limiting via token bucket
- Retry loop with exponential backoff + jitter for RETRYABLE errors
- Batch requests for metadata fetches (multipart/mixed) — sketch provided
- Call-count accounting for observability (plumbed to email_sync_runs)
We call Gmail over raw urllib instead of the google-api-python-client to keep
the dependency surface small. If you prefer the Google SDK, replace _call()
with client calls; everything else is independent.
"""
import json
import random
import threading
import time
import urllib.error
import urllib.parse
import urllib.request
from dataclasses import dataclass, field
from typing import Any, Iterator, Optional
from . import config as _cfg
from . import errors
BASE = "https://gmail.googleapis.com/gmail/v1/users"
# ---------------------------------------------------------------------------- token bucket
class _TokenBucket:
"""Simple per-account rate limiter. Call wait(cost) before each API call."""
def __init__(self, units_per_sec: int, burst: Optional[int] = None):
self._rate = float(units_per_sec)
self._burst = float(burst if burst is not None else units_per_sec)
self._tokens = self._burst
self._last = time.monotonic()
self._lock = threading.Lock()
def wait(self, cost: float) -> None:
while True:
with self._lock:
now = time.monotonic()
self._tokens = min(self._burst, self._tokens + (now - self._last) * self._rate)
self._last = now
if self._tokens >= cost:
self._tokens -= cost
return
needed = cost - self._tokens
sleep_for = needed / self._rate
time.sleep(sleep_for)
# ---------------------------------------------------------------------------- call stats
@dataclass
class CallStats:
    """Mutable per-run counters for Gmail API usage.

    A fresh instance starts with every counter at zero and its own empty
    error list.
    """

    # Number of HTTP requests issued (counted once per attempt).
    api_calls: int = 0
    # Number of attempts that were retried after a retryable error.
    retries: int = 0
    # Total response-body bytes read, including error bodies.
    bytes_in: int = 0
    # Short error summaries; default_factory gives each instance its own list.
    last_errors: list[str] = field(default_factory=list)
# ---------------------------------------------------------------------------- client
class GmailClient:
    """Per-account Gmail client. Bind one instance per sync run.

    Every request goes through ``_get`` -> ``_with_retry`` -> ``_call``, so
    each attempt (including retries) is rate limited by the per-account token
    bucket and counted in ``stats``.
    """

    # historyId of the last page seen by iter_history(); None until a full
    # iteration has completed.
    _last_history_id: Optional[str] = None

    def __init__(self, credential_provider, email_address: str,
                 stats: Optional["CallStats"] = None):
        """Create a client for one mailbox.

        Args:
            credential_provider: object exposing ``access_token_for(email)``
                whose result has a ``.token`` attribute.
            email_address: account the access token is requested for.
            stats: optional shared counters; a fresh ``CallStats`` is created
                when omitted.
        """
        self._creds = credential_provider
        self._email = email_address
        self._bucket = _TokenBucket(
            units_per_sec=_cfg.CONFIG.rate_limit_units_per_sec_per_account
        )
        self.stats = stats if stats is not None else CallStats()

    # -------------------------------------------------------------- messages.*
    def list_messages(self, *, q: str = "", page_token: Optional[str] = None,
                      max_results: int = 500,
                      label_ids: Optional[list[str]] = None) -> dict:
        """https://developers.google.com/gmail/api/reference/rest/v1/users.messages/list

        Args:
            q: Gmail search query; omitted from the request when empty.
            page_token: continuation token from a previous page.
            max_results: page size sent as ``maxResults``.
            label_ids: labels to filter on; sent as repeated ``labelIds``
                query parameters.

        Returns:
            The decoded JSON response.
        """
        params: dict[str, Any] = {"maxResults": str(max_results)}
        if q:
            params["q"] = q
        if page_token:
            params["pageToken"] = page_token
        if label_ids:
            # Copy so the caller's list is not aliased; _call() encodes the
            # list as repeated labelIds=... parameters via doseq=True.
            params["labelIds"] = list(label_ids)
        return self._get("/messages", params=params, cost=5)

    def get_message(self, message_id: str, *, format: str = "metadata",
                    metadata_headers: Optional[list[str]] = None) -> dict:
        """Fetch one message.

        ``metadata_headers`` is only sent when ``format`` is ``"metadata"``.
        """
        params: dict[str, Any] = {"format": format}
        if format == "metadata" and metadata_headers:
            params["metadataHeaders"] = metadata_headers
        return self._get(f"/messages/{message_id}", params=params, cost=5)

    def get_attachment(self, message_id: str, attachment_id: str) -> dict:
        """Fetch one attachment resource of a message."""
        return self._get(
            f"/messages/{message_id}/attachments/{attachment_id}",
            params=None,
            cost=5,
        )

    # -------------------------------------------------------------- history.*
    def list_history(self, *, start_history_id: str, page_token: Optional[str] = None,
                     history_types: Optional[list[str]] = None) -> dict:
        """Fetch one page of mailbox history after ``start_history_id``.

        Raises:
            errors.HistoryExpiredError: when the request fails with
                ``errors.NotFoundError`` (the start id is no longer available).
        """
        params: dict[str, Any] = {"startHistoryId": start_history_id, "maxResults": "500"}
        if page_token:
            params["pageToken"] = page_token
        if history_types:
            params["historyTypes"] = history_types
        try:
            return self._get("/history", params=params, cost=2)
        except errors.NotFoundError as e:
            # Gmail returns 404 when startHistoryId is too old. Wrap for callers.
            raise errors.HistoryExpiredError(
                "startHistoryId no longer available",
                status=404,
                payload=getattr(e, "payload", None),
            ) from e

    # -------------------------------------------------------------- profile
    def get_profile(self) -> dict:
        """Fetch the mailbox profile."""
        return self._get("/profile", params=None, cost=1)

    # -------------------------------------------------------------- iteration helpers
    def iter_messages(self, *, q: str = "") -> Iterator[dict]:
        """Yield every message stub matching ``q``, following page tokens."""
        page_token: Optional[str] = None
        while True:
            resp = self.list_messages(q=q, page_token=page_token,
                                      max_results=_cfg.CONFIG.backfill_page_size)
            yield from resp.get("messages") or []
            page_token = resp.get("nextPageToken")
            if not page_token:
                return

    def iter_history(self, *, start_history_id: str,
                     history_types: Optional[list[str]] = None) -> Iterator[dict]:
        """Yield every history record after ``start_history_id``.

        Once the last page has been consumed, its ``historyId`` is available
        through ``last_history_id``.
        """
        page_token: Optional[str] = None
        while True:
            resp = self.list_history(
                start_history_id=start_history_id,
                page_token=page_token,
                history_types=history_types,
            )
            yield from resp.get("history") or []
            page_token = resp.get("nextPageToken")
            if not page_token:
                # Cache final historyId for caller to checkpoint.
                self._last_history_id = resp.get("historyId")
                return

    @property
    def last_history_id(self) -> Optional[str]:
        """historyId cached by the last completed iter_history(), else None."""
        return self._last_history_id

    # -------------------------------------------------------------- internals
    def _get(self, path: str, *, params: Optional[dict], cost: float) -> dict:
        """Issue a GET through the retry loop."""
        return self._with_retry(lambda: self._call("GET", path, params=params, cost=cost))

    def _call(self, method: str, path: str, *, params: Optional[dict] = None,
              body: Optional[bytes] = None, cost: float = 1.0) -> dict:
        """Perform a single HTTP attempt and decode the JSON response.

        Waits on the token bucket for ``cost`` units first, then counts the
        attempt in ``stats.api_calls``.

        Returns:
            The decoded JSON body, or ``{}`` when the body is empty.

        Raises:
            The exception built by ``errors.classify_http`` for HTTP errors;
            ``errors.TransientError`` for network errors and timeouts.
        """
        self._bucket.wait(cost)
        self.stats.api_calls += 1
        qs = ""
        if params:
            # urllib.parse.urlencode with doseq=True handles repeated params
            # like metadataHeaders=Foo&metadataHeaders=Bar correctly.
            qs = "?" + urllib.parse.urlencode(params, doseq=True)
        url = f"{BASE}/me{path}{qs}"
        token = self._creds.access_token_for(self._email)
        req = urllib.request.Request(url, method=method, data=body)
        req.add_header("Authorization", f"Bearer {token.token}")
        req.add_header("Accept", "application/json")
        if body is not None:
            req.add_header("Content-Type", "application/json")
        try:
            with urllib.request.urlopen(req, timeout=30) as resp:
                raw = resp.read()
            self.stats.bytes_in += len(raw)
            if not raw:
                return {}
            return json.loads(raw)
        except urllib.error.HTTPError as e:
            # Must precede the URLError handler: HTTPError is a subclass.
            raw = e.read() or b""
            self.stats.bytes_in += len(raw)
            try:
                payload = json.loads(raw) if raw else {}
            except Exception:
                # Non-JSON error body: keep the text for classification.
                payload = {"raw": raw.decode("utf-8", errors="replace")}
            err = errors.classify_http(e.code, payload)
            # Short-message logging hook: only the status code and error class
            # name are kept (no token or payload text); capped at 10 entries.
            self.stats.last_errors.append(f"{e.code} {type(err).__name__}")
            self.stats.last_errors = self.stats.last_errors[-10:]
            raise err from e
        except (urllib.error.URLError, TimeoutError) as e:
            raise errors.TransientError(f"network error: {e}") from e

    def _with_retry(self, fn):
        """Call ``fn`` until it succeeds or the retry budget is exhausted.

        Only exceptions in ``errors.RETRYABLE`` are retried; the last one is
        re-raised after ``retry_max_attempts`` failed attempts. Between
        attempts the loop sleeps a uniformly random time up to the current
        backoff delay, which doubles each round and is capped at
        ``retry_max_delay_sec``.
        """
        cfg = _cfg.CONFIG
        attempts = 0
        delay = cfg.retry_initial_delay_sec
        while True:
            try:
                return fn()
            except errors.RETRYABLE:
                attempts += 1
                if attempts >= cfg.retry_max_attempts:
                    raise
                self.stats.retries += 1
                # Full jitter
                sleep_for = random.uniform(0, min(delay, cfg.retry_max_delay_sec))
                time.sleep(sleep_for)
                delay = min(delay * 2, cfg.retry_max_delay_sec)
            # Non-retryable errors propagate immediately.
# ---------------------------------------------------------------------------- batch fetch sketch
def batch_get_metadata(client: GmailClient, message_ids: list[str],
                       headers: list[str]) -> dict[str, dict]:
    """Fetch metadata for up to ~100 messages, keyed by message id.

    TODO: implement using Gmail's multipart/mixed batch endpoint at
    https://www.googleapis.com/batch/gmail/v1 for efficiency. In the scaffold
    we fall back to serial gets so the logic is correct from day 1.

    Ids whose fetch raises ``errors.NotFoundError`` are left out of the
    result; any other error propagates to the caller.
    """
    fetched: dict[str, dict] = {}
    for message_id in message_ids:
        try:
            metadata = client.get_message(
                message_id, format="metadata", metadata_headers=headers
            )
        except errors.NotFoundError:
            # Message deleted between list and get — skip.
            continue
        fetched[message_id] = metadata
    return fetched