"""
Data-access layer for the email_integration module.

All SQL touching emails_* tables lives here. Other modules call named helpers
— they never write SQL inline. This keeps schema changes contained.

Connection pattern matches server.py get_db():
  - WAL mode, foreign keys on, busy_timeout
  - sqlite3.Row row_factory

The caller is responsible for committing / closing.
"""

import json
import os
import sqlite3
import uuid
from datetime import datetime, timezone
from typing import Iterable, Optional


# ------------------------------------------------------------------ migrations

def apply_migrations(cursor: sqlite3.Cursor) -> None:
    """Apply all .sql migration files in migrations/ in lexicographic order.

    Called from server.init_db(). Idempotent. Does not log past migrations in
    a table yet — each file is guarded by CREATE ... IF NOT EXISTS etc. If we
    ever need more complex migrations, add a schema_migrations table.

    Note: under sqlite3's default (legacy) transaction control,
    ``Cursor.executescript`` COMMITs any pending transaction before running
    the script, so this helper does not leave commit control to the caller.
    """
    here = os.path.dirname(os.path.abspath(__file__))
    mdir = os.path.join(here, "migrations")
    if not os.path.isdir(mdir):
        return
    for name in sorted(os.listdir(mdir)):
        if not name.endswith(".sql"):
            continue
        path = os.path.join(mdir, name)
        # Explicit encoding: the platform default (e.g. cp1252 on Windows)
        # would mis-decode any non-ASCII text in a migration file.
        with open(path, "r", encoding="utf-8") as f:
            sql = f.read()
        cursor.executescript(sql)


# ------------------------------------------------------------------ utils

def _uuid() -> str:
    """Return a fresh random UUID4 as a string (used for every primary key)."""
    return str(uuid.uuid4())


def _now_iso() -> str:
    """Return the current UTC time as 'YYYY-MM-DDTHH:MM:SSZ'.

    NOTE: the updated_at columns below are stamped by SQLite's
    datetime('now'), which yields 'YYYY-MM-DD HH:MM:SS' — a different format.
    Don't compare the two lexicographically.
    """
    return datetime.now(tz=timezone.utc).strftime("%Y-%m-%dT%H:%M:%SZ")


def _json(v) -> str:
    """Serialize *v* to compact JSON (no spaces after separators)."""
    return json.dumps(v, separators=(",", ":"))


# ------------------------------------------------------------------ email_accounts

def list_sync_ready_accounts(conn: sqlite3.Connection) -> list[sqlite3.Row]:
    """Return enabled accounts in 'pending'/'active' state.

    Never-synced accounts come first, then the least recently synced.
    """
    cur = conn.cursor()
    cur.execute(
        "SELECT * FROM email_accounts "
        "WHERE sync_enabled = 1 AND sync_status IN ('pending','active') "
        "ORDER BY last_synced_at IS NOT NULL, last_synced_at"
    )
    return cur.fetchall()


def get_account_by_email(conn: sqlite3.Connection,
                         email_address: str) -> Optional[sqlite3.Row]:
    """Return the email_accounts row for *email_address*, or None.

    The match is exact; no case folding is applied.
    """
    cur = conn.cursor()
    cur.execute("SELECT * FROM email_accounts WHERE email_address = ?",
                (email_address,))
    return cur.fetchone()


def upsert_account(conn: sqlite3.Connection, *, user_id: str,
                   email_address: str, auth_method: str) -> str:
    """Return the id of the account for *email_address*, creating it if absent.

    An existing row is returned untouched: user_id / auth_method are NOT
    updated when the account already exists.
    """
    existing = get_account_by_email(conn, email_address)
    if existing:
        return existing["id"]
    account_id = _uuid()
    conn.execute(
        "INSERT INTO email_accounts (id, user_id, email_address, auth_method) "
        "VALUES (?, ?, ?, ?)",
        (account_id, user_id, email_address, auth_method),
    )
    return account_id


