Files
ten31-database/backend/email_integration/sync.py
T
Keysat 1564c087bf Remove Instructions/Feedback + lp_profiles; sync retry, purge, mobile fixes (v0.1.0:104)
Removals (net -570 lines):
- Delete the Instructions and Feedback (feature_requests) pages + backend.
- Retire lp_profiles + investor_type across server, ingest, and seeds; migration
  0008 drops both empty tables (a sanctioned one-off exception to
  never-hard-delete). 0001's lp_profiles ALTER is removed so a fresh DB doesn't
  break the migration chain (live DBs already applied it).

Fixes:
- Email sync: a transient timeout no longer terminally parks a mailbox; the
  scheduler retries 'retrying' each cycle and re-includes errored accounts on an
  hourly backoff, so stuck mailboxes self-heal.
- Mobile Contacts: page through the full directory (server caps 500/page) -- one
  fetch silently truncated at 720, hiding people from the list and from search.
- Mobile email review: clock icon to set a reminder inline; approval cards show
  date/time.

New:
- Admin-only purge of soft-deleted rows (Settings -> Admin; type-to-confirm,
  refuses any row still linked to live data).

Tests: 45/45 (adds test_sync_ready + test_purge_soft_deleted). Reviewer pass
applied (NULL reminders.contact_id on contact purge). Bumped to v0.1.0:104.
2026-06-20 20:06:11 -05:00

401 lines
16 KiB
Python

"""
Sync orchestrator.
Top-level entry points:
sync_account(conn_factory, credential_provider, account_row, matcher)
Full sync pass for one mailbox. Decides backfill vs. incremental based
on email_accounts.backfill_complete. Writes a sync_runs row.
sync_all(conn_factory, credential_provider, matcher)
Iterates every sync-enabled account sequentially. Called from
scheduler.py every CRM_GMAIL_SYNC_INTERVAL_MIN minutes.
Design: match-only storage (see architecture doc §7). For each message:
1. Fetch metadata (cheap, 5 units).
2. Run matcher against participant addresses.
3. If matched → fetch full message, parse, persist body + register attachments.
4. If unmatched → persist header-only row.
5. In both cases, record the per-account sighting.
"""
import logging
import sqlite3
import traceback
from typing import Optional
from urllib.error import URLError
from . import attachments as _attach
from . import config as _cfg
from . import db as _db
from . import errors as _errors
from . import gmail_client as _gmail
from . import parser as _parser
from . import threads as _threads
from .matcher import InvestorIndex, InvestorLink
# Module-level logger used by every function in this file.
log = logging.getLogger("email_integration.sync")
# Header names passed as ``metadata_headers`` when a message is fetched with
# format="metadata" (see _process_one_message). The parsed result of that
# fetch supplies the participant addresses handed to the matcher.
METADATA_HEADERS = [
"From", "To", "Cc", "Bcc", "Subject", "Date",
"Message-ID", "In-Reply-To", "References", "Reply-To",
]
# ---------------------------------------------------------------------------- public
def sync_all(conn_factory, credential_provider, index: InvestorIndex) -> dict:
    """Sync every sync-ready mailbox once, one after another.

    The investor index is refreshed first (if stale), then the list of
    accounts is read on a short-lived connection. A failure in one mailbox is
    logged and counted but never stops the remaining mailboxes.

    Returns:
        ``{"accounts": n, "messages_stored": n, "errors": n}`` for this pass.
    """
    index.rebuild_if_stale(conn_factory)

    # Read the work list up front so no connection is held during the syncs.
    db_conn = conn_factory()
    try:
        ready = _db.list_sync_ready_accounts(db_conn)
    finally:
        db_conn.close()

    attempted = 0
    stored = 0
    failed = 0
    for mailbox in ready:
        attempted += 1
        try:
            result = sync_account(conn_factory, credential_provider, mailbox, index)
            stored += result.get("messages_stored", 0)
        except Exception:
            failed += 1
            log.exception("sync failed for account %s", mailbox["email_address"])

    return {"accounts": attempted, "messages_stored": stored, "errors": failed}
