"""Sync orchestrator.

Top-level entry points:

    sync_account(conn_factory, credential_provider, account, index)
        Full sync pass for one mailbox. Decides backfill vs. incremental
        based on email_accounts.backfill_complete. Writes a sync_runs row.

    sync_all(conn_factory, credential_provider, index)
        Iterates every sync-enabled account sequentially. Called from
        scheduler.py every CRM_GMAIL_SYNC_INTERVAL_MIN minutes.

Design: match-only storage (see architecture doc §7). For each message:

    1. Fetch metadata (cheap, 5 units).
    2. Run matcher against participant addresses.
    3. If matched → fetch full message, parse, persist body + register
       attachments.
    4. If unmatched → persist header-only row.
    5. In both cases, record the per-account sighting.
"""

import logging
import sqlite3
import traceback
from contextlib import closing
from datetime import datetime, timedelta, timezone
from typing import Optional

from . import attachments as _attach
from . import config as _cfg
from . import db as _db
from . import errors as _errors
from . import gmail_client as _gmail
from . import parser as _parser
from . import threads as _threads
from .matcher import InvestorIndex, InvestorLink

log = logging.getLogger("email_integration.sync")

# Headers requested on the cheap metadata fetch.
METADATA_HEADERS = [
    "From", "To", "Cc", "Bcc", "Subject", "Date", "Message-ID",
    "In-Reply-To", "References", "Reply-To",
]

# Look-back window for the date-based fallback when no usable
# last_synced_at is available.
_FALLBACK_LOOKBACK_DAYS = 14


# ---------------------------------------------------------------------------- public

def sync_all(conn_factory, credential_provider, index: InvestorIndex) -> dict:
    """Run one pass across all enabled accounts.

    Args:
        conn_factory: zero-argument callable returning a new DB connection;
            every connection opened here is closed before returning.
        credential_provider: passed through to ``sync_account``.
        index: investor matcher; rebuilt first if stale.

    Returns:
        Summary dict with keys ``accounts``, ``messages_stored`` and
        ``errors``. ``errors`` counts only exceptions that escape
        ``sync_account``; failures it handles internally are recorded on
        the sync run instead.
    """
    index.rebuild_if_stale(conn_factory)
    with closing(conn_factory()) as conn:
        accounts = _db.list_sync_ready_accounts(conn)

    totals = {"accounts": 0, "messages_stored": 0, "errors": 0}
    for acc in accounts:
        totals["accounts"] += 1
        try:
            stats = sync_account(conn_factory, credential_provider, acc, index)
            totals["messages_stored"] += stats.get("messages_stored", 0)
        except Exception:
            # One broken mailbox must not stop the others.
            totals["errors"] += 1
            log.exception("sync failed for account %s", acc["email_address"])
    return totals


def sync_account(conn_factory, credential_provider, account,
                 index: InvestorIndex) -> dict:
    """Sync a single mailbox and record the outcome as a sync run.

    Runs a backfill when ``account["backfill_complete"]`` is falsy,
    otherwise an incremental sync, then drains pending attachments. If the
    history checkpoint has expired, falls back to a date-based backfill and
    records the run as ``partial``.

    Args:
        conn_factory: zero-argument callable returning a new DB connection.
        credential_provider: handed to the Gmail client.
        account: mapping-like account row; reads ``id``, ``email_address``
            and ``backfill_complete`` here.
        index: investor matcher.

    Returns:
        Run stats dict: ``messages_seen``, ``messages_stored``,
        ``attachments_saved``, ``api_calls``, ``retries``.
    """
    email_addr = account["email_address"]
    account_id = account["id"]
    needs_backfill = not account["backfill_complete"]
    stats = _gmail.CallStats()
    client = _gmail.GmailClient(credential_provider, email_addr, stats=stats)

    # Mark running.
    with closing(conn_factory()) as conn:
        run_id = _db.start_sync_run(
            conn,
            account_id=account_id,
            kind="backfill" if needs_backfill else "incremental",
        )
        _db.set_account_status(conn, account_id, status="active", error=None)
        conn.commit()

    run_stats = {"messages_seen": 0, "messages_stored": 0, "attachments_saved": 0}
    error_str: Optional[str] = None
    status = "ok"
    try:
        try:
            if needs_backfill:
                _run_backfill(conn_factory, client, account, index, run_stats)
            else:
                _run_incremental(conn_factory, client, account, index, run_stats)
            # Drain attachments for this account. Limit to a few cycles'
            # worth of attachments per pass.
            run_stats["attachments_saved"] = _attach.drain_pending(
                conn_factory, client, account_id, limit=100)
        except _errors.HistoryExpiredError:
            # Recover: reset to date-based backfill from last_synced_at.
            # Nested inside the outer try so a failure of the fallback
            # itself is recorded as an error instead of escaping with a
            # misleading "partial" status.
            error_str = "history expired; fallback to date backfill"
            status = "partial"
            _fallback_date_backfill(conn_factory, client, account, index, run_stats)
    except _errors.AuthError as e:
        error_str = f"auth: {e}"
        status = "error"
    except Exception as e:
        error_str = f"unexpected: {type(e).__name__}: {e}"
        status = "error"
        log.exception("unexpected during sync of %s", email_addr)
    finally:
        run_stats["api_calls"] = stats.api_calls
        run_stats["retries"] = stats.retries
        with closing(conn_factory()) as conn:
            _db.finish_sync_run(conn, run_id, status=status, stats=run_stats,
                                error=error_str)
            _db.set_account_status(
                conn, account_id,
                status="active" if status == "ok" else status,
                error=error_str,
            )
            # last_synced_at is the resume point of the date-based
            # fallback, so it must not move forward after a failed run.
            if status != "error":
                _db.set_account_checkpoint(conn, account_id,
                                           last_synced_at=_db._now_iso())
            conn.commit()
    return run_stats


