c7ce44d963
Workstream A–C substrate for the Ten31 agentic system: - A1: docs/crm-overview.md; CLAUDE.md conventions + guardrail #9 - A2: additive/reversible core migration (canonical_entities, entity_links, interaction_log, relationship_edges, soft-delete) + ledgered runner - B1/B3: chunking + deterministic entity resolution (backend/ingest) - B2: dense (bge-m3) + BM25 sparse ingest to Qdrant crm_chunks - C: CRM MCP server (reads, retrieval modes, logged writes) — no outbound tools - docs: redaction/re-hydration, Gmail enablement runbook - synthetic test data; .env.example; housekeeping (.gitignore, untrack crm.db, drop legacy files + start9/0.3.5) Verified end-to-end on synthetic data + live Sparks (hybrid > dense on entity queries). Real backfill runs on Ten31 infra; index holds synthetic data only. Branch snapshot also captures pre-existing working-tree changes. Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
250 lines
9.6 KiB
Python
250 lines
9.6 KiB
Python
"""
|
|
Thin Gmail API wrapper.
|
|
|
|
Responsibilities:
|
|
- HTTPS calls to https://gmail.googleapis.com/gmail/v1/users/me/*
|
|
- Per-account access-token injection via CredentialProvider
|
|
- Rate limiting via token bucket
|
|
- Retry loop with exponential backoff + jitter for RETRYABLE errors
|
|
- Batch metadata fetches: multipart/mixed batching is still a TODO; batch_get_metadata currently falls back to serial gets
|
|
- Call-count accounting for observability (plumbed to email_sync_runs)
|
|
|
|
We call Gmail over raw urllib instead of the google-api-python-client to keep
|
|
the dependency surface small. If you prefer the Google SDK, replace _call()
|
|
with client calls; everything else is independent.
|
|
"""
|
|
|
|
import http.client
import json
import random
import threading
import time
import urllib.error
import urllib.parse
import urllib.request
from dataclasses import dataclass, field
from typing import Any, Iterator, Optional

from . import config as _cfg
from . import errors
|
|
|
|
|
|
# Root of the Gmail REST API. GmailClient._call builds request URLs as
# f"{BASE}/me{path}{query}", so every call targets the authenticated mailbox.
BASE = "https://gmail.googleapis.com/gmail/v1/users"
|
|
|
|
|
|
# ---------------------------------------------------------------------------- token bucket
|
|
|
|
class _TokenBucket:
|
|
"""Simple per-account rate limiter. Call wait(cost) before each API call."""
|
|
|
|
def __init__(self, units_per_sec: int, burst: Optional[int] = None):
|
|
self._rate = float(units_per_sec)
|
|
self._burst = float(burst if burst is not None else units_per_sec)
|
|
self._tokens = self._burst
|
|
self._last = time.monotonic()
|
|
self._lock = threading.Lock()
|
|
|
|
def wait(self, cost: float) -> None:
|
|
while True:
|
|
with self._lock:
|
|
now = time.monotonic()
|
|
self._tokens = min(self._burst, self._tokens + (now - self._last) * self._rate)
|
|
self._last = now
|
|
if self._tokens >= cost:
|
|
self._tokens -= cost
|
|
return
|
|
needed = cost - self._tokens
|
|
sleep_for = needed / self._rate
|
|
time.sleep(sleep_for)
|
|
|
|
|
|
# ---------------------------------------------------------------------------- call stats
|
|
|
|
@dataclass
class CallStats:
    """Mutable request counters for one sync run.

    GmailClient creates one by default, or reuses an instance handed to it
    through its ``stats`` argument, and updates the fields as it makes calls.
    """

    # Number of HTTP requests attempted (GmailClient counts every retry attempt).
    api_calls: int = field(default=0)
    # Number of times a retryable failure triggered another attempt.
    retries: int = field(default=0)
    # Total response-body bytes read, from both successful and error responses.
    bytes_in: int = field(default=0)
    # Short "<status> <ErrorClassName>" strings for recent HTTP failures.
    last_errors: list[str] = field(default_factory=list)
