ea036f49a6
insert_email's recipients loop did `for a in parsed.get(kind, [])`, but the parser sets reply_to=None when there is no Reply-To header, so .get returns None (key present) and the loop raised 'NoneType' object is not iterable — aborting the entire Gmail backfill on the first such email (i.e. almost immediately). Fixed with `or []`. Regression test test_insert_email.py (reply_to=None, all-None recipients, happy path). Because the scheduler intentionally skips error-status accounts (no retry storms), an errored mailbox would never resume on its own. "Sync now" now clears error status first, so it is an explicit retry; backfill resumes from its saved cursor and dedups by Message-ID, so nothing is re-captured. Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
486 lines
17 KiB
Python
486 lines
17 KiB
Python
"""
|
|
HTTP route handlers for the Gmail integration.
|
|
|
|
Designed to plug into server.py's CRMHandler (BaseHTTPRequestHandler) pattern.
|
|
The hook is a single function call near the top of do_GET / do_POST that
|
|
lets this module claim any /api/email/* request:
|
|
|
|
# in CRMHandler.do_GET and CRMHandler.do_POST, before the 404 fallthrough:
|
|
from email_integration.routes import try_handle
|
|
if try_handle(self):
|
|
return
|
|
|
|
`try_handle(handler)` inspects `handler.command` and `handler.get_path()` and
|
|
returns True if it handled the request (sent a response).
|
|
|
|
Every handler respects the same auth / rate-limit model as the rest of server.py
|
|
by calling handler.get_user() and handler.rate_limited(...).
|
|
"""
|
|
|
|
import json
|
|
import sqlite3
|
|
from typing import Optional
|
|
|
|
from . import config as _cfg
|
|
from . import credentials as _creds
|
|
from . import crypto as _crypto
|
|
from . import db as _db
|
|
from . import scheduler as _sched
|
|
|
|
|
|
# ---------------------------------------------------------------------------- dispatch
|
|
|
|
# GET endpoints owned by this module. Each value is the suffix of the
# module-level handler that serves the path: try_handle() resolves the name
# "<value>" to the function "_h_<value>" via globals().
_GET_ROUTES = {
    "/api/email/status": "status",
    "/api/email/accounts": "list_accounts",
    "/api/email/threads": "list_threads",
    "/api/email/oauth/start": "oauth_start",
    "/api/email/oauth/callback": "oauth_callback",
}

# POST endpoints, resolved to "_h_<value>" the same way as the GET table.
_POST_ROUTES = {
    "/api/email/accounts/enroll-all": "enroll_all",
    "/api/email/accounts/enroll": "enroll_one",
    "/api/email/sync/run-now": "run_now",
    "/api/email/rematch": "rematch",
}
|
|
|
|
|
|
def try_handle(handler) -> bool:
    """Claim and serve an ``/api/email/*`` request, if this is one.

    Args:
        handler: The request handler. This function reads ``handler.command``
            and ``handler.get_path()`` and replies through
            ``handler.send_error_json(message, status)``.

    Returns:
        ``False`` when the path is outside ``/api/email/`` (nothing was sent and
        the caller should keep dispatching); ``True`` when a response was sent,
        including the 404 / 503 / 500 error responses produced here.
    """
    path = handler.get_path()
    # Cheapest check first: anything outside our prefix belongs to the caller.
    if not path.startswith("/api/email/"):
        return False

    method = handler.command
    table = _GET_ROUTES if method == "GET" else _POST_ROUTES if method == "POST" else {}
    name = table.get(path)
    if not name:
        # Route is owned by this module but unknown — return a proper 404
        # instead of letting the main dispatcher's 404 abuse counter fire.
        handler.send_error_json("Not found", 404)
        return True

    if not _cfg.CONFIG.enabled:
        handler.send_error_json("Email integration disabled", 503)
        return True

    impl = globals().get(f"_h_{name}")
    if impl is None:
        # The route table names a handler that does not exist in this module.
        handler.send_error_json("Not implemented", 500)
        return True

    try:
        impl(handler)
    except Exception:
        # Keep the details (SQL text, file paths, upstream errors) in the
        # server log; the client only gets a generic message.
        import logging

        logging.getLogger(__name__).exception(
            "Unhandled error in email route %s %s", method, path
        )
        handler.send_error_json("Internal error", 500)
    return True
