Files
ten31-database/backend/email_integration/routes.py
T
Keysat c7ce44d963 Phase 0 foundation: canonical schema, ingest pipeline, CRM MCP server
Workstream A–C substrate for the Ten31 agentic system:
- A1: docs/crm-overview.md; CLAUDE.md conventions + guardrail #9
- A2: additive/reversible core migration (canonical_entities, entity_links,
  interaction_log, relationship_edges, soft-delete) + ledgered runner
- B1/B3: chunking + deterministic entity resolution (backend/ingest)
- B2: dense (bge-m3) + BM25 sparse ingest to Qdrant crm_chunks
- C: CRM MCP server (reads, retrieval modes, logged writes) — no outbound tools
- docs: redaction/re-hydration, Gmail enablement runbook
- synthetic test data; .env.example; housekeeping (.gitignore, untrack crm.db,
  drop legacy files + start9/0.3.5)

Verified end-to-end on synthetic data + live Sparks (hybrid > dense on entity
queries). Real backfill runs on Ten31 infra; index holds synthetic data only.
Branch snapshot also captures pre-existing working-tree changes.

Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
2026-06-05 08:13:35 -05:00

463 lines
16 KiB
Python

"""
HTTP route handlers for the Gmail integration.

Designed to plug into server.py's CRMHandler (BaseHTTPRequestHandler) pattern.
The hook is a single function call near the top of do_GET / do_POST that
lets this module claim any /api/email/* request::

    # in CRMHandler.do_GET and CRMHandler.do_POST, before the 404 fallthrough:
    from email_integration.routes import try_handle
    if try_handle(self):
        return

`try_handle(handler)` inspects `handler.command` and `handler.get_path()` and
returns True if it handled the request (sent a response). Every path under
/api/email/ is claimed: unknown routes get a 404 from this module, and known
routes get a 503 while the integration is disabled in config.

Auth model: handlers authenticate via handler.get_user(); admin-only routes
additionally require role == "admin". The OAuth callback is the exception —
it is reached by a browser redirect from Google and is authorized by the
one-time `state` token issued by oauth/start rather than by get_user().
Only the sync run-now endpoint applies handler.rate_limited(...).
"""
import json
import logging
import sqlite3
import threading
import time
from typing import Optional

from . import config as _cfg
from . import credentials as _creds
from . import crypto as _crypto
from . import db as _db
from . import scheduler as _sched
# ---------------------------------------------------------------------------- dispatch
# Dispatch tables: exact request path -> handler suffix. try_handle() turns a
# suffix NAME into a call to the module-level function `_h_NAME`.
_GET_ROUTES = dict(
    [
        ("/api/email/status", "status"),
        ("/api/email/accounts", "list_accounts"),
        ("/api/email/threads", "list_threads"),
        ("/api/email/oauth/start", "oauth_start"),
        ("/api/email/oauth/callback", "oauth_callback"),
    ]
)
_POST_ROUTES = dict(
    [
        ("/api/email/accounts/enroll-all", "enroll_all"),
        ("/api/email/accounts/enroll", "enroll_one"),
        ("/api/email/sync/run-now", "run_now"),
        ("/api/email/rematch", "rematch"),
    ]
)
def try_handle(handler) -> bool:
    """Claim and serve any request whose path starts with ``/api/email/``.

    Args:
        handler: the server's request handler; must expose ``command``,
            ``get_path()`` and ``send_error_json(message, status)``.

    Returns:
        False if the path is outside ``/api/email/`` (the caller keeps
        dispatching). True otherwise, meaning a response has been sent:
        404 for an unknown route/method, 503 when the integration is
        disabled, 500 for a missing or failing handler.
    """
    path = handler.get_path()
    if not path.startswith("/api/email/"):
        return False

    method = handler.command
    if method == "GET":
        table = _GET_ROUTES
    elif method == "POST":
        table = _POST_ROUTES
    else:
        table = {}
    name = table.get(path)
    if not name:
        # Route is owned by this module but unknown — return a proper 404
        # instead of letting the main dispatcher's 404 abuse counter fire.
        handler.send_error_json("Not found", 404)
        return True
    if not _cfg.CONFIG.enabled:
        handler.send_error_json("Email integration disabled", 503)
        return True

    impl = globals().get(f"_h_{name}")
    if impl is None:
        handler.send_error_json("Not implemented", 500)
        return True
    try:
        impl(handler)
    except Exception:
        # Top-level boundary: keep the traceback server-side only. Echoing the
        # exception text to the client can leak SQL, file paths or upstream
        # error details.
        logging.getLogger(__name__).exception(
            "Unhandled error in %s %s", method, path
        )
        handler.send_error_json("Internal error", 500)
    return True