def set_account_status(conn: sqlite3.Connection, account_id: str, *,
                       status: str, error: Optional[str] = None) -> None:
    """Set sync_status and sync_error (error=None clears a previous error)."""
    conn.execute(
        "UPDATE email_accounts SET sync_status = ?, sync_error = ?, "
        "updated_at = datetime('now') WHERE id = ?",
        (status, error, account_id),
    )


def set_account_checkpoint(conn: sqlite3.Connection, account_id: str, *,
                           history_id: Optional[str] = None,
                           backfill_cursor: Optional[str] = None,
                           backfill_complete: Optional[bool] = None,
                           last_synced_at: Optional[str] = None) -> None:
    """Update only the checkpoint fields that were passed (non-None).

    No-op when every field is None. Only column names are interpolated into
    the SQL; all values are bound parameters.
    """
    sets, params = [], []
    if history_id is not None:
        sets.append("last_history_id = ?")
        params.append(history_id)
    if backfill_cursor is not None:
        sets.append("backfill_cursor = ?")
        params.append(backfill_cursor)
    if backfill_complete is not None:
        sets.append("backfill_complete = ?")
        params.append(1 if backfill_complete else 0)
    if last_synced_at is not None:
        sets.append("last_synced_at = ?")
        params.append(last_synced_at)
    if not sets:
        return
    sets.append("updated_at = datetime('now')")
    params.append(account_id)
    conn.execute(f"UPDATE email_accounts SET {', '.join(sets)} WHERE id = ?", params)


# ------------------------------------------------------------------ emails

def find_email_by_rfc_id(conn: sqlite3.Connection,
                         rfc_message_id: str) -> Optional[sqlite3.Row]:
    """Return the emails row with this RFC Message-ID, or None."""
    cur = conn.cursor()
    cur.execute("SELECT * FROM emails WHERE rfc_message_id = ?", (rfc_message_id,))
    return cur.fetchone()


def find_email_id_by_any_rfc_id(conn: sqlite3.Connection,
                                rfc_ids: Iterable[str]) -> Optional[str]:
    """Return the id of the earliest-sent email matching any of *rfc_ids*.

    Falsy ids are ignored and duplicates are collapsed, which keeps the
    IN (...) list, and so the bound-parameter count, small. Returns None when
    nothing usable was passed or nothing matches.
    """
    ids = list(dict.fromkeys(r for r in rfc_ids if r))
    if not ids:
        return None
    placeholders = ",".join("?" for _ in ids)
    cur = conn.cursor()
    cur.execute(
        f"SELECT id FROM emails WHERE rfc_message_id IN ({placeholders}) "
        "ORDER BY sent_at ASC LIMIT 1",
        ids,
    )
    row = cur.fetchone()
    return row["id"] if row else None