def sync_account(conn_factory, credential_provider, account,
                 index: InvestorIndex) -> dict:
    """Sync a single mailbox and record the outcome as a sync run.

    Runs a backfill while ``account["backfill_complete"]`` is falsy, otherwise
    an incremental pass, then drains up to 100 pending attachments. If the
    history checkpoint has expired the pass falls back to a date-based
    backfill and is recorded as ``"partial"``.

    Failures are classified and stored rather than raised:

    * auth errors                                 -> status ``"error"``
    * rate-limit / transient / network / timeout  -> status ``"retrying"``
    * anything else                               -> status ``"error"``

    The same classification applies to failures inside the date-based
    fallback; previously those escaped from the ``except`` handler and left
    the run recorded as ``"partial"`` with a misleading error string.

    Args:
        conn_factory: zero-argument callable returning a new DB connection.
        credential_provider: passed through to ``GmailClient``.
        account: account row; reads ``id``, ``email_address`` and
            ``backfill_complete`` here.
        index: investor matcher index passed to the per-message pipeline.

    Returns:
        Run statistics: ``messages_seen``, ``messages_stored``,
        ``attachments_saved``, ``api_calls`` and ``retries``.

    Raises:
        Only errors from the bookkeeping writes themselves (starting or
        finishing the sync run); sync failures are captured in the run row.
    """
    email_addr = account["email_address"]
    stats = _gmail.CallStats()
    client = _gmail.GmailClient(credential_provider, email_addr, stats=stats)

    # Mark running.
    conn = conn_factory()
    try:
        run_id = _db.start_sync_run(
            conn,
            account_id=account["id"],
            kind="backfill" if not account["backfill_complete"] else "incremental",
        )
        _db.set_account_status(conn, account["id"], status="active", error=None)
        conn.commit()
    finally:
        conn.close()

    run_stats = {"messages_seen": 0, "messages_stored": 0, "attachments_saved": 0}
    error_str: Optional[str] = None
    status = "ok"
    try:
        try:
            if not account["backfill_complete"]:
                _run_backfill(conn_factory, client, account, index, run_stats)
            else:
                _run_incremental(conn_factory, client, account, index, run_stats)
            # Limit to a few cycles' worth of attachments per pass.
            # drain_pending is handed conn_factory rather than a connection,
            # so no connection needs to be opened here.
            run_stats["attachments_saved"] = _attach.drain_pending(
                conn_factory, client, account["id"], limit=100)
        except _errors.HistoryExpiredError:
            # Recover: reset to date-based backfill from last_synced_at.
            # This runs inside the outer try so that a failure during the
            # fallback is classified by the handlers below.
            error_str = "history expired; fallback to date backfill"
            status = "partial"
            log.warning("history expired for %s; falling back to date backfill",
                        email_addr)
            _fallback_date_backfill(conn_factory, client, account, index, run_stats)
    except _errors.AuthError as e:
        error_str = f"auth: {e}"
        status = "error"
        log.warning("auth error during sync of %s: %s", email_addr, e)
    except (_errors.RateLimitError, _errors.TransientError, URLError, TimeoutError) as e:
        # A network / 5xx / rate-limit error that outlived the in-pass retry loop.
        # This is TRANSIENT, not terminal: park it as 'retrying' (which the scheduler
        # still picks up every cycle) instead of 'error' (which it excludes). Fixes the
        # v<=0.1.0:103 bug where a single timeout dark-listed a mailbox until a manual
        # kick. Terminal causes (auth, permanent, unexpected) still fall through to 'error'.
        error_str = f"transient: {type(e).__name__}: {e}"
        status = "retrying"
        log.warning("transient error during sync of %s: %s", email_addr, e)
    except Exception as e:
        error_str = f"unexpected: {type(e).__name__}: {e}"
        status = "error"
        log.exception("unexpected during sync of %s", email_addr)
    finally:
        run_stats["api_calls"] = stats.api_calls
        run_stats["retries"] = stats.retries
        conn = conn_factory()
        try:
            _db.finish_sync_run(conn, run_id, status=status, stats=run_stats,
                                error=error_str)
            # A clean run leaves the account 'active'; any other outcome is
            # stored verbatim ('partial', 'retrying' or 'error').
            _db.set_account_status(conn, account["id"],
                                   status="active" if status == "ok" else status,
                                   error=error_str)
            # NOTE(review): last_synced_at advances even when the pass failed.
            # _fallback_date_backfill uses it as its lower bound, so repeated
            # failed passes may shrink that window -- confirm this is intended
            # (the scheduler backoff may rely on it).
            _db.set_account_checkpoint(conn, account["id"],
                                       last_synced_at=_db._now_iso())
            conn.commit()
        finally:
            conn.close()
    return run_stats