# ---------------------------------------------------------------------------- helpers
def _conn() -> sqlite3.Connection:
    """Open a fresh connection to the CRM SQLite database.

    The path is ``CRM_DB_PATH`` when that env var is set, otherwise
    ``<data_dir>/crm.db`` from the integration config. The connection is put
    in WAL mode with foreign keys enforced and a 5000 ms busy timeout, and
    yields rows as ``sqlite3.Row``. Callers are responsible for closing it.
    """
    import os

    default_path = os.path.join(_cfg.CONFIG.data_dir, "crm.db")
    connection = sqlite3.connect(os.environ.get("CRM_DB_PATH", default_path))
    for pragma in (
        "PRAGMA journal_mode=WAL",
        "PRAGMA foreign_keys=ON",
        "PRAGMA busy_timeout=5000",
    ):
        connection.execute(pragma)
    connection.row_factory = sqlite3.Row
    return connection
def _require_auth(handler) -> Optional[dict]:
    """Return the caller's user dict, or send a 401 and return None.

    Any falsy value from ``handler.get_user()`` (None, empty dict) counts as
    unauthenticated.
    """
    current = handler.get_user()
    if current:
        return current
    handler.send_error_json("Unauthorized", 401)
    return None
def _require_admin(handler) -> Optional[dict]:
    """Return the caller's user dict if they are an admin, else None.

    An unauthenticated caller has already been sent a 401 by _require_auth.
    An authenticated caller whose ``role`` is not "admin" is sent a 403.
    """
    caller = _require_auth(handler)
    if caller is not None and caller.get("role") == "admin":
        return caller
    if caller is not None:
        # Authenticated, but not an admin.
        handler.send_error_json("Admin required", 403)
    return None
# ---------------------------------------------------------------------------- GET handlers
def _h_status(handler):
    """GET /api/email/status — scheduler snapshot plus account/match counts.

    Any authenticated user may call this. The response is the dict returned
    by ``_sched.status_snapshot()``, extended with ``matched_emails`` (count
    of emails with match_status 'matched') and ``accounts_summary``
    (``n_accounts`` / ``n_active`` / ``n_error`` over email_accounts).
    """
    user = _require_auth(handler)
    if not user:
        return
    snap = _sched.status_snapshot()
    conn = _conn()
    try:
        cur = conn.cursor()
        # SUM() over zero rows is NULL; COALESCE keeps the counters numeric
        # (0 rather than JSON null) before any account is enrolled.
        cur.execute(
            "SELECT COUNT(*) AS n_accounts, "
            "COALESCE(SUM(CASE WHEN sync_status='active' THEN 1 ELSE 0 END), 0) "
            "AS n_active, "
            "COALESCE(SUM(CASE WHEN sync_status='error' THEN 1 ELSE 0 END), 0) "
            "AS n_error "
            "FROM email_accounts"
        )
        counts = dict(cur.fetchone() or {})
        cur.execute("SELECT COUNT(*) AS n FROM emails WHERE match_status = 'matched'")
        snap["matched_emails"] = cur.fetchone()["n"]
    finally:
        conn.close()
    snap["accounts_summary"] = counts
    handler.send_json(snap)
def _h_list_accounts(handler):
    """GET /api/email/accounts — list enrolled mailboxes, ordered by address.

    Admins see every row of email_accounts; any other authenticated user
    sees only rows whose ``user_id`` equals their own.
    """
    user = _require_auth(handler)
    if not user:
        return
    conn = _conn()
    try:
        accounts = [
            dict(record)
            for record in conn.execute(
                "SELECT id, user_id, email_address, auth_method, sync_enabled, "
                "sync_status, sync_error, last_synced_at, backfill_complete "
                "FROM email_accounts ORDER BY email_address"
            ).fetchall()
        ]
    finally:
        conn.close()
    is_admin = user.get("role") == "admin"
    if not is_admin:
        accounts = [acct for acct in accounts if acct["user_id"] == user["user_id"]]
    handler.send_json({"accounts": accounts})