# ---------------------------------------------------------------------------- helpers

def _save_history_id(conn_factory, account_id, history_id) -> None:
    """Persist ``history_id`` as the account's history checkpoint."""
    with closing(conn_factory()) as conn:
        _db.set_account_checkpoint(conn, account_id, history_id=str(history_id))
        conn.commit()


def _checkpoint_current_history_id(conn_factory, client, account_id) -> bool:
    """Store the mailbox's current historyId; return False if none was reported."""
    hid = client.get_profile().get("historyId")
    if not hid:
        return False
    _save_history_id(conn_factory, account_id, hid)
    return True


def _process_or_skip(conn_factory, client, account, index: InvestorIndex,
                     gmail_message_id, run_stats: dict, *,
                     date_backfill: bool = False) -> None:
    """Process one message, logging and skipping per-message Gmail errors."""
    try:
        _process_one_message(conn_factory, client, account, index,
                             gmail_message_id=gmail_message_id,
                             run_stats=run_stats)
    except _errors.AuthError:
        # An auth failure affects the whole account, not one message. This
        # is a no-op if AuthError is not a GmailError subclass (hierarchy
        # not visible from this module).
        raise
    except _errors.GmailError as e:
        if date_backfill:
            log.warning("skip during date-backfill msg %s: %s",
                        gmail_message_id, e)
        else:
            log.warning("skip msg %s on %s: %s",
                        gmail_message_id, account["email_address"], e)


def _date_backfill_query(last_synced_at) -> str:
    """Build the Gmail ``after:YYYY/MM/DD`` query for the date fallback.

    Uses the date part of ``last_synced_at`` (works for both ``T`` and
    space separated timestamps) or a 14-day look-back when it is missing
    or unparseable, then backs off one extra day: Gmail interprets query
    dates as midnight Pacific time, so the bare UTC date could skip mail
    from the early hours of that day.
    """
    start = None
    if last_synced_at:
        try:
            start = datetime.strptime(str(last_synced_at)[:10], "%Y-%m-%d")
        except ValueError:
            log.warning("unparseable last_synced_at %r; using %d-day lookback",
                        last_synced_at, _FALLBACK_LOOKBACK_DAYS)
    if start is None:
        start = datetime.now(tz=timezone.utc) - timedelta(
            days=_FALLBACK_LOOKBACK_DAYS)
    start -= timedelta(days=1)
    return f"after:{start.strftime('%Y/%m/%d')}"


# ---------------------------------------------------------------------------- backfill