|
|
|
|
|
|
# ---------------------------------------------------------------------------- helpers
|
|
|
|
def _conn() -> sqlite3.Connection:
    """Open a new SQLite connection to the CRM database.

    The path comes from the ``CRM_DB_PATH`` environment variable when it is
    set; otherwise it is ``<CONFIG.data_dir>/crm.db``. The connection is put in
    WAL journal mode with foreign keys enabled and a 5000 ms busy timeout, and
    rows are returned as ``sqlite3.Row``.

    Returns:
        An open connection. The caller is responsible for closing it.

    Raises:
        sqlite3.Error: If the database cannot be opened or configured.
    """
    import os

    db_path = os.environ.get("CRM_DB_PATH")
    if db_path is None:
        # Only touch the configured data dir when no explicit override exists,
        # so an override keeps working even if data_dir is unset.
        db_path = os.path.join(_cfg.CONFIG.data_dir, "crm.db")

    conn = sqlite3.connect(db_path)
    try:
        conn.execute("PRAGMA journal_mode=WAL")
        conn.execute("PRAGMA foreign_keys=ON")
        conn.execute("PRAGMA busy_timeout=5000")
    except Exception:
        # Don't leak the handle if the database rejects a pragma.
        conn.close()
        raise
    conn.row_factory = sqlite3.Row
    return conn
|
|
|
|
|
|
def _require_auth(handler) -> Optional[dict]:
    """Return the current user, or reply 401 and return ``None``.

    The user is whatever ``handler.get_user()`` yields; any falsy value is
    treated as "not logged in".
    """
    current = handler.get_user()
    if current:
        return current
    handler.send_error_json("Unauthorized", 401)
    return None
|
|
|
|
|
|
def _require_admin(handler) -> Optional[dict]:
    """Return the current user only if their ``role`` is ``"admin"``.

    Unauthenticated requests are answered by ``_require_auth`` (401); an
    authenticated non-admin gets a 403. In both cases ``None`` is returned and
    the caller must stop, because a response has already been sent.
    """
    account = _require_auth(handler)
    if account is None:
        return None
    if account.get("role") == "admin":
        return account
    handler.send_error_json("Admin required", 403)
    return None
|
|
|
|
|
|
# ---------------------------------------------------------------------------- GET handlers
|
|
|
|
def _h_status(handler):
    """GET /api/email/status — scheduler snapshot plus live DB counters.

    Requires an authenticated user. Starts from ``_sched.status_snapshot()``
    and adds ``matched_emails``, ``captured_emails``, ``last_run``,
    ``backfilling`` and ``accounts_summary`` before sending it as JSON.
    """
    if not _require_auth(handler):
        return
    report = _sched.status_snapshot()
    db = _conn()
    try:
        c = db.cursor()

        # One aggregate row: total accounts and how many are active / errored.
        c.execute(
            "SELECT COUNT(*) AS n_accounts, "
            "SUM(CASE WHEN sync_status='active' THEN 1 ELSE 0 END) AS n_active, "
            "SUM(CASE WHEN sync_status='error' THEN 1 ELSE 0 END) AS n_error "
            "FROM email_accounts"
        )
        account_totals = dict(c.fetchone() or {})

        c.execute("SELECT COUNT(*) AS n FROM emails WHERE match_status = 'matched'")
        report["matched_emails"] = c.fetchone()["n"]

        # Total captured climbs page-by-page during backfill (the sync_runs row
        # only finalizes at the end), so it is the live progress signal.
        c.execute("SELECT COUNT(*) AS n FROM emails")
        report["captured_emails"] = c.fetchone()["n"]

        # Most recent sync run (status running|ok|error|partial), used for a
        # human-readable status line.
        c.execute(
            "SELECT kind, status, started_at, finished_at, messages_seen, messages_stored "
            "FROM email_sync_runs ORDER BY started_at DESC LIMIT 1"
        )
        latest = c.fetchone()
        report["last_run"] = None if not latest else dict(latest)

        # Any enabled account with an unfinished backfill means history is
        # still being pulled.
        c.execute("SELECT COUNT(*) AS n FROM email_accounts WHERE sync_enabled = 1 AND backfill_complete = 0")
        pending_backfills = c.fetchone()["n"] or 0
        report["backfilling"] = pending_backfills > 0
    finally:
        db.close()
    report["accounts_summary"] = account_totals
    handler.send_json(report)