# ---------------------------------------------------------------------------- backfill
def _run_backfill(conn_factory, client, account, index: InvestorIndex,
                  run_stats: dict) -> None:
    """Initial full-mailbox backfill, resumable via backfill_cursor.

    Lists the mailbox page by page starting from ``account["backfill_cursor"]``
    (falsy means "start from the beginning"), runs every listed message
    through ``_process_one_message``, and persists the next page token after
    each page so an interrupted backfill resumes where it stopped.

    The historyId checkpoint for later incremental syncs is captured BEFORE
    the first page of a fresh backfill. Capturing it only after the last page
    (the previous behaviour) loses mail that arrives while the backfill is
    running: such mail is not part of the remaining pages and is older than
    the end-of-backfill checkpoint. The end-of-backfill capture remains as a
    fallback for a resumed backfill that has no checkpoint stored yet.

    Args:
        conn_factory: zero-argument callable returning a new DB connection.
        client: Gmail client for this mailbox.
        account: account row; reads ``id``, ``email_address``,
            ``backfill_cursor`` and ``last_history_id``.
        index: investor matcher index.
        run_stats: mutated in place (``messages_seen`` here, plus whatever
            ``_process_one_message`` updates).

    Raises:
        Whatever the listing/profile calls raise, plus auth and rate-limit
        errors from per-message processing. Other per-message Gmail errors
        are logged and skipped.
    """
    page_token = account["backfill_cursor"]
    if page_token:
        # Resuming: an earlier pass may already have stored the checkpoint.
        have_checkpoint = bool(account["last_history_id"])
    else:
        # Fresh start: pin the checkpoint before listing anything.
        have_checkpoint = _checkpoint_profile_history_id(
            conn_factory, client, account["id"])

    while True:
        resp = client.list_messages(page_token=page_token,
                                    max_results=_cfg.CONFIG.backfill_page_size)
        for m in resp.get("messages") or []:
            run_stats["messages_seen"] += 1
            try:
                _process_one_message(conn_factory, client, account, index,
                                     gmail_message_id=m["id"], run_stats=run_stats)
            except (_errors.AuthError, _errors.RateLimitError):
                # Account-wide conditions: abort the pass instead of skipping
                # message after message while the cursor keeps advancing.
                # (No-op if these classes do not derive from GmailError.)
                raise
            except _errors.GmailError as e:
                log.warning("skip msg %s on %s: %s", m["id"],
                            account["email_address"], e)

        page_token = resp.get("nextPageToken")
        conn = conn_factory()
        try:
            # An empty next-page token both clears the cursor and marks the
            # backfill complete.
            _db.set_account_checkpoint(conn, account["id"],
                                       backfill_cursor=page_token,
                                       backfill_complete=(not page_token))
            conn.commit()
        finally:
            conn.close()

        if not page_token:
            if not have_checkpoint:
                # Capture current historyId as checkpoint for future incrementals.
                _checkpoint_profile_history_id(conn_factory, client, account["id"])
            return


def _checkpoint_profile_history_id(conn_factory, client, account_id) -> bool:
    """Store the mailbox's current historyId as the incremental checkpoint.

    Returns True when the profile carried a historyId and it was saved.
    """
    hid = client.get_profile().get("historyId")
    if not hid:
        return False
    conn = conn_factory()
    try:
        _db.set_account_checkpoint(conn, account_id, history_id=str(hid))
        conn.commit()
    finally:
        conn.close()
    return True