def _run_backfill(conn_factory, client, account, index: InvestorIndex,
                  run_stats: dict) -> None:
    """Initial full-mailbox backfill, resumable via backfill_cursor.

    After each page the next page token is saved as ``backfill_cursor``
    and ``backfill_complete`` is set once no token remains.

    The history checkpoint for later incrementals is taken *before* a
    fresh listing starts, so mail arriving while the backfill runs is
    replayed by the first incremental rather than lost. A resumed backfill
    keeps the checkpoint it already has; if it has none, the current
    historyId is captured at the end.
    """
    account_id = account["id"]
    page_token = account["backfill_cursor"]

    if page_token:
        anchored = bool(account["last_history_id"])
    else:
        anchored = _checkpoint_current_history_id(conn_factory, client, account_id)

    while True:
        resp = client.list_messages(
            page_token=page_token,
            max_results=_cfg.CONFIG.backfill_page_size,
        )
        for m in resp.get("messages") or []:
            run_stats["messages_seen"] += 1
            _process_or_skip(conn_factory, client, account, index,
                             m["id"], run_stats)

        page_token = resp.get("nextPageToken")
        with closing(conn_factory()) as conn:
            _db.set_account_checkpoint(
                conn, account_id,
                backfill_cursor=page_token,
                backfill_complete=(not page_token),
            )
            conn.commit()

        if not page_token:
            if not anchored:
                # No checkpoint was taken up front: capture the current
                # historyId for future incrementals.
                _checkpoint_current_history_id(conn_factory, client, account_id)
            return


# ---------------------------------------------------------------------------- incremental

def _run_incremental(conn_factory, client, account, index: InvestorIndex,
                     run_stats: dict) -> None:
    """Apply Gmail history since the stored checkpoint.

    Added messages are processed, deleted ones get their sighting
    tombstoned, and label changes refresh the sighting's labels. On
    completion the checkpoint moves to ``client.last_history_id``.

    Raises:
        _errors.HistoryExpiredError: propagated from the client when the
            stored checkpoint is no longer available; the caller recovers.
    """
    start_hid = account["last_history_id"]
    if not start_hid:
        # Safety: if checkpoint is missing, re-enter backfill.
        _run_backfill(conn_factory, client, account, index, run_stats)
        return

    account_id = account["id"]
    # history_types filter limits bandwidth to what we care about.
    history = client.iter_history(
        start_history_id=start_hid,
        history_types=["messageAdded", "messageDeleted",
                       "labelAdded", "labelRemoved"],
    )
    for h in history:
        for added in h.get("messagesAdded") or []:
            msg_id = (added.get("message") or {}).get("id")
            run_stats["messages_seen"] += 1
            _process_or_skip(conn_factory, client, account, index,
                             msg_id, run_stats)

        for deleted in h.get("messagesDeleted") or []:
            msg_id = (deleted.get("message") or {}).get("id")
            if not msg_id:
                continue
            with closing(conn_factory()) as conn:
                _db.tombstone_sighting(conn, account_id=account_id,
                                       gmail_message_id=msg_id)
                conn.commit()

        label_changes = (h.get("labelsAdded") or []) + (h.get("labelsRemoved") or [])
        for changed in label_changes:
            msg = changed.get("message") or {}
            msg_id = msg.get("id")
            if not msg_id:
                continue
            # labels are the resulting label set in Gmail's payload after
            # the change. We refresh them wholesale.
            labels = msg.get("labelIds") or []
            with closing(conn_factory()) as conn:
                _db.update_sighting_labels(conn, account_id=account_id,
                                           gmail_message_id=msg_id,
                                           labels=labels)
                conn.commit()

    new_hid = client.last_history_id
    if new_hid:
        _save_history_id(conn_factory, account_id, new_hid)


def _fallback_date_backfill(conn_factory, client, account, index, run_stats):
    """Used when startHistoryId has been pruned by Gmail.

    Pulls everything since last_synced_at (or 14d if unknown), which will
    hit a large overlap with existing data but upserts are idempotent.
    The new history checkpoint is snapshotted before listing so mail that
    arrives during the pass is picked up by the next incremental.
    """
    q = _date_backfill_query(account["last_synced_at"])
    hid = client.get_profile().get("historyId")

    for m in client.iter_messages(q=q):
        run_stats["messages_seen"] += 1
        _process_or_skip(conn_factory, client, account, index,
                         m["id"], run_stats, date_backfill=True)

    if hid:
        _save_history_id(conn_factory, account["id"], hid)


# ---------------------------------------------------------------------------- per-message