def insert_email(conn: sqlite3.Connection, *, parsed: dict, match_status: str) -> str:
    """Insert a fresh emails row plus its email_recipients rows. Returns email_id.

    Caller must ensure no row exists for parsed['rfc_message_id']; use
    find_email_by_rfc_id first.

    Required keys in *parsed*: 'rfc_message_id', 'from_email', 'sent_at'.
    Everything else is optional. List-valued keys (to/cc/bcc/references) may
    be missing OR present with value None; both are stored as '[]'.
    """
    email_id = _uuid()
    conn.execute(
        """INSERT INTO emails
           (id, rfc_message_id, gmail_thread_id, rfc_thread_root_id, subject,
            from_email, from_name, to_emails_json, cc_emails_json, bcc_emails_json,
            reply_to, sent_at, body_text, body_html, snippet, in_reply_to,
            references_json, has_attachments, size_estimate, is_matched,
            match_status, raw_headers_json)
           VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)""",
        (
            email_id,
            parsed["rfc_message_id"],
            parsed.get("gmail_thread_id"),
            parsed.get("rfc_thread_root_id"),
            parsed.get("subject"),
            parsed["from_email"],
            parsed.get("from_name"),
            # `or []`, not get(k, []): a key present with value None would
            # otherwise be stored as JSON "null" instead of an empty list.
            _json(parsed.get("to") or []),
            _json(parsed.get("cc") or []),
            _json(parsed.get("bcc") or []),
            parsed.get("reply_to"),
            parsed["sent_at"],
            parsed.get("body_text"),
            parsed.get("body_html"),
            parsed.get("snippet"),
            parsed.get("in_reply_to"),
            _json(parsed.get("references") or []),
            1 if parsed.get("attachments") else 0,
            parsed.get("size_estimate"),
            1 if match_status == "matched" else 0,
            match_status,
            _json(parsed["raw_headers"]) if parsed.get("raw_headers") else None,
        ),
    )

    # recipients
    for kind in ("from", "to", "cc", "bcc", "reply_to"):
        if kind == "from" and parsed.get("from_email"):
            addrs = [(parsed["from_email"], parsed.get("from_name"))]
        elif kind == "reply_to" and parsed.get("reply_to"):
            addrs = [(parsed["reply_to"], None)]
        else:
            # `or []` (not get(kind, [])): the key is often present with value
            # None (e.g. reply_to when there is no Reply-To header), and
            # `for a in None` would raise TypeError and abort the whole backfill.
            # Entries may be {"email", "name"} dicts or bare address strings.
            addrs = [
                (a.get("email"), a.get("name")) if isinstance(a, dict) else (a, None)
                for a in (parsed.get(kind) or [])
            ]
        for address, name in addrs:
            # Normalise before the emptiness check so whitespace-only
            # addresses are skipped rather than stored as ''.
            normalized = (address or "").lower().strip()
            if not normalized:
                continue
            conn.execute(
                "INSERT INTO email_recipients (id, email_id, address, display_name, kind) "
                "VALUES (?, ?, ?, ?, ?)",
                (_uuid(), email_id, normalized, name, kind),
            )
    return email_id


def set_email_thread(conn: sqlite3.Connection, email_id: str, thread_id: str) -> None:
    """Attach an email to a thread (emails.thread_id)."""
    conn.execute(
        "UPDATE emails SET thread_id = ?, updated_at = datetime('now') WHERE id = ?",
        (thread_id, email_id),
    )


# ------------------------------------------------------------------ sightings

def upsert_sighting(conn: sqlite3.Connection, *, email_id: str, account_id: str,
                    gmail_message_id: str, gmail_thread_id: str,
                    labels: list[str], is_sent: bool) -> None:
    """Record that *account_id*'s mailbox holds this email.

    INSERT OR IGNORE: silently a no-op if the row would violate a uniqueness
    constraint (presumably (account_id, gmail_message_id) — confirm in the
    migrations). An existing row, including a tombstoned one, is NOT updated.
    """
    conn.execute(
        """INSERT OR IGNORE INTO email_account_messages
           (id, email_id, account_id, gmail_message_id, gmail_thread_id,
            labels_json, is_sent)
           VALUES (?, ?, ?, ?, ?, ?, ?)""",
        (_uuid(), email_id, account_id, gmail_message_id, gmail_thread_id,
         _json(labels), 1 if is_sent else 0),
    )


def update_sighting_labels(conn: sqlite3.Connection, *, account_id: str,
                           gmail_message_id: str, labels: list[str]) -> None:
    """Replace the stored Gmail label list for one mailbox's copy of a message."""
    conn.execute(
        "UPDATE email_account_messages SET labels_json = ? "
        "WHERE account_id = ? AND gmail_message_id = ?",
        (_json(labels), account_id, gmail_message_id),
    )


def tombstone_sighting(conn: sqlite3.Connection, *, account_id: str,
                       gmail_message_id: str) -> None:
    """Soft-delete one mailbox's copy of a message by stamping deleted_at."""
    conn.execute(
        "UPDATE email_account_messages SET deleted_at = datetime('now') "
        "WHERE account_id = ? AND gmail_message_id = ?",
        (account_id, gmail_message_id),
    )