# ---------------------------------------------------------------------------- incremental
def _run_incremental(conn_factory, client, account, index: InvestorIndex,
                     run_stats: dict) -> None:
    """Apply Gmail history records newer than the stored checkpoint.

    For each history record: added messages go through
    ``_process_one_message``, deleted messages have their sighting
    tombstoned, and label changes overwrite the sighting's label set. When
    the whole history has been consumed, the client's ``last_history_id``
    becomes the new checkpoint. If the account has no checkpoint at all the
    function re-enters the backfill instead.

    Args:
        conn_factory: zero-argument callable returning a new DB connection.
        client: Gmail client for this mailbox.
        account: account row; reads ``id``, ``email_address`` and
            ``last_history_id``.
        index: investor matcher index.
        run_stats: mutated in place (``messages_seen`` here).

    Raises:
        ``HistoryExpiredError`` (and any other error) from ``iter_history``
        propagates so the caller can fall back; auth and rate-limit errors
        from per-message processing propagate too. Other per-message Gmail
        errors are logged and skipped.
    """
    start_hid = account["last_history_id"]
    if not start_hid:
        # Safety: if checkpoint is missing, re-enter backfill.
        _run_backfill(conn_factory, client, account, index, run_stats)
        return

    account_id = account["id"]

    def _apply(db_fn, **kwargs) -> None:
        # One short-lived connection and commit per history event.
        conn = conn_factory()
        try:
            db_fn(conn, account_id=account_id, **kwargs)
            conn.commit()
        finally:
            conn.close()

    # history_types filter limits bandwidth to what we care about.
    for h in client.iter_history(
        start_history_id=start_hid,
        history_types=["messageAdded", "messageDeleted", "labelAdded", "labelRemoved"],
    ):
        for ma in h.get("messagesAdded") or []:
            msg = ma.get("message") or {}
            run_stats["messages_seen"] += 1
            try:
                _process_one_message(conn_factory, client, account, index,
                                     gmail_message_id=msg.get("id"),
                                     run_stats=run_stats)
            except (_errors.AuthError, _errors.RateLimitError):
                # Account-wide conditions: abort before the checkpoint is
                # advanced past messages that were never stored.
                # (No-op if these classes do not derive from GmailError.)
                raise
            except _errors.GmailError as e:
                log.warning("skip msg %s on %s: %s", msg.get("id"),
                            account["email_address"], e)

        for md in h.get("messagesDeleted") or []:
            msg = md.get("message") or {}
            _apply(_db.tombstone_sighting, gmail_message_id=msg.get("id"))

        for la in (h.get("labelsAdded") or []) + (h.get("labelsRemoved") or []):
            msg = la.get("message") or {}
            # labels are the resulting label set in Gmail's payload after
            # the change. We refresh them wholesale.
            _apply(_db.update_sighting_labels,
                   gmail_message_id=msg.get("id"),
                   labels=msg.get("labelIds") or [])

    # Only reached when the whole history was consumed without an error, so a
    # failed pass restarts from the old checkpoint.
    new_hid = client.last_history_id
    if new_hid:
        conn = conn_factory()
        try:
            _db.set_account_checkpoint(conn, account_id, history_id=str(new_hid))
            conn.commit()
        finally:
            conn.close()