def _parse_limit(raw, default: int = 50, maximum: int = 500) -> Optional[int]:
    """Parse a ``limit`` query value and clamp it into ``[1, maximum]``.

    Args:
        raw: the raw query-string value, or None when the param is absent.
        default: value returned when *raw* is None or an empty string.
        maximum: upper bound applied to the parsed value.

    Returns:
        The clamped limit, or None when *raw* is not an integer.
    """
    if raw is None or raw == "":
        return default
    try:
        value = int(raw)
    except (TypeError, ValueError):
        return None
    # SQLite treats a negative LIMIT as "no limit", so the lower bound matters
    # as much as the cap.
    return max(1, min(value, maximum))


def _h_list_threads(handler):
    """GET /api/email/threads — list email threads, most recent activity first.

    Query params:
        investor_id: when given, only threads containing an email linked to
            that fundraising investor, directly or via one of its fundraising
            contacts. Otherwise, all threads with ``is_matched = 1``.
        limit: page size; default 50, clamped to 1..500. A non-integer value
            is rejected with 400.
    """
    user = _require_auth(handler)
    if not user:
        return
    q = handler.get_query_params()
    investor_id = q.get("investor_id")
    limit = _parse_limit(q.get("limit"))
    if limit is None:
        return handler.send_error_json("limit must be an integer", 400)
    conn = _conn()
    try:
        cur = conn.cursor()
        if investor_id:
            cur.execute(
                """SELECT t.*
                FROM email_threads t
                JOIN emails e ON e.thread_id = t.id
                JOIN email_investor_links l ON l.email_id = e.id
                WHERE l.fundraising_investor_id = ?
                OR l.fundraising_contact_id IN (
                SELECT id FROM fundraising_contacts WHERE investor_id = ?
                )
                GROUP BY t.id
                ORDER BY t.last_message_at DESC
                LIMIT ?""",
                (investor_id, investor_id, limit),
            )
        else:
            cur.execute(
                "SELECT * FROM email_threads WHERE is_matched = 1 "
                "ORDER BY last_message_at DESC LIMIT ?",
                (limit,),
            )
        threads = [dict(r) for r in cur.fetchall()]
    finally:
        conn.close()
    handler.send_json({"threads": threads})
def _h_oauth_start(handler):
    """GET /api/email/oauth/start — begin the per-user OAuth consent flow.

    This is the fallback path, available only when the configured primary
    auth is "oauth". ``account_email`` is required in the query string. It is
    remembered against a fresh random ``state`` token and forwarded to Google
    as ``login_hint``. Responds with ``{"redirect_url": <Google consent URL>}``.
    """
    import secrets
    import urllib.parse

    user = _require_auth(handler)
    if not user:
        return
    if _cfg.CONFIG.primary_auth != "oauth":
        return handler.send_error_json(
            "Per-user OAuth disabled (set CRM_GMAIL_AUTH_METHOD=oauth to enable)", 400
        )
    account_email = handler.get_query_params().get("account_email") or ""
    if not account_email:
        return handler.send_error_json("account_email required", 400)

    state = secrets.token_urlsafe(32)
    _oauth_state_store(state, user["user_id"], account_email)
    # Key order is preserved by urlencode, so the URL is deterministic.
    query = urllib.parse.urlencode(
        {
            "client_id": _cfg.CONFIG.oauth_client_id,
            "redirect_uri": _cfg.CONFIG.oauth_redirect_uri,
            "response_type": "code",
            "scope": _creds.GMAIL_READONLY_SCOPE,
            "access_type": "offline",
            "prompt": "consent",
            "state": state,
            "login_hint": account_email,
        }
    )
    consent_url = "https://accounts.google.com/o/oauth2/v2/auth?" + query
    handler.send_json({"redirect_url": consent_url})
def _oauth_token_mailbox(access_token) -> Optional[str]:
    """Return the lower-cased Gmail address that *access_token* belongs to.

    Calls the Gmail ``users.getProfile`` endpoint. Presumably this is allowed
    by the read-only scope requested in oauth/start — confirm against
    _creds.GMAIL_READONLY_SCOPE. Returns None when there is no token, the
    request fails, or the response has no usable ``emailAddress``.
    """
    if not access_token:
        return None
    import urllib.request

    req = urllib.request.Request(
        "https://gmail.googleapis.com/gmail/v1/users/me/profile",
        headers={"Authorization": f"Bearer {access_token}"},
    )
    try:
        with urllib.request.urlopen(req, timeout=15) as resp:
            profile = json.loads(resp.read())
    except Exception:
        # Best-effort probe: any network/HTTP/JSON failure means "unverified".
        return None
    address = profile.get("emailAddress") if isinstance(profile, dict) else None
    if not isinstance(address, str) or not address.strip():
        return None
    return address.lower().strip()