# ------------------------------------------------------------------ attachments

def insert_attachment_stub(conn: sqlite3.Connection, *, email_id: str,
                           gmail_attachment_id: str, filename: str,
                           sanitized_filename: str, mime_type: Optional[str],
                           size_bytes: Optional[int], storage_path: str) -> str:
    """Record an attachment before its bytes are fetched. Returns attachment id.

    Columns not listed here (e.g. download_status, download_attempts) take
    their schema defaults.
    """
    att_id = _uuid()
    conn.execute(
        """INSERT INTO email_attachments
           (id, email_id, gmail_attachment_id, filename, sanitized_filename,
            mime_type, size_bytes, storage_path)
           VALUES (?, ?, ?, ?, ?, ?, ?, ?)""",
        (att_id, email_id, gmail_attachment_id, filename, sanitized_filename,
         mime_type, size_bytes, storage_path),
    )
    return att_id


def mark_attachment_downloaded(conn: sqlite3.Connection, attachment_id: str, *,
                               sha256_hex: str, size_bytes: int) -> None:
    """Mark an attachment downloaded, storing its hash and actual size."""
    conn.execute(
        "UPDATE email_attachments SET download_status = 'downloaded', "
        "sha256_hex = ?, size_bytes = ?, downloaded_at = datetime('now') "
        "WHERE id = ?",
        (sha256_hex, size_bytes, attachment_id),
    )


def mark_attachment_failed(conn: sqlite3.Connection, attachment_id: str, *,
                           error: str) -> None:
    """Mark an attachment failed and bump its attempt counter.

    Once the status is 'failed', pending_attachments() no longer returns the
    row, since it selects download_status = 'pending' only.
    """
    conn.execute(
        "UPDATE email_attachments SET download_status = 'failed', "
        "download_attempts = download_attempts + 1, download_error = ? "
        "WHERE id = ?",
        (error, attachment_id),
    )


def pending_attachments(conn: sqlite3.Connection, limit: int = 50) -> list[sqlite3.Row]:
    """Return up to *limit* pending attachments, each with ONE sighting to fetch from.

    Each row is the attachment columns plus gmail_message_id / account_id of a
    mailbox copy of the parent email. A live (non-tombstoned) sighting is
    preferred. An email seen by several mailboxes still yields exactly one row
    per attachment. A plain join would return one row per mailbox, causing
    duplicate downloads and eating into *limit*.
    """
    cur = conn.cursor()
    cur.execute(
        "SELECT a.*, eam.gmail_message_id, eam.account_id "
        "FROM email_attachments a "
        "JOIN email_account_messages eam ON eam.id = ("
        "  SELECT x.id FROM email_account_messages x "
        "  WHERE x.email_id = a.email_id "
        "  ORDER BY x.deleted_at IS NOT NULL LIMIT 1) "
        "WHERE a.download_status = 'pending' AND a.download_attempts < 5 "
        "LIMIT ?",
        (limit,),
    )
    return cur.fetchall()


# ------------------------------------------------------------------ threads

def find_thread_by_gmail_id(conn: sqlite3.Connection,
                            gmail_thread_id: str) -> Optional[sqlite3.Row]:
    """Return the email_threads row with this Gmail thread id, or None."""
    cur = conn.cursor()
    cur.execute(
        "SELECT * FROM email_threads WHERE gmail_thread_id = ?",
        (gmail_thread_id,),
    )
    return cur.fetchone()


def find_thread_by_rfc_root(conn: sqlite3.Connection,
                            rfc_root: str) -> Optional[sqlite3.Row]:
    """Return the email_threads row with this RFC thread-root id, or None."""
    cur = conn.cursor()
    cur.execute(
        "SELECT * FROM email_threads WHERE rfc_thread_root_id = ?",
        (rfc_root,),
    )
    return cur.fetchone()