|
|
|
|
|
|
def _h_list_accounts(handler):
    """GET /api/email/accounts — list enrolled mailboxes.

    Requires an authenticated user. Admins receive every row of
    ``email_accounts`` (ordered by address); everyone else only receives rows
    whose ``user_id`` equals their own.
    """
    viewer = _require_auth(handler)
    if not viewer:
        return
    db = _conn()
    try:
        found = db.cursor().execute(
            "SELECT id, user_id, email_address, auth_method, sync_enabled, "
            "sync_status, sync_error, last_synced_at, backfill_complete "
            "FROM email_accounts ORDER BY email_address"
        ).fetchall()
        accounts = [dict(record) for record in found]
    finally:
        db.close()
    is_admin = viewer.get("role") == "admin"
    if not is_admin:
        # Non-admins only see their own row.
        accounts = [acct for acct in accounts if acct["user_id"] == viewer["user_id"]]
    handler.send_json({"accounts": accounts})
|
|
|
|
|
|
def _h_list_threads(handler):
    """GET /api/email/threads — list email threads, newest first.

    Requires an authenticated user. Query parameters:

    * ``investor_id`` (optional): only threads containing an email linked to
      that investor, either directly or through one of its fundraising
      contacts. Without it, all threads with ``is_matched = 1`` are returned.
    * ``limit`` (optional, default 50): maximum number of threads, capped at
      500. A non-integer or negative value is rejected with a 400.

    Sends ``{"threads": [...]}``.
    """
    user = _require_auth(handler)
    if not user:
        return
    q = handler.get_query_params()
    investor_id = q.get("investor_id")

    try:
        limit = int(q.get("limit", 50))
    except (TypeError, ValueError):
        return handler.send_error_json("limit must be an integer", 400)
    if limit < 0:
        # SQLite treats a negative LIMIT as "no upper bound", which would
        # silently bypass the 500-row cap below.
        return handler.send_error_json("limit must not be negative", 400)
    limit = min(limit, 500)

    conn = _conn()
    try:
        cur = conn.cursor()
        if investor_id:
            cur.execute(
                """SELECT t.*
                   FROM email_threads t
                   JOIN emails e ON e.thread_id = t.id
                   JOIN email_investor_links l ON l.email_id = e.id
                   WHERE l.fundraising_investor_id = ?
                      OR l.fundraising_contact_id IN (
                          SELECT id FROM fundraising_contacts WHERE investor_id = ?
                      )
                   GROUP BY t.id
                   ORDER BY t.last_message_at DESC
                   LIMIT ?""",
                (investor_id, investor_id, limit),
            )
        else:
            cur.execute(
                "SELECT * FROM email_threads WHERE is_matched = 1 "
                "ORDER BY last_message_at DESC LIMIT ?",
                (limit,),
            )
        threads = [dict(r) for r in cur.fetchall()]
    finally:
        conn.close()
    handler.send_json({"threads": threads})
|
|
|
|
|
|
def _h_oauth_start(handler):
    """Begin the per-user OAuth consent flow (fallback path).

    Requires an authenticated user, ``CONFIG.primary_auth == "oauth"`` and an
    ``account_email`` query parameter. Registers a fresh random ``state`` for
    this user/mailbox pair and replies with ``{"redirect_url": ...}`` pointing
    at Google's consent screen.
    """
    requester = _require_auth(handler)
    if not requester:
        return
    if _cfg.CONFIG.primary_auth != "oauth":
        return handler.send_error_json(
            "Per-user OAuth disabled (set CRM_GMAIL_AUTH_METHOD=oauth to enable)", 400
        )
    mailbox = handler.get_query_params().get("account_email") or ""
    if not mailbox:
        return handler.send_error_json("account_email required", 400)

    import secrets
    from urllib.parse import urlencode

    # The state token ties the later callback to this user and mailbox.
    token = secrets.token_urlsafe(32)
    _oauth_state_store(token, requester["user_id"], mailbox)

    query_string = urlencode([
        ("client_id", _cfg.CONFIG.oauth_client_id),
        ("redirect_uri", _cfg.CONFIG.oauth_redirect_uri),
        ("response_type", "code"),
        ("scope", _creds.GMAIL_READONLY_SCOPE),
        ("access_type", "offline"),
        ("prompt", "consent"),
        ("state", token),
        ("login_hint", mailbox),
    ])
    handler.send_json(
        {"redirect_url": "https://accounts.google.com/o/oauth2/v2/auth?" + query_string}
    )