def _h_oauth_callback(handler):
    """GET /api/email/oauth/callback — finish the per-user OAuth flow.

    Google redirects the browser here with ``code`` and ``state``. The state
    token, issued by oauth/start, identifies the CRM user and the mailbox
    being enrolled. The code is exchanged for tokens, and the authorized
    mailbox is checked against the requested one. The refresh token is then
    stored encrypted on the account row, and sync_status is reset to 'pending'.
    """
    q = handler.get_query_params()
    code = q.get("code")
    state = q.get("state")
    if not code or not state:
        return handler.send_error_json("code and state required", 400)
    # There is no session check here: consuming the single-use state token is
    # what authorizes this request (and prevents replay).
    state_row = _oauth_state_consume(state)
    if not state_row:
        return handler.send_error_json("Invalid state", 400)
    account_email = state_row["account_email"]

    import urllib.parse
    import urllib.request

    body = urllib.parse.urlencode({
        "code": code,
        "client_id": _cfg.CONFIG.oauth_client_id,
        "client_secret": _cfg.CONFIG.oauth_client_secret,
        "redirect_uri": _cfg.CONFIG.oauth_redirect_uri,
        "grant_type": "authorization_code",
    }).encode("ascii")
    req = urllib.request.Request(
        "https://oauth2.googleapis.com/token",
        data=body,
        headers={"Content-Type": "application/x-www-form-urlencoded"},
    )
    try:
        with urllib.request.urlopen(req, timeout=15) as resp:
            payload = json.loads(resp.read())
    except Exception as e:
        return handler.send_error_json(f"Token exchange failed: {e}", 500)
    refresh = payload.get("refresh_token")
    if not refresh:
        return handler.send_error_json("No refresh_token returned (user may have previously consented; prompt=consent required)", 400)

    # login_hint only pre-selects an account on Google's consent screen; the
    # user can still authorize a different mailbox. Never file that mailbox's
    # refresh token under the address this flow was started for.
    # NOTE(review): an alias will not match the mailbox's primary address and
    # is rejected here — enroll the primary address instead.
    granted_email = _oauth_token_mailbox(payload.get("access_token"))
    if granted_email is None:
        return handler.send_error_json(
            "Could not verify which Gmail account was authorized", 502
        )
    if granted_email != account_email:
        return handler.send_error_json(
            f"Authorized Gmail account ({granted_email}) does not match "
            f"the requested account ({account_email})",
            400,
        )

    enc = _crypto.encrypt(refresh.encode("ascii"), secret_key_b64=_cfg.CONFIG.secret_key_b64)
    conn = _conn()
    try:
        _db.upsert_account(conn, user_id=state_row["user_id"],
                           email_address=account_email,
                           auth_method="oauth")
        conn.execute(
            "UPDATE email_accounts SET oauth_refresh_enc = ?, sync_status = 'pending', "
            "updated_at = datetime('now') WHERE email_address = ?",
            (enc, account_email),
        )
        conn.commit()
    finally:
        conn.close()
    handler.send_json({"ok": True, "account_email": account_email})
# ---------------------------------------------------------------------------- POST handlers
def _h_enroll_all(handler):
    """POST /api/email/accounts/enroll-all — admin-only bulk DWD enrollment.

    Enrolls every active CRM user whose email matches ``%@<workspace_domain>``
    via domain-wide delegation. Valid only when primary auth is "dwd" and a
    workspace domain is configured. Responds with the enrolled accounts
    (``account_id`` plus the user's email as stored in ``users``) and a count.
    """
    if not _require_admin(handler):
        return
    if _cfg.CONFIG.primary_auth != "dwd":
        return handler.send_error_json("enroll-all only valid in DWD mode", 400)
    domain = _cfg.CONFIG.workspace_domain
    if not domain:
        return handler.send_error_json("CRM_GMAIL_WORKSPACE_DOMAIN not set", 400)

    conn = _conn()
    try:
        candidates = conn.execute(
            "SELECT id, email FROM users WHERE is_active = 1 AND email LIKE ?",
            (f"%@{domain}",),
        ).fetchall()
        enrolled = []
        for record in candidates:
            account_id = _db.upsert_account(
                conn,
                user_id=record["id"],
                email_address=record["email"].lower(),
                auth_method="dwd",
            )
            enrolled.append({"account_id": account_id, "email": record["email"]})
        conn.commit()
    finally:
        conn.close()
    handler.send_json({"enrolled": enrolled, "count": len(enrolled)})