def create_thread(conn: sqlite3.Connection, *, gmail_thread_id: Optional[str],
                  rfc_thread_root_id: Optional[str],
                  subject_normalized: Optional[str],
                  first_message_at: Optional[str]) -> str:
    """Create an empty thread (message_count 0, last = first). Returns thread id.

    Call rollup_thread() after attaching emails to fill in the real stats.
    """
    thread_id = _uuid()
    conn.execute(
        """INSERT INTO email_threads
           (id, gmail_thread_id, rfc_thread_root_id, subject_normalized,
            first_message_at, last_message_at, message_count)
           VALUES (?, ?, ?, ?, ?, ?, 0)""",
        (thread_id, gmail_thread_id, rfc_thread_root_id, subject_normalized,
         first_message_at, first_message_at),
    )
    return thread_id


def rollup_thread(conn: sqlite3.Connection, thread_id: str) -> None:
    """Recompute count / first+last timestamps / participants / is_matched.

    The values come from the thread's member emails; a thread with no member
    emails is left untouched. participants_json is the distinct recipient
    addresses, sorted so the stored value is deterministic. Cheap at 5-person
    team volumes. For larger deployments swap to triggers.
    """
    cur = conn.cursor()
    cur.execute(
        "SELECT COUNT(*) AS n, MIN(sent_at) AS first, MAX(sent_at) AS last, "
        "MAX(is_matched) AS matched FROM emails WHERE thread_id = ?",
        (thread_id,),
    )
    row = cur.fetchone()
    if not row or row["n"] == 0:
        return
    cur.execute(
        "SELECT DISTINCT address FROM email_recipients er "
        "JOIN emails e ON e.id = er.email_id WHERE e.thread_id = ? "
        "ORDER BY address",
        (thread_id,),
    )
    participants = [r["address"] for r in cur.fetchall()]
    conn.execute(
        "UPDATE email_threads SET message_count = ?, first_message_at = ?, "
        "last_message_at = ?, participant_count = ?, participants_json = ?, "
        "is_matched = ?, updated_at = datetime('now') WHERE id = ?",
        (row["n"], row["first"], row["last"], len(participants),
         _json(participants), int(row["matched"] or 0), thread_id),
    )


# ------------------------------------------------------------------ investor links

def insert_investor_link(conn: sqlite3.Connection, *, email_id: str, link: dict) -> None:
    """Link an email to a matched investor/contact.

    *link* must carry 'matched_address' and 'match_kind'. The id keys are
    optional (NULL when absent), and match_confidence defaults to 1.0.
    """
    conn.execute(
        """INSERT INTO email_investor_links
           (id, email_id, fundraising_investor_id, fundraising_contact_id,
            contact_id, organization_id, matched_address, match_kind, match_confidence)
           VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?)""",
        (
            _uuid(), email_id,
            link.get("fundraising_investor_id"),
            link.get("fundraising_contact_id"),
            link.get("contact_id"),
            link.get("organization_id"),
            link["matched_address"],
            link["match_kind"],
            float(link.get("match_confidence", 1.0)),
        ),
    )


# ------------------------------------------------------------------ sync runs

def start_sync_run(conn: sqlite3.Connection, *, account_id: str, kind: str) -> str:
    """Open an email_sync_runs row in 'running' state. Returns the run id."""
    run_id = _uuid()
    conn.execute(
        "INSERT INTO email_sync_runs (id, account_id, kind, started_at, status) "
        "VALUES (?, ?, ?, ?, 'running')",
        (run_id, account_id, kind, _now_iso()),
    )
    return run_id


def finish_sync_run(conn: sqlite3.Connection, run_id: str, *, status: str,
                    stats: Optional[dict] = None, error: Optional[str] = None) -> None:
    """Close a sync run: stamp finished_at and store status, counters and error.

    Counters missing from *stats* are stored as 0.
    """
    stats = stats or {}
    conn.execute(
        """UPDATE email_sync_runs
           SET finished_at = ?, status = ?, messages_seen = ?, messages_stored = ?,
               attachments_saved = ?, api_calls = ?, retries = ?, error = ?
           WHERE id = ?""",
        (
            _now_iso(), status,
            int(stats.get("messages_seen", 0)),
            int(stats.get("messages_stored", 0)),
            int(stats.get("attachments_saved", 0)),
            int(stats.get("api_calls", 0)),
            int(stats.get("retries", 0)),
            error, run_id,
        ),
    )