def _process_one_message(conn_factory, client, account, index: InvestorIndex,
                         *, gmail_message_id: str, run_stats: dict) -> None:
    """Fetch, match, persist one message. Idempotent.

    Does nothing for an empty id or when this account already has a
    sighting of the message. Increments ``run_stats["messages_stored"]``
    only after a new email row has been committed.
    """
    if not gmail_message_id:
        return

    # Skip if we've already sighted this message for this account.
    with closing(conn_factory()) as conn:
        cur = conn.cursor()
        cur.execute(
            "SELECT email_id FROM email_account_messages "
            "WHERE account_id = ? AND gmail_message_id = ?",
            (account["id"], gmail_message_id),
        )
        if cur.fetchone():
            return

    # 1. Metadata fetch (cheap).
    meta = client.get_message(gmail_message_id, format="metadata",
                              metadata_headers=METADATA_HEADERS)
    meta_parsed = _parser.parse(meta,
                                owning_account_address=account["email_address"])

    participants = set()
    if meta_parsed.get("from_email"):
        participants.add(meta_parsed["from_email"])
    for kind in ("to", "cc", "bcc"):
        for a in meta_parsed.get(kind, []):
            if isinstance(a, dict) and a.get("email"):
                participants.add(a["email"])

    # Exclude owning account's own address so we don't try to "match" ourselves.
    own = {account["email_address"].lower()}
    links = index.match(participants, exclude_addresses=own)
    is_matched = bool(links)

    # 2. If matched, fetch full and parse for body + attachments.
    if is_matched:
        full = client.get_message(gmail_message_id, format="full")
        parsed = _parser.parse(full,
                               owning_account_address=account["email_address"])
    else:
        parsed = meta_parsed
        # Strip any body fields (metadata fetch shouldn't have them but be safe).
        parsed["body_text"] = None
        parsed["body_html"] = None
        parsed["attachments"] = []

    # 3. Persist (idempotent on rfc_message_id).
    for attempt in (1, 2):
        with closing(conn_factory()) as conn:
            try:
                inserted = _persist_message(conn, account, gmail_message_id,
                                            parsed, links, is_matched)
                conn.commit()
            except sqlite3.IntegrityError as e:
                # Concurrent insert race — roll back, then re-read and
                # proceed once; the row inserted by the other writer is
                # found on the second attempt.
                conn.rollback()
                if attempt == 1:
                    continue
                log.warning("giving up on msg %s on %s after integrity conflict: %s",
                            gmail_message_id, account["email_address"], e)
                return
        if inserted:
            run_stats["messages_stored"] += 1
        return


def _persist_message(conn, account, gmail_message_id, parsed, links,
                     is_matched: bool) -> bool:
    """Write the email row (insert or upgrade) and this account's sighting.

    Does not commit; the caller owns the transaction.

    Returns:
        True if a new email row was inserted, False if one already existed.
    """
    existing = _db.find_email_by_rfc_id(conn, parsed["rfc_message_id"])
    if existing:
        inserted = False
        email_id = existing["id"]
        # If the email was previously unmatched but now matches (e.g. user
        # added the investor after first sight), upgrade the row.
        if is_matched and existing["match_status"] == "unmatched":
            conn.execute(
                "UPDATE emails SET match_status = 'matched', is_matched = 1, "
                "body_text = ?, body_html = ?, updated_at = datetime('now') "
                "WHERE id = ?",
                (parsed.get("body_text"), parsed.get("body_html"), email_id),
            )
            _attach.register_stubs(
                conn, email_id=email_id,
                parsed_attachments=parsed.get("attachments") or [])
            for link in links:
                _db.insert_investor_link(conn, email_id=email_id,
                                         link=_flatten_link(link))
    else:
        inserted = True
        match_status = "matched" if is_matched else "unmatched"
        email_id = _db.insert_email(conn, parsed=parsed, match_status=match_status)
        thread_id = _threads.resolve_thread_id(conn, parsed)
        _db.set_email_thread(conn, email_id, thread_id)
        if is_matched:
            _attach.register_stubs(
                conn, email_id=email_id,
                parsed_attachments=parsed.get("attachments") or [])
            for link in links:
                _db.insert_investor_link(conn, email_id=email_id,
                                         link=_flatten_link(link))
        _db.rollup_thread(conn, thread_id)

    # Record sighting (always, even if email row was pre-existing).
    _db.upsert_sighting(
        conn,
        email_id=email_id,
        account_id=account["id"],
        gmail_message_id=gmail_message_id,
        gmail_thread_id=parsed.get("gmail_thread_id") or "",
        labels=parsed.get("labels", []),
        is_sent=parsed.get("is_sent", False),
    )
    return inserted


def _flatten_link(link: InvestorLink) -> dict:
    """Flatten an InvestorLink and its target into one dict for insertion."""
    return {
        "matched_address": link.matched_address,
        "match_kind": link.match_kind,
        "match_confidence": link.match_confidence,
        "fundraising_investor_id": link.target.fundraising_investor_id,
        "fundraising_contact_id": link.target.fundraising_contact_id,
        "contact_id": link.target.contact_id,
        "organization_id": link.target.organization_id,
    }