def _h_enroll_one(handler):
    """POST /api/email/accounts/enroll — admin-only single-mailbox enrollment.

    JSON body:
        email_address | email: mailbox to enroll (required; lower-cased and
            stripped).
        user_id: owning CRM user. When absent, it is resolved from
            ``users.email``; failing that, it falls back to the calling
            admin's own id.
        auth_method: defaults to the configured primary auth method.

    Responds with ``{"account_id", "email", "user_id"}``.
    """
    user = _require_admin(handler)
    if not user:
        return
    body = handler.get_body() or {}
    # Accept either `email` or `email_address` for ergonomics.
    email_address = (body.get("email_address") or body.get("email") or "").lower().strip()
    user_id = body.get("user_id")
    auth_method = body.get("auth_method") or _cfg.CONFIG.primary_auth
    if not email_address:
        return handler.send_error_json("email (or email_address) required", 400)
    # If the caller didn't specify a CRM user_id, resolve it from the users
    # table by matching email; otherwise fall back to the authenticated admin
    # (the common case of a single admin enrolling themselves).
    if not user_id:
        conn = _conn()
        try:
            row = conn.execute(
                "SELECT id FROM users WHERE LOWER(email) = ?", (email_address,)
            ).fetchone()
        finally:
            conn.close()
        # The authenticated-user dict carries its id under "user_id" (the key
        # every other handler here reads); "id" is kept as a secondary lookup.
        user_id = row["id"] if row else (user.get("user_id") or user.get("id"))
    if not user_id:
        return handler.send_error_json("could not resolve user_id for that email", 400)
    conn = _conn()
    try:
        aid = _db.upsert_account(conn, user_id=user_id,
                                 email_address=email_address,
                                 auth_method=auth_method)
        conn.commit()
    finally:
        conn.close()
    handler.send_json({"account_id": aid, "email": email_address, "user_id": user_id})
def _h_run_now(handler):
    """POST /api/email/sync/run-now — admin-only manual sync trigger.

    Throttled through the server's rate limiter under the "email-sync-now"
    key (limit argument 6; the window is defined by the server). Responds
    with whatever ``_sched.trigger_run_now()`` returns.
    """
    if not _require_admin(handler):
        return
    # Reuse the existing rate limiter so admins can't hammer this.
    throttled = handler.rate_limited("email-sync-now", 6)
    if throttled:
        return handler.send_error_json("Too many requests", 429)
    handler.send_json(_sched.trigger_run_now())
def _row_participants(row) -> set[str]:
    """Return the lower-cased addresses on an ``emails`` row (from, to, cc).

    ``row`` is anything indexable by column name (e.g. ``sqlite3.Row``).
    ``to_emails_json`` / ``cc_emails_json`` are decoded as JSON lists whose
    entries may be plain address strings or objects with an ``email`` key.
    Undecodable JSON, non-list payloads and non-string addresses are skipped
    rather than aborting the whole rematch run.
    """
    participants = set()
    sender = row["from_email"]
    if isinstance(sender, str) and sender:
        participants.add(sender.lower())
    for col in ("to_emails_json", "cc_emails_json"):
        try:
            entries = json.loads(row[col] or "[]")
        except (TypeError, ValueError):
            continue
        if not isinstance(entries, list):
            # e.g. a bare JSON string would otherwise be iterated char by char.
            continue
        for entry in entries:
            address = entry.get("email") if isinstance(entry, dict) else entry
            if isinstance(address, str) and address:
                participants.add(address.lower())
    return participants