def _fallback_date_backfill(conn_factory, client, account, index, run_stats):
    """Used when startHistoryId has been pruned by Gmail.

    Pulls everything since the start (00:00 UTC) of the day of
    ``account["last_synced_at"]`` (or of 14 days ago if that value is missing
    or unparseable), which will hit a large overlap with existing data but
    upserts are idempotent.

    The lower bound is sent as epoch seconds. Per Gmail's search
    documentation a ``YYYY/MM/DD`` date is interpreted as midnight Pacific
    time, so a UTC date in that form can skip several hours of mail. Both
    ``T``- and space-separated timestamps are accepted, with or without a
    trailing ``Z`` or UTC offset; naive values are taken as UTC.

    The mailbox historyId is read BEFORE listing and stored only after the
    listing finished, so mail arriving during the fallback is still covered
    by the next incremental pass.

    Args:
        conn_factory: zero-argument callable returning a new DB connection.
        client: Gmail client for this mailbox.
        account: account row; reads ``id`` and ``last_synced_at``.
        index: investor matcher index.
        run_stats: mutated in place (``messages_seen`` here).

    Raises:
        Whatever the listing/profile calls raise, plus auth and rate-limit
        errors from per-message processing. Other per-message Gmail errors
        are logged and skipped.
    """
    from datetime import datetime, timedelta, timezone

    since_dt = None
    raw_since = account["last_synced_at"]
    if raw_since:
        text = str(raw_since).strip()
        if text[-1:] in ("Z", "z"):
            # fromisoformat() only accepts a literal 'Z' on newer Pythons.
            text = text[:-1] + "+00:00"
        try:
            since_dt = datetime.fromisoformat(text)
        except ValueError:
            log.warning("unparseable last_synced_at %r for account %s; "
                        "using 14-day window", raw_since, account["id"])
    if since_dt is None:
        since_dt = datetime.now(tz=timezone.utc) - timedelta(days=14)
    elif since_dt.tzinfo is None:
        since_dt = since_dt.replace(tzinfo=timezone.utc)

    day_start = since_dt.astimezone(timezone.utc).replace(
        hour=0, minute=0, second=0, microsecond=0)
    q = f"after:{int(day_start.timestamp())}"

    # Snapshot the checkpoint first; it is persisted only on success below.
    hid = client.get_profile().get("historyId")

    for m in client.iter_messages(q=q):
        run_stats["messages_seen"] += 1
        try:
            _process_one_message(conn_factory, client, account, index,
                                 gmail_message_id=m["id"], run_stats=run_stats)
        except (_errors.AuthError, _errors.RateLimitError):
            # Account-wide conditions: abort rather than skip every message.
            # (No-op if these classes do not derive from GmailError.)
            raise
        except _errors.GmailError as e:
            log.warning("skip during date-backfill msg %s: %s", m["id"], e)

    if hid:
        conn = conn_factory()
        try:
            _db.set_account_checkpoint(conn, account["id"], history_id=str(hid))
            conn.commit()
        finally:
            conn.close()
# ---------------------------------------------------------------------------- per-message
def _process_one_message(conn_factory, client, account, index: InvestorIndex,
                         *, gmail_message_id: str, run_stats: dict) -> None:
    """Fetch, match, persist one message. Idempotent.

    Steps:
      1. Return early if this account already has a sighting for the message.
      2. Fetch metadata only and match the participant addresses (excluding
         the mailbox's own address) against the investor index.
      3. Matched -> fetch and parse the full message; unmatched -> keep the
         metadata parse with body and attachment fields blanked.
      4. Persist the email row (or upgrade an existing unmatched row) and
         record the per-account sighting in one transaction.

    A ``sqlite3.IntegrityError`` during step 4 is treated as a concurrent
    insert: the transaction is discarded and the write is retried once, so
    the retry finds the row the other writer created. Previously the error
    was swallowed and the message (including its sighting) was silently
    dropped. ``run_stats["messages_stored"]`` is incremented only after a
    successful commit of a newly inserted email row.

    Args:
        conn_factory: zero-argument callable returning a new DB connection.
        client: Gmail client for this mailbox.
        account: account row; reads ``id`` and ``email_address``.
        index: investor matcher index.
        gmail_message_id: Gmail's id for the message; falsy ids are ignored.
        run_stats: mutated in place (``messages_stored``).

    Raises:
        Errors from the Gmail fetches and from the DB layer other than the
        handled ``sqlite3.IntegrityError``.
    """
    if not gmail_message_id:
        return

    # Skip if we've already sighted this message for this account.
    conn = conn_factory()
    try:
        cur = conn.cursor()
        cur.execute(
            "SELECT email_id FROM email_account_messages "
            "WHERE account_id = ? AND gmail_message_id = ?",
            (account["id"], gmail_message_id),
        )
        if cur.fetchone():
            return
    finally:
        conn.close()

    # 1. Metadata fetch (cheap).
    meta = client.get_message(gmail_message_id, format="metadata",
                              metadata_headers=METADATA_HEADERS)
    meta_parsed = _parser.parse(meta, owning_account_address=account["email_address"])
    participants = set()
    if meta_parsed.get("from_email"):
        participants.add(meta_parsed["from_email"])
    for kind in ("to", "cc", "bcc"):
        for a in meta_parsed.get(kind, []):
            if isinstance(a, dict) and a.get("email"):
                participants.add(a["email"])

    # Exclude owning account's own address so we don't try to "match" ourselves.
    own = {account["email_address"].lower()}
    links = index.match(participants, exclude_addresses=own)
    is_matched = bool(links)

    # 2. If matched, fetch full and parse for body + attachments.
    if is_matched:
        full = client.get_message(gmail_message_id, format="full")
        parsed = _parser.parse(full, owning_account_address=account["email_address"])
    else:
        parsed = meta_parsed
        # Strip any body fields (metadata fetch shouldn't have them but be safe).
        parsed["body_text"] = None
        parsed["body_html"] = None
        parsed["attachments"] = []

    # 3. Persist (idempotent on rfc_message_id), retrying once on a race.
    for attempt in (1, 2):
        conn = conn_factory()
        try:
            inserted = _persist_message(conn, account, gmail_message_id,
                                        parsed, links, is_matched)
            conn.commit()
        except sqlite3.IntegrityError as e:
            # Nothing was committed; closing the connection in ``finally``
            # discards the partial transaction.
            if attempt == 1:
                # Concurrent insert race -- re-read and proceed.
                continue
            log.warning("integrity conflict persisting msg %s on %s: %s",
                        gmail_message_id, account["email_address"], e)
            return
        finally:
            conn.close()
        if inserted:
            run_stats["messages_stored"] += 1
        return