|
|
|
|
|
|
# ---------------------------------------------------------------------------- client
|
|
|
|
class GmailClient:
    """Per-account Gmail client. Bind one instance per sync run.

    Every public call goes through ``_get`` -> ``_with_retry`` -> ``_call``.
    That path waits on the per-account token bucket, injects a fresh bearer
    token, maps HTTP failures through ``errors.classify_http`` and updates the
    counters in ``self.stats``.
    """

    def __init__(self, credential_provider, email_address: str, stats: Optional["CallStats"] = None):
        """
        Args:
            credential_provider: object exposing ``access_token_for(email)``,
                whose result carries the bearer string in ``.token``.
            email_address: mailbox whose access token is requested per call.
            stats: counters to update; a fresh ``CallStats`` when omitted.
        """
        self._creds = credential_provider
        self._email = email_address
        self._bucket = _TokenBucket(units_per_sec=_cfg.CONFIG.rate_limit_units_per_sec_per_account)
        self.stats = stats if stats is not None else CallStats()
        # Filled in by iter_history() once it has consumed the final page.
        self._last_history_id = None

    # -------------------------------------------------------------- messages.*

    def list_messages(self, *, q: str = "", page_token: Optional[str] = None,
                      max_results: int = 500, label_ids: Optional[list[str]] = None) -> dict:
        """https://developers.google.com/gmail/api/reference/rest/v1/users.messages/list

        Args:
            q: Gmail search query; omitted from the request when empty.
            page_token: ``nextPageToken`` from a previous page.
            max_results: page size, sent as ``maxResults``.
            label_ids: label filter, sent as repeated ``labelIds`` parameters.

        Returns:
            The decoded JSON response.
        """
        params: dict[str, Any] = {"maxResults": str(max_results)}
        if q:
            params["q"] = q
        if page_token:
            params["pageToken"] = page_token
        if label_ids:
            # _call uses urlencode(doseq=True), so a list becomes
            # labelIds=A&labelIds=B on the wire.
            params["labelIds"] = list(label_ids)
        return self._get("/messages", params=params, cost=5)

    def get_message(self, message_id: str, *, format: str = "metadata",
                    metadata_headers: Optional[list[str]] = None) -> dict:
        """Fetch one message; ``metadata_headers`` only applies to format="metadata"."""
        params: dict[str, Any] = {"format": format}
        if format == "metadata" and metadata_headers:
            params["metadataHeaders"] = metadata_headers
        return self._get(f"/messages/{self._path_segment(message_id)}", params=params, cost=5)

    def get_attachment(self, message_id: str, attachment_id: str) -> dict:
        """Fetch one attachment of a message."""
        return self._get(
            f"/messages/{self._path_segment(message_id)}"
            f"/attachments/{self._path_segment(attachment_id)}",
            params=None,
            cost=5,
        )

    # -------------------------------------------------------------- history.*

    def list_history(self, *, start_history_id: str, page_token: Optional[str] = None,
                     history_types: Optional[list[str]] = None) -> dict:
        """Fetch one page of mailbox history starting at ``start_history_id``.

        Raises:
            errors.HistoryExpiredError: when Gmail answers 404, which it does
                when ``start_history_id`` is too old.
        """
        params: dict[str, Any] = {"startHistoryId": start_history_id, "maxResults": "500"}
        if page_token:
            params["pageToken"] = page_token
        if history_types:
            params["historyTypes"] = history_types
        try:
            return self._get("/history", params=params, cost=2)
        except errors.NotFoundError as e:
            # Gmail returns 404 when startHistoryId is too old. Wrap for callers.
            raise errors.HistoryExpiredError(
                "startHistoryId no longer available", status=404, payload=getattr(e, "payload", None)
            ) from e

    # -------------------------------------------------------------- profile

    def get_profile(self) -> dict:
        """Fetch the mailbox profile."""
        return self._get("/profile", params=None, cost=1)

    # -------------------------------------------------------------- iteration helpers

    def iter_messages(self, *, q: str = "", label_ids: Optional[list[str]] = None) -> Iterator[dict]:
        """Yield every entry of ``messages`` across all pages of list_messages.

        Page size comes from ``CONFIG.backfill_page_size``. ``label_ids`` is
        passed through to list_messages.
        """
        page_token: Optional[str] = None
        while True:
            resp = self.list_messages(
                q=q,
                page_token=page_token,
                max_results=_cfg.CONFIG.backfill_page_size,
                label_ids=label_ids,
            )
            yield from resp.get("messages") or []
            page_token = resp.get("nextPageToken")
            if not page_token:
                return

    def iter_history(self, *, start_history_id: str,
                     history_types: Optional[list[str]] = None) -> Iterator[dict]:
        """Yield every history record from ``start_history_id`` onward.

        When the last page has been consumed, that page's ``historyId`` is
        exposed via :attr:`last_history_id` for the caller to checkpoint.
        """
        # Drop any value from an earlier walk. A walk that is abandoned or
        # fails must not leave a checkpoint that claims more than was processed.
        self._last_history_id = None
        page_token: Optional[str] = None
        while True:
            resp = self.list_history(
                start_history_id=start_history_id,
                page_token=page_token,
                history_types=history_types,
            )
            yield from resp.get("history") or []
            page_token = resp.get("nextPageToken")
            if not page_token:
                # Cache final historyId for caller to checkpoint.
                self._last_history_id = resp.get("historyId")
                return

    @property
    def last_history_id(self) -> Optional[str]:
        """historyId from the final page of the last completed iter_history walk, else None."""
        return getattr(self, "_last_history_id", None)

    # -------------------------------------------------------------- internals

    @staticmethod
    def _path_segment(value: str) -> str:
        """Percent-encode ``value`` for use as a single URL path segment."""
        return urllib.parse.quote(value, safe="")

    def _get(self, path: str, *, params: Optional[dict], cost: float) -> dict:
        """GET ``path`` with retries; see _call and _with_retry."""
        return self._with_retry(lambda: self._call("GET", path, params=params, cost=cost))

    def _call(self, method: str, path: str, *, params: Optional[dict] = None,
              body: Optional[bytes] = None, cost: float = 1.0) -> dict:
        """Perform one rate-limited HTTP request and decode its JSON body.

        Returns ``{}`` for an empty response body.

        Raises:
            The exception returned by ``errors.classify_http`` for HTTP error
            statuses, and ``errors.TransientError`` for connection-level
            failures (refused/reset connections, timeouts, truncated reads).
        """
        self._bucket.wait(cost)
        self.stats.api_calls += 1

        qs = ""
        if params:
            # urllib.parse.urlencode with doseq=True handles repeated params
            # like metadataHeaders=Foo&metadataHeaders=Bar correctly.
            qs = "?" + urllib.parse.urlencode(params, doseq=True)
        url = f"{BASE}/me{path}{qs}"

        token = self._creds.access_token_for(self._email)
        req = urllib.request.Request(url, method=method, data=body)
        req.add_header("Authorization", f"Bearer {token.token}")
        req.add_header("Accept", "application/json")
        if body:
            req.add_header("Content-Type", "application/json")

        try:
            with urllib.request.urlopen(req, timeout=30) as resp:
                raw = resp.read()
        except urllib.error.HTTPError as e:
            # HTTPError is itself an OSError, so it must be handled before the
            # generic network clause below.
            try:
                err_raw = e.read() or b""
            except (OSError, http.client.HTTPException):
                err_raw = b""  # body unreadable; classify on the status code alone
            finally:
                e.close()
            self.stats.bytes_in += len(err_raw)
            try:
                payload = json.loads(err_raw) if err_raw else {}
            except ValueError:
                payload = {"raw": err_raw.decode("utf-8", errors="replace")}
            err = errors.classify_http(e.code, payload)
            # Only status code and exception class are recorded, never tokens
            # or response bodies.
            self.stats.last_errors.append(f"{e.code} {type(err).__name__}")
            del self.stats.last_errors[:-10]
            raise err from e
        except (OSError, http.client.HTTPException) as e:
            # OSError covers URLError, TimeoutError/socket.timeout and connection
            # resets raised mid-read; HTTPException covers IncompleteRead and
            # RemoteDisconnected from http.client.
            raise errors.TransientError(f"network error: {e}") from e

        self.stats.bytes_in += len(raw)
        if not raw:
            return {}
        return json.loads(raw)

    def _with_retry(self, fn):
        """Call ``fn`` and retry ``errors.RETRYABLE`` failures.

        Backoff starts at ``retry_initial_delay_sec``, doubles per attempt and
        is capped at ``retry_max_delay_sec``. The actual sleep is drawn
        uniformly from [0, current delay] ("full jitter"). After
        ``retry_max_attempts`` total attempts the last error is re-raised.
        Non-retryable errors propagate immediately.
        """
        cfg = _cfg.CONFIG
        attempts = 0
        delay = cfg.retry_initial_delay_sec
        while True:
            try:
                return fn()
            except errors.RETRYABLE:
                attempts += 1
                if attempts >= cfg.retry_max_attempts:
                    raise
                self.stats.retries += 1
                # Full jitter
                sleep_for = random.uniform(0, min(delay, cfg.retry_max_delay_sec))
                time.sleep(sleep_for)
                delay = min(delay * 2, cfg.retry_max_delay_sec)
            # Non-retryable errors propagate immediately.
|
|
|
|
|
# ---------------------------------------------------------------------------- batch fetch sketch
|
|
|
|
def batch_get_metadata(client: GmailClient, message_ids: list[str],
                       headers: list[str]) -> dict[str, dict]:
    """Fetch metadata for up to ~100 messages.

    Returns a mapping of message id to the ``format="metadata"`` response,
    restricted to ``headers``. Ids that Gmail reports as not found (e.g. the
    message was deleted after it was listed) are left out of the result.

    TODO: implement using Gmail's multipart/mixed batch endpoint at
    https://www.googleapis.com/batch/gmail/v1 for efficiency. For now this
    issues one serial get per id so the results are correct from day 1.
    """
    by_id: dict[str, dict] = {}
    for message_id in message_ids:
        try:
            meta = client.get_message(message_id, format="metadata", metadata_headers=headers)
        except errors.NotFoundError:
            # Message deleted between list and get — skip.
            continue
        by_id[message_id] = meta
    return by_id
|