|
|
|
|
|
|
def _h_oauth_callback(handler):
    """Exchange the authorization code for tokens and store the refresh token.

    Reads ``code`` and ``state`` from the query string, redeems the one-time
    ``state`` recorded by the start endpoint, trades the code at Google's token
    endpoint, encrypts the returned refresh token and saves it on the
    ``email_accounts`` row for the mailbox named in the state (status is reset
    to ``pending``). Replies ``{"ok": True, "account_email": ...}`` on success,
    400 for missing/invalid input or a missing refresh token, 500 when the
    token exchange fails.
    """
    query = handler.get_query_params()
    code, state = query.get("code"), query.get("state")
    if not (code and state):
        return handler.send_error_json("code and state required", 400)

    pending = _oauth_state_consume(state)
    if not pending:
        return handler.send_error_json("Invalid state", 400)

    from urllib.parse import urlencode
    from urllib.request import Request, urlopen

    form = urlencode({
        "code": code,
        "client_id": _cfg.CONFIG.oauth_client_id,
        "client_secret": _cfg.CONFIG.oauth_client_secret,
        "redirect_uri": _cfg.CONFIG.oauth_redirect_uri,
        "grant_type": "authorization_code",
    }).encode("ascii")
    token_request = Request(
        "https://oauth2.googleapis.com/token",
        data=form,
        headers={"Content-Type": "application/x-www-form-urlencoded"},
    )
    try:
        with urlopen(token_request, timeout=15) as resp:
            payload = json.loads(resp.read())
    except Exception as e:
        return handler.send_error_json(f"Token exchange failed: {e}", 500)

    refresh = payload.get("refresh_token")
    if not refresh:
        return handler.send_error_json("No refresh_token returned (user may have previously consented; prompt=consent required)", 400)

    # Only the encrypted form of the refresh token is written to the database.
    sealed = _crypto.encrypt(refresh.encode("ascii"), secret_key_b64=_cfg.CONFIG.secret_key_b64)

    # NOTE(review): the mailbox address comes from the state recorded at start,
    # not from Google — nothing here checks which Google account actually
    # granted consent. Confirm whether that needs verifying.
    mailbox = pending["account_email"]
    db = _conn()
    try:
        _db.upsert_account(
            db,
            user_id=pending["user_id"],
            email_address=mailbox,
            auth_method="oauth",
        )
        db.execute(
            "UPDATE email_accounts SET oauth_refresh_enc = ?, sync_status = 'pending', "
            "updated_at = datetime('now') WHERE email_address = ?",
            (sealed, mailbox),
        )
        db.commit()
    finally:
        db.close()

    handler.send_json({"ok": True, "account_email": mailbox})
|
|
|
|
|
|
# ---------------------------------------------------------------------------- POST handlers
|
|
|
|
def _h_enroll_all(handler):
    """Admin: enroll every active CRM user whose email is @workspace_domain via DWD.

    Only valid when ``CONFIG.primary_auth == "dwd"`` and a workspace domain is
    configured; otherwise a 400 is sent. Each matching user is upserted into
    the email accounts (address lower-cased) and the reply lists what was
    enrolled: ``{"enrolled": [...], "count": n}``.
    """
    if not _require_admin(handler):
        return
    if _cfg.CONFIG.primary_auth != "dwd":
        return handler.send_error_json("enroll-all only valid in DWD mode", 400)
    domain = _cfg.CONFIG.workspace_domain
    if not domain:
        return handler.send_error_json("CRM_GMAIL_WORKSPACE_DOMAIN not set", 400)

    db = _conn()
    try:
        members = db.cursor().execute(
            "SELECT id, email FROM users WHERE is_active = 1 AND email LIKE ?",
            (f"%@{domain}",),
        ).fetchall()
        enrolled = []
        for member in members:
            account_id = _db.upsert_account(
                db,
                user_id=member["id"],
                email_address=member["email"].lower(),
                auth_method="dwd",
            )
            # The response echoes the address exactly as stored on the user.
            enrolled.append({"account_id": account_id, "email": member["email"]})
        db.commit()
    finally:
        db.close()
    handler.send_json({"enrolled": enrolled, "count": len(enrolled)})
|
|
|
|
|
|
def _h_enroll_one(handler):
    """Admin: enroll a single mailbox.

    JSON body fields:

    * ``email_address`` or ``email`` (required): mailbox to enroll; it is
      lower-cased and stripped.
    * ``user_id`` (optional): CRM user that owns the mailbox. When omitted it
      is looked up in ``users`` by email, falling back to the calling admin's
      own id.
    * ``auth_method`` (optional): defaults to ``CONFIG.primary_auth``.

    Replies ``{"account_id", "email", "user_id"}``, or a 400 when the email is
    missing or no user id can be resolved.
    """
    user = _require_admin(handler)
    if not user:
        return
    body = handler.get_body() or {}
    # Accept either `email` or `email_address` for ergonomics.
    email_address = (body.get("email_address") or body.get("email") or "").lower().strip()
    user_id = body.get("user_id")
    auth_method = body.get("auth_method") or _cfg.CONFIG.primary_auth

    if not email_address:
        return handler.send_error_json("email (or email_address) required", 400)

    # One connection serves both the optional lookup and the upsert.
    conn = _conn()
    try:
        if not user_id:
            # Resolve the owner from the users table by matching email. Falls
            # back to the authenticated admin's own id (handles the common case
            # of a single admin enrolling themselves without pasting a UUID).
            row = conn.execute(
                "SELECT id FROM users WHERE LOWER(email) = ?",
                (email_address,),
            ).fetchone()
            if row:
                user_id = row["id"]
            else:
                # The other handlers in this module read the session user's id
                # from "user_id"; "id" is kept as a fallback for compatibility.
                user_id = user.get("user_id") or user.get("id")

        if not user_id:
            return handler.send_error_json("could not resolve user_id for that email", 400)

        aid = _db.upsert_account(
            conn,
            user_id=user_id,
            email_address=email_address,
            auth_method=auth_method,
        )
        conn.commit()
    finally:
        conn.close()
    handler.send_json({"account_id": aid, "email": email_address, "user_id": user_id})
|
|
|
|
|
|
def _h_run_now(handler):
    """Admin: trigger an immediate sync ("Sync now").

    Subject to ``handler.rate_limited("email-sync-now", 6)``; a throttled call
    gets a 429. Before triggering, every enabled account in ``error`` status is
    reset to ``pending`` with its error cleared, then the scheduler's
    ``trigger_run_now()`` result is sent back as JSON.
    """
    if not _require_admin(handler):
        return
    # Reuse the existing rate limiter so admins can't hammer this.
    throttled = handler.rate_limited("email-sync-now", 6)
    if throttled:
        return handler.send_error_json("Too many requests", 429)

    # A manual sync is an explicit retry. The scheduler intentionally skips
    # error-status accounts (no retry storms), so clear that status here so a
    # mailbox that previously errored is re-attempted. Backfill resumes from its
    # saved cursor and dedups by Message-ID, so nothing is captured twice.
    reset_errored = (
        "UPDATE email_accounts SET sync_status = 'pending', sync_error = NULL "
        "WHERE sync_enabled = 1 AND sync_status = 'error'"
    )
    db = _conn()
    try:
        db.execute(reset_errored)
        db.commit()
    finally:
        db.close()
    handler.send_json(_sched.trigger_run_now())
|
|
|
|
|
|
def _rematch_participants(row) -> set:
    """Collect the lower-cased sender, To and Cc addresses of one ``emails`` row."""
    participants = set()
    if row["from_email"]:
        participants.add(row["from_email"].lower())
    for col in ("to_emails_json", "cc_emails_json"):
        try:
            entries = json.loads(row[col] or "[]")
        except (TypeError, ValueError):
            # Unparseable recipient JSON: treat the column as empty.
            entries = []
        if not isinstance(entries, list):
            # Valid JSON that is not an array (e.g. "null") must not abort the
            # whole rematch run.
            continue
        for entry in entries:
            # Entries are either {"email": ...} objects or bare address strings.
            address = entry.get("email") if isinstance(entry, dict) else entry
            if isinstance(address, str) and address:
                participants.add(address.lower())
    return participants


def _h_rematch(handler):
    """Re-evaluate unmatched emails against the current investor index.

    Admin only. Optional JSON body field ``since`` (ISO8601) restricts the scan
    to emails with ``sent_at >= since``. At most 10000 unmatched emails are
    scanned, newest first. Every email that now matches is flagged as matched
    and gets one investor link per match. All changes are committed together at
    the end. Replies ``{"scanned": n, "newly_matched": m}``.
    """
    user = _require_admin(handler)
    if not user:
        return
    body = handler.get_body() or {}
    since = body.get("since")  # optional ISO8601
    conn = _conn()
    scanned = 0
    matched = 0
    try:
        from .matcher import InvestorIndex

        index = InvestorIndex(own_domain=_cfg.CONFIG.workspace_domain)
        # NOTE(review): rebuild() receives the _conn factory, not the open
        # connection — presumably it opens its own; confirm against matcher.py.
        index.rebuild(_conn)

        sql = ("SELECT id, from_email, to_emails_json, cc_emails_json "
               "FROM emails WHERE match_status = 'unmatched'")
        params: list = []
        if since:
            sql += " AND sent_at >= ?"
            params.append(since)
        sql += " ORDER BY sent_at DESC LIMIT 10000"

        cur = conn.cursor()
        cur.execute(sql, params)
        for row in cur.fetchall():
            scanned += 1
            links = index.match(_rematch_participants(row))
            if not links:
                continue
            matched += 1
            conn.execute(
                "UPDATE emails SET match_status='matched', is_matched=1, "
                "updated_at=datetime('now') WHERE id=?",
                (row["id"],),
            )
            for link in links:
                _db.insert_investor_link(conn, email_id=row["id"], link={
                    "matched_address": link.matched_address,
                    "match_kind": link.match_kind,
                    "match_confidence": link.match_confidence,
                    "fundraising_investor_id": link.target.fundraising_investor_id,
                    "fundraising_contact_id": link.target.fundraising_contact_id,
                    "contact_id": link.target.contact_id,
                    "organization_id": link.target.organization_id,
                })
            # NOTE: body is still missing — we only have headers. A follow-up
            # job can re-fetch the full message from Gmail using the sighting's
            # gmail_message_id. Not done inline to keep this endpoint fast.
        conn.commit()
    finally:
        conn.close()
    handler.send_json({"scanned": scanned, "newly_matched": matched})
|
|
|
|
|
|
# ---------------------------------------------------------------------------- OAuth state store (in-memory)
|
|
# For a 5-person CRM the state store doesn't need to be durable — a server
|
|
# restart between start and callback is rare and just requires a retry.
|
|
|
|
# How long an issued OAuth ``state`` stays redeemable, in seconds (10 min).
_OAUTH_STATE_TTL_SECONDS = 600

# state token -> {"user_id", "account_email", "created" (epoch seconds)}
_oauth_states: dict[str, dict] = {}
_oauth_state_lock = __import__("threading").Lock()


def _oauth_state_store(state: str, user_id: str, account_email: str) -> None:
    """Remember a freshly issued OAuth ``state`` for the given user and mailbox.

    The mailbox address is stored lower-cased and stripped. Entries older than
    the TTL are pruned on every call so the store cannot grow without bound.
    """
    import time

    now = time.time()
    cutoff = now - _OAUTH_STATE_TTL_SECONDS
    with _oauth_state_lock:
        # Collect keys first: the dict must not change size while iterating.
        expired = [k for k, v in _oauth_states.items() if v["created"] < cutoff]
        for key in expired:
            _oauth_states.pop(key, None)
        _oauth_states[state] = {
            "user_id": user_id,
            "account_email": account_email.lower().strip(),
            "created": now,
        }


def _oauth_state_consume(state: str) -> Optional[dict]:
    """Redeem a ``state`` exactly once.

    Returns:
        The stored entry, or ``None`` when the state is unknown, was already
        redeemed, or is older than the TTL. The entry is removed in every case.
    """
    import time

    with _oauth_state_lock:
        entry = _oauth_states.pop(state, None)
    if entry is None:
        return None
    # Pruning only happens when a new state is stored, so expiry has to be
    # enforced here as well or a stale state could be redeemed long afterwards.
    if entry["created"] < time.time() - _OAUTH_STATE_TTL_SECONDS:
        return None
    return entry
|