# ------------------------------------------------------------------ activity queries

def query_email_activity(conn: sqlite3.Connection, *, investor_id: Optional[str] = None,
                         account_id: Optional[str] = None, search: Optional[str] = None,
                         direction: Optional[str] = None, limit: int = 100) -> dict:
    """Captured-Gmail activity for the admin Communications panel.

    The results can be filtered by investor (matched fundraising investor)
    and/or mailbox. Free-text search runs over subject, snippet and sender; it
    is a literal, case-insensitive (ASCII) substring match, so LIKE wildcards
    typed by the user are not interpreted. Returns the email rows plus the
    filter facets.

    Matched-only: the panel shows ONLY email that links to a known
    investor/contact, i.e. an `email_investor_links` row exists. Unmatched
    cold/unknown-sender email is still captured for completeness but is never
    surfaced here.

    Soft-delete: an email is live only if it still has a non-tombstoned
    per-mailbox sighting (`email_account_messages.deleted_at IS NULL`). The
    `emails` row itself carries no deleted_at, so deletion lives on the
    sighting.

    Direction is decided at the email level: an email is outbound if the
    sender is one of our mailboxes. This mirrors the digest builder, so a
    thread reads consistently regardless of which mailbox saw it. With no
    mailboxes registered, nothing is outbound.

    Returns {"emails", "accounts", "investors", "count", "truncated"}. *limit*
    is clamped to 1..500, and falsy values mean 100.
    """
    limit = max(1, min(int(limit or 100), 500))
    cur = conn.cursor()
    own = {(r["email_address"] or "").lower().strip()
           for r in cur.execute("SELECT email_address FROM email_accounts")}
    own.discard("")

    where = ["EXISTS (SELECT 1 FROM email_account_messages eam "
             "WHERE eam.email_id = e.id AND eam.deleted_at IS NULL)",
             # Matched-only: surface email that links to a known investor/contact.
             # Unmatched (unknown-sender) email is captured but never shown here.
             "EXISTS (SELECT 1 FROM email_investor_links l WHERE l.email_id = e.id)"]
    params: list = []

    if account_id:
        where.append("EXISTS (SELECT 1 FROM email_account_messages eam "
                     "WHERE eam.email_id = e.id AND eam.account_id = ? "
                     "AND eam.deleted_at IS NULL)")
        params.append(account_id)

    if investor_id:
        where.append("EXISTS (SELECT 1 FROM email_investor_links l WHERE l.email_id = e.id "
                     "AND (l.fundraising_investor_id = ? OR l.fundraising_contact_id IN "
                     "(SELECT id FROM fundraising_contacts WHERE investor_id = ?)))")
        params.extend([investor_id, investor_id])

    term = (search or "").strip()
    if term:
        # Escape LIKE metacharacters so the search is literal. Email addresses
        # routinely contain "_", which LIKE would otherwise read as "any char".
        escaped = term.replace("\\", "\\\\").replace("%", "\\%").replace("_", "\\_")
        like = f"%{escaped}%"
        where.append("(e.subject LIKE ? ESCAPE '\\' OR e.snippet LIKE ? ESCAPE '\\' "
                     "OR e.from_email LIKE ? ESCAPE '\\' OR e.from_name LIKE ? ESCAPE '\\')")
        params.extend([like] * 4)

    direction = (direction or "").strip().lower()
    if direction == "outbound" and not own:
        # No registered mailboxes means nothing can be outbound. Skipping the
        # filter here would return every email, each labelled "inbound".
        where.append("0")
    elif direction in ("inbound", "outbound") and own:
        marks = ",".join("?" for _ in own)
        op = "IN" if direction == "outbound" else "NOT IN"
        # Normalise like the per-row Python label below (lower + strip, NULL
        # treated as ''), so the filter and the "direction" field agree.
        where.append(f"LOWER(TRIM(COALESCE(e.from_email, ''))) {op} ({marks})")
        params.extend(sorted(own))

    sql = ("SELECT e.id, e.subject, e.from_name, e.from_email, e.sent_at, e.snippet, "
           "e.has_attachments, e.is_matched, e.match_status FROM emails e WHERE "
           + " AND ".join(where) + " ORDER BY e.sent_at DESC LIMIT ?")
    # Fetch one extra row to detect truncation without a COUNT(*) query.
    rows = [dict(r) for r in cur.execute(sql, params + [limit + 1])]
    truncated = len(rows) > limit
    rows = rows[:limit]

    by_id = {r["id"]: r for r in rows}
    for r in rows:
        r["direction"] = "outbound" if (r["from_email"] or "").lower().strip() in own else "inbound"
        r["mailboxes"] = []
        r["investors"] = []
        r["investor_labels"] = []

    ids = list(by_id)
    if ids:
        marks = ",".join("?" for _ in ids)
        for s in cur.execute(
                "SELECT eam.email_id AS eid, ea.email_address AS addr "
                "FROM email_account_messages eam JOIN email_accounts ea ON ea.id = eam.account_id "
                f"WHERE eam.deleted_at IS NULL AND eam.email_id IN ({marks}) "
                "ORDER BY ea.email_address", ids):
            mb = by_id[s["eid"]]["mailboxes"]
            if s["addr"] and s["addr"] not in mb:
                mb.append(s["addr"])
        for lnk in cur.execute(
                "SELECT l.email_id AS eid, l.matched_address AS addr, "
                "COALESCE(fi.id, fic_inv.id) AS inv_id, "
                "COALESCE(fi.investor_name, fic_inv.investor_name) AS inv_name "
                "FROM email_investor_links l "
                "LEFT JOIN fundraising_investors fi ON fi.id = l.fundraising_investor_id "
                "LEFT JOIN fundraising_contacts fic ON fic.id = l.fundraising_contact_id "
                "LEFT JOIN fundraising_investors fic_inv ON fic_inv.id = fic.investor_id "
                f"WHERE l.email_id IN ({marks})", ids):
            row = by_id[lnk["eid"]]
            if lnk["inv_id"] and lnk["inv_name"]:
                if not any(iv["id"] == lnk["inv_id"] for iv in row["investors"]):
                    row["investors"].append({"id": lnk["inv_id"], "name": lnk["inv_name"]})
            elif lnk["addr"] and lnk["addr"] not in row["investor_labels"]:
                # Link without a resolvable investor: fall back to the address.
                row["investor_labels"].append(lnk["addr"])

    accounts = [dict(r) for r in cur.execute(
        "SELECT id, email_address FROM email_accounts ORDER BY email_address")]
    # Facet dropdown = live investor relationships only (graveyard = 0, the
    # CRM-wide convention). Graveyarded investors are excluded from the
    # *picker*, but their captured email still shows in the unfiltered list and
    # stays findable by free-text search. This is an audit surface, so history
    # is never hidden, only the picker is.
    investors = [dict(r) for r in cur.execute(
        "SELECT id, investor_name AS name FROM fundraising_investors WHERE graveyard = 0 AND id IN ("
        "  SELECT fundraising_investor_id FROM email_investor_links "
        "   WHERE fundraising_investor_id IS NOT NULL"
        "  UNION"
        "  SELECT investor_id FROM fundraising_contacts WHERE id IN ("
        "    SELECT fundraising_contact_id FROM email_investor_links "
        "     WHERE fundraising_contact_id IS NOT NULL)"
        ") ORDER BY investor_name")]
    return {"emails": rows, "accounts": accounts, "investors": investors,
            "count": len(rows), "truncated": truncated}