def _h_rematch(handler):
    """POST /api/email/rematch — re-evaluate unmatched emails (admin only).

    Rebuilds the investor index, then scans up to 10000 of the most recent
    emails with ``match_status = 'unmatched'``, optionally only those with
    ``sent_at >= since`` (JSON body). Each email with at least one index
    match is marked matched and its thread flagged ``is_matched``, and one
    investor link is inserted per match. Responds with
    ``{"scanned": n, "newly_matched": m}``.
    """
    user = _require_admin(handler)
    if not user:
        return
    body = handler.get_body() or {}
    since = body.get("since")  # optional ISO8601
    conn = _conn()
    scanned = 0
    matched = 0
    try:
        from .matcher import InvestorIndex

        index = InvestorIndex(own_domain=_cfg.CONFIG.workspace_domain)
        # NOTE(review): rebuild() receives the connection factory, not `conn`;
        # presumably it opens its own connection — confirm in matcher.
        index.rebuild(_conn)
        sql = ("SELECT id, from_email, to_emails_json, cc_emails_json "
               "FROM emails WHERE match_status = 'unmatched'")
        params: list = []
        if since:
            # Text comparison: assumes sent_at is stored as ISO8601 strings.
            sql += " AND sent_at >= ?"
            params.append(since)
        sql += " ORDER BY sent_at DESC LIMIT 10000"
        rows = conn.execute(sql, params).fetchall()
        scanned = len(rows)
        for row in rows:
            links = index.match(_row_participants(row))
            if not links:
                continue
            matched += 1
            conn.execute(
                "UPDATE emails SET match_status='matched', is_matched=1, "
                "updated_at=datetime('now') WHERE id=?",
                (row["id"],),
            )
            # Keep the thread-level flag in step: the unfiltered thread listing
            # only returns email_threads rows with is_matched = 1.
            conn.execute(
                "UPDATE email_threads SET is_matched = 1 "
                "WHERE id = (SELECT thread_id FROM emails WHERE id = ?)",
                (row["id"],),
            )
            for link in links:
                _db.insert_investor_link(conn, email_id=row["id"], link={
                    "matched_address": link.matched_address,
                    "match_kind": link.match_kind,
                    "match_confidence": link.match_confidence,
                    "fundraising_investor_id": link.target.fundraising_investor_id,
                    "fundraising_contact_id": link.target.fundraising_contact_id,
                    "contact_id": link.target.contact_id,
                    "organization_id": link.target.organization_id,
                })
            # NOTE: body is still missing — we only have headers. A follow-up
            # job can re-fetch the full message from Gmail using the sighting's
            # gmail_message_id. Not done inline to keep this endpoint fast.
        conn.commit()
    finally:
        conn.close()
    handler.send_json({"scanned": scanned, "newly_matched": matched})
# ---------------------------------------------------------------------------- OAuth state store (in-memory)
# For a 5-person CRM the state store doesn't need to be durable — a server
# restart between start and callback is rare and just requires a retry.
# Entries live for _OAUTH_STATE_TTL_SECONDS: expired ones are pruned on every
# store and rejected on consume.
_OAUTH_STATE_TTL_SECONDS = 600
_oauth_states: dict[str, dict] = {}
_oauth_state_lock = threading.Lock()


def _oauth_state_store(state: str, user_id: str, account_email: str) -> None:
    """Remember a freshly issued OAuth ``state`` token.

    Args:
        state: random token sent to Google and echoed back on the callback.
        user_id: CRM user who started the flow.
        account_email: mailbox being enrolled; stored lower-cased and stripped.
    """
    now = time.monotonic()  # monotonic: immune to wall-clock adjustments
    with _oauth_state_lock:
        cutoff = now - _OAUTH_STATE_TTL_SECONDS
        for key in [k for k, v in _oauth_states.items() if v["created"] < cutoff]:
            del _oauth_states[key]
        _oauth_states[state] = {
            "user_id": user_id,
            "account_email": account_email.lower().strip(),
            "created": now,
        }


def _oauth_state_consume(state: str) -> Optional[dict]:
    """Pop and return the entry for *state*, or None if unknown or expired.

    Single-use: the entry is removed even when it has expired, so a replayed
    callback with the same state is rejected.
    """
    with _oauth_state_lock:
        entry = _oauth_states.pop(state, None)
    if entry is None:
        return None
    # Pruning only runs in _oauth_state_store, so a stale entry can still be
    # present if no new flow has started since — enforce the TTL here too.
    if time.monotonic() - entry["created"] > _OAUTH_STATE_TTL_SECONDS:
        return None
    return entry