def _persist_message(conn, account, gmail_message_id, parsed, links,
                     is_matched) -> bool:
    """Write one parsed message and its sighting on ``conn`` (no commit).

    Returns True when a new email row was inserted, False when an existing
    row (looked up by ``rfc_message_id``) was reused.
    """
    existing = _db.find_email_by_rfc_id(conn, parsed["rfc_message_id"])
    if existing:
        inserted = False
        email_id = existing["id"]
        # If the email was previously unmatched but now matches (e.g. user
        # added the investor after first sight), upgrade the row.
        if is_matched and existing["match_status"] == "unmatched":
            conn.execute(
                "UPDATE emails SET match_status = 'matched', is_matched = 1, "
                "body_text = ?, body_html = ?, updated_at = datetime('now') "
                "WHERE id = ?",
                (parsed.get("body_text"), parsed.get("body_html"), email_id),
            )
            _attach.register_stubs(conn,
                                   email_id=email_id,
                                   parsed_attachments=parsed.get("attachments") or [])
            for link in links:
                _db.insert_investor_link(conn, email_id=email_id,
                                         link=_flatten_link(link))
    else:
        inserted = True
        match_status = "matched" if is_matched else "unmatched"
        email_id = _db.insert_email(conn, parsed=parsed, match_status=match_status)
        thread_id = _threads.resolve_thread_id(conn, parsed)
        _db.set_email_thread(conn, email_id, thread_id)
        if is_matched:
            _attach.register_stubs(conn,
                                   email_id=email_id,
                                   parsed_attachments=parsed.get("attachments") or [])
            for link in links:
                _db.insert_investor_link(conn, email_id=email_id,
                                         link=_flatten_link(link))
        _db.rollup_thread(conn, thread_id)

    # Record sighting (always, even if email row was pre-existing).
    _db.upsert_sighting(
        conn,
        email_id=email_id,
        account_id=account["id"],
        gmail_message_id=gmail_message_id,
        gmail_thread_id=parsed.get("gmail_thread_id") or "",
        labels=parsed.get("labels", []),
        is_sent=parsed.get("is_sent", False),
    )
    return inserted
def _flatten_link(link: InvestorLink) -> dict:
    """Flatten a matcher link and its target ids into one plain dict.

    The three match fields come from the link itself and the four id fields
    from ``link.target``; the result is what ``insert_investor_link``
    receives as ``link``.
    """
    flat = {
        field: getattr(link, field)
        for field in ("matched_address", "match_kind", "match_confidence")
    }
    target = link.target
    for field in ("fundraising_investor_id", "fundraising_contact_id",
                  "contact_id", "organization_id"):
        flat[field] = getattr(target, field)
    return flat