""" HTTP route handlers for the Gmail integration. Designed to plug into server.py's CRMHandler (BaseHTTPRequestHandler) pattern. The hook is a single function call near the top of do_GET / do_POST that lets this module claim any /api/email/* request: # in CRMHandler.do_GET and CRMHandler.do_POST, before the 404 fallthrough: from email_integration.routes import try_handle if try_handle(self): return `try_handle(handler)` inspects `handler.command` and `handler.get_path()` and returns True if it handled the request (sent a response). Every handler respects the same auth / rate-limit model as the rest of server.py by calling handler.get_user() and handler.rate_limited(...). """ import json import sqlite3 from typing import Optional from . import config as _cfg from . import credentials as _creds from . import crypto as _crypto from . import db as _db from . import scheduler as _sched # ---------------------------------------------------------------------------- dispatch _GET_ROUTES = { "/api/email/status": "status", "/api/email/accounts": "list_accounts", "/api/email/threads": "list_threads", "/api/email/oauth/start": "oauth_start", "/api/email/oauth/callback": "oauth_callback", } _POST_ROUTES = { "/api/email/accounts/enroll-all": "enroll_all", "/api/email/accounts/enroll": "enroll_one", "/api/email/sync/run-now": "run_now", "/api/email/rematch": "rematch", } def try_handle(handler) -> bool: path = handler.get_path() method = handler.command table = _GET_ROUTES if method == "GET" else _POST_ROUTES if method == "POST" else {} name = table.get(path) if not path.startswith("/api/email/"): return False if not name: # Route is owned by this module but unknown — return a proper 404 # instead of letting the main dispatcher's 404 abuse counter fire. handler.send_error_json("Not found", 404) return True if not _cfg.CONFIG.enabled: handler.send_error_json("Email integration disabled", 503) return True # Also enforce attachment streaming under a different prefix # (handled above via prefix check). impl = globals().get(f"_h_{name}") if impl is None: handler.send_error_json("Not implemented", 500) return True try: impl(handler) except Exception as e: handler.send_error_json(f"Internal error: {e}", 500) return True # ---------------------------------------------------------------------------- helpers def _conn() -> sqlite3.Connection: import os db_path = os.environ.get( "CRM_DB_PATH", os.path.join(_cfg.CONFIG.data_dir, "crm.db"), ) conn = sqlite3.connect(db_path) conn.execute("PRAGMA journal_mode=WAL") conn.execute("PRAGMA foreign_keys=ON") conn.execute("PRAGMA busy_timeout=5000") conn.row_factory = sqlite3.Row return conn def _require_auth(handler) -> Optional[dict]: user = handler.get_user() if not user: handler.send_error_json("Unauthorized", 401) return None return user def _require_admin(handler) -> Optional[dict]: user = _require_auth(handler) if user is None: return None if user.get("role") != "admin": handler.send_error_json("Admin required", 403) return None return user # ---------------------------------------------------------------------------- GET handlers def _h_status(handler): user = _require_auth(handler) if not user: return snap = _sched.status_snapshot() conn = _conn() try: cur = conn.cursor() cur.execute( "SELECT COUNT(*) AS n_accounts, " "SUM(CASE WHEN sync_status='active' THEN 1 ELSE 0 END) AS n_active, " "SUM(CASE WHEN sync_status='error' THEN 1 ELSE 0 END) AS n_error " "FROM email_accounts" ) counts = dict(cur.fetchone() or {}) cur.execute("SELECT COUNT(*) AS n FROM emails WHERE match_status = 'matched'") snap["matched_emails"] = cur.fetchone()["n"] finally: conn.close() snap["accounts_summary"] = counts handler.send_json(snap) def _h_list_accounts(handler): user = _require_auth(handler) if not user: return conn = _conn() try: cur = conn.cursor() cur.execute( "SELECT id, user_id, email_address, auth_method, sync_enabled, " "sync_status, sync_error, last_synced_at, backfill_complete " "FROM email_accounts ORDER BY email_address" ) rows = [dict(r) for r in cur.fetchall()] finally: conn.close() # Non-admins only see their own row if user.get("role") != "admin": rows = [r for r in rows if r["user_id"] == user["user_id"]] handler.send_json({"accounts": rows}) def _h_list_threads(handler): user = _require_auth(handler) if not user: return q = handler.get_query_params() investor_id = q.get("investor_id") limit = min(int(q.get("limit", 50)), 500) conn = _conn() try: cur = conn.cursor() if investor_id: cur.execute( """SELECT t.* FROM email_threads t JOIN emails e ON e.thread_id = t.id JOIN email_investor_links l ON l.email_id = e.id WHERE l.fundraising_investor_id = ? OR l.fundraising_contact_id IN ( SELECT id FROM fundraising_contacts WHERE investor_id = ? ) GROUP BY t.id ORDER BY t.last_message_at DESC LIMIT ?""", (investor_id, investor_id, limit), ) else: cur.execute( "SELECT * FROM email_threads WHERE is_matched = 1 " "ORDER BY last_message_at DESC LIMIT ?", (limit,), ) threads = [dict(r) for r in cur.fetchall()] finally: conn.close() handler.send_json({"threads": threads}) def _h_oauth_start(handler): """Begin per-user OAuth consent flow (fallback path).""" user = _require_auth(handler) if not user: return if _cfg.CONFIG.primary_auth != "oauth": return handler.send_error_json( "Per-user OAuth disabled (set CRM_GMAIL_AUTH_METHOD=oauth to enable)", 400 ) q = handler.get_query_params() account_email = q.get("account_email") or "" if not account_email: return handler.send_error_json("account_email required", 400) import secrets import urllib.parse state = secrets.token_urlsafe(32) _oauth_state_store(state, user["user_id"], account_email) params = { "client_id": _cfg.CONFIG.oauth_client_id, "redirect_uri": _cfg.CONFIG.oauth_redirect_uri, "response_type": "code", "scope": _creds.GMAIL_READONLY_SCOPE, "access_type": "offline", "prompt": "consent", "state": state, "login_hint": account_email, } url = "https://accounts.google.com/o/oauth2/v2/auth?" + urllib.parse.urlencode(params) handler.send_json({"redirect_url": url}) def _h_oauth_callback(handler): """Exchange code for tokens, encrypt refresh token, store.""" q = handler.get_query_params() code = q.get("code") state = q.get("state") if not code or not state: return handler.send_error_json("code and state required", 400) state_row = _oauth_state_consume(state) if not state_row: return handler.send_error_json("Invalid state", 400) import urllib.parse import urllib.request body = urllib.parse.urlencode({ "code": code, "client_id": _cfg.CONFIG.oauth_client_id, "client_secret": _cfg.CONFIG.oauth_client_secret, "redirect_uri": _cfg.CONFIG.oauth_redirect_uri, "grant_type": "authorization_code", }).encode("ascii") req = urllib.request.Request( "https://oauth2.googleapis.com/token", data=body, headers={"Content-Type": "application/x-www-form-urlencoded"}, ) try: with urllib.request.urlopen(req, timeout=15) as resp: payload = json.loads(resp.read()) except Exception as e: return handler.send_error_json(f"Token exchange failed: {e}", 500) refresh = payload.get("refresh_token") if not refresh: return handler.send_error_json("No refresh_token returned (user may have previously consented; prompt=consent required)", 400) enc = _crypto.encrypt(refresh.encode("ascii"), secret_key_b64=_cfg.CONFIG.secret_key_b64) conn = _conn() try: _db.upsert_account(conn, user_id=state_row["user_id"], email_address=state_row["account_email"], auth_method="oauth") conn.execute( "UPDATE email_accounts SET oauth_refresh_enc = ?, sync_status = 'pending', " "updated_at = datetime('now') WHERE email_address = ?", (enc, state_row["account_email"]), ) conn.commit() finally: conn.close() handler.send_json({"ok": True, "account_email": state_row["account_email"]}) # ---------------------------------------------------------------------------- POST handlers def _h_enroll_all(handler): """Admin: enroll every CRM user whose email is @workspace_domain via DWD.""" user = _require_admin(handler) if not user: return if _cfg.CONFIG.primary_auth != "dwd": return handler.send_error_json("enroll-all only valid in DWD mode", 400) domain = _cfg.CONFIG.workspace_domain if not domain: return handler.send_error_json("CRM_GMAIL_WORKSPACE_DOMAIN not set", 400) conn = _conn() try: cur = conn.cursor() cur.execute( "SELECT id, email FROM users WHERE is_active = 1 AND email LIKE ?", (f"%@{domain}",), ) users = cur.fetchall() created = [] for u in users: aid = _db.upsert_account(conn, user_id=u["id"], email_address=u["email"].lower(), auth_method="dwd") created.append({"account_id": aid, "email": u["email"]}) conn.commit() finally: conn.close() handler.send_json({"enrolled": created, "count": len(created)}) def _h_enroll_one(handler): user = _require_admin(handler) if not user: return body = handler.get_body() or {} # Accept either `email` or `email_address` for ergonomics. email_address = (body.get("email_address") or body.get("email") or "").lower().strip() user_id = body.get("user_id") auth_method = body.get("auth_method") or _cfg.CONFIG.primary_auth if not email_address: return handler.send_error_json("email (or email_address) required", 400) # If the caller didn't specify a CRM user_id, resolve it from the # users table by matching email. Falls back to the authenticated # admin's own id (handles the common case of a single admin # enrolling themselves without having to paste their UUID). if not user_id: conn = _conn() try: cur = conn.cursor() cur.execute("SELECT id FROM users WHERE LOWER(email) = ?", (email_address,)) row = cur.fetchone() user_id = row["id"] if row else user.get("id") finally: conn.close() if not user_id: return handler.send_error_json("could not resolve user_id for that email", 400) conn = _conn() try: aid = _db.upsert_account(conn, user_id=user_id, email_address=email_address, auth_method=auth_method) conn.commit() finally: conn.close() handler.send_json({"account_id": aid, "email": email_address, "user_id": user_id}) def _h_run_now(handler): user = _require_admin(handler) if not user: return # Reuse existing rate limit so admins can't hammer this. if handler.rate_limited("email-sync-now", 6): return handler.send_error_json("Too many requests", 429) result = _sched.trigger_run_now() handler.send_json(result) def _h_rematch(handler): """Re-evaluate unmatched emails against the current investor index.""" user = _require_admin(handler) if not user: return body = handler.get_body() or {} since = body.get("since") # optional ISO8601 conn = _conn() scanned = 0 matched = 0 try: from .matcher import InvestorIndex index = InvestorIndex(own_domain=_cfg.CONFIG.workspace_domain) index.rebuild(_conn) cur = conn.cursor() sql = ("SELECT id, from_email, to_emails_json, cc_emails_json " "FROM emails WHERE match_status = 'unmatched'") params: list = [] if since: sql += " AND sent_at >= ?" params.append(since) sql += " ORDER BY sent_at DESC LIMIT 10000" cur.execute(sql, params) for row in cur.fetchall(): scanned += 1 participants = set() if row["from_email"]: participants.add(row["from_email"].lower()) for col in ("to_emails_json", "cc_emails_json"): try: arr = json.loads(row[col] or "[]") except Exception: arr = [] for a in arr: e = a.get("email") if isinstance(a, dict) else a if e: participants.add(e.lower()) links = index.match(participants) if not links: continue matched += 1 conn.execute( "UPDATE emails SET match_status='matched', is_matched=1, " "updated_at=datetime('now') WHERE id=?", (row["id"],), ) for link in links: _db.insert_investor_link(conn, email_id=row["id"], link={ "matched_address": link.matched_address, "match_kind": link.match_kind, "match_confidence": link.match_confidence, "fundraising_investor_id": link.target.fundraising_investor_id, "fundraising_contact_id": link.target.fundraising_contact_id, "contact_id": link.target.contact_id, "organization_id": link.target.organization_id, }) # NOTE: body is still missing — we only have headers. A follow-up # job can re-fetch the full message from Gmail using the sighting's # gmail_message_id. Not done inline to keep this endpoint fast. conn.commit() finally: conn.close() handler.send_json({"scanned": scanned, "newly_matched": matched}) # ---------------------------------------------------------------------------- OAuth state store (in-memory) # For a 5-person CRM the state store doesn't need to be durable — a server # restart between start and callback is rare and just requires a retry. _oauth_states: dict[str, dict] = {} _oauth_state_lock = __import__("threading").Lock() def _oauth_state_store(state: str, user_id: str, account_email: str) -> None: import time with _oauth_state_lock: # Prune stale entries (>10 min). cutoff = time.time() - 600 for k, v in list(_oauth_states.items()): if v["created"] < cutoff: _oauth_states.pop(k, None) _oauth_states[state] = { "user_id": user_id, "account_email": account_email.lower().strip(), "created": time.time(), } def _oauth_state_consume(state: str) -> Optional[dict]: with _oauth_state_lock: return _oauth_states.pop(state, None)