Files
ten31-database/backend/email_integration/db.py
T
Keysat c7ce44d963 Phase 0 foundation: canonical schema, ingest pipeline, CRM MCP server
Workstream A–C substrate for the Ten31 agentic system:
- A1: docs/crm-overview.md; CLAUDE.md conventions + guardrail #9
- A2: additive/reversible core migration (canonical_entities, entity_links,
  interaction_log, relationship_edges, soft-delete) + ledgered runner
- B1/B3: chunking + deterministic entity resolution (backend/ingest)
- B2: dense (bge-m3) + BM25 sparse ingest to Qdrant crm_chunks
- C: CRM MCP server (reads, retrieval modes, logged writes) — no outbound tools
- docs: redaction/re-hydration, Gmail enablement runbook
- synthetic test data; .env.example; housekeeping (.gitignore, untrack crm.db,
  drop legacy files + start9/0.3.5)

Verified end-to-end on synthetic data + live Sparks (hybrid > dense on entity
queries). Real backfill runs on Ten31 infra; index holds synthetic data only.
Branch snapshot also captures pre-existing working-tree changes.

Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
2026-06-05 08:13:35 -05:00

417 lines
15 KiB
Python

"""
Data-access layer for the email_integration module.
All SQL touching the emails / email_* tables lives here. Other modules call named
helpers — they never write SQL inline. This keeps schema changes contained.
Connection pattern matches server.py get_db():
- WAL mode, foreign keys on, busy_timeout
- sqlite3.Row row_factory
The caller is responsible for committing / closing.
"""
import json
import os
import sqlite3
import uuid
from datetime import datetime, timezone
from typing import Iterable, Optional
# ------------------------------------------------------------------ migrations
def apply_migrations(cursor: sqlite3.Cursor) -> None:
    """Run every ``.sql`` file in the sibling ``migrations/`` directory.

    Files are executed in lexicographic filename order with
    ``cursor.executescript``. No ledger of applied migrations is kept, so
    every file is re-run on each call and must therefore be idempotent
    (``CREATE ... IF NOT EXISTS`` and similar guards). If more complex
    migrations are ever needed, add a schema_migrations table.

    Called from server.init_db(). Returns silently when the ``migrations/``
    directory does not exist.

    Args:
        cursor: Cursor on the database the scripts should run against.
    """
    here = os.path.dirname(os.path.abspath(__file__))
    mdir = os.path.join(here, "migrations")
    if not os.path.isdir(mdir):
        return
    for name in sorted(os.listdir(mdir)):
        if not name.endswith(".sql"):
            continue
        path = os.path.join(mdir, name)
        # Decode explicitly: without ``encoding`` the locale default is used,
        # so a migration containing non-ASCII text could be read differently
        # (or fail to decode) depending on the host.
        with open(path, "r", encoding="utf-8") as f:
            sql = f.read()
        cursor.executescript(sql)
# ------------------------------------------------------------------ utils
def _uuid() -> str:
    """Return a freshly generated random (version 4) UUID as a string."""
    fresh = uuid.uuid4()
    return str(fresh)
def _now_iso() -> str:
    """Return the current UTC time as ``YYYY-MM-DDTHH:MM:SSZ`` (second precision)."""
    utc_now = datetime.now(timezone.utc)
    return utc_now.strftime("%Y-%m-%dT%H:%M:%SZ")
def _json(v) -> str:
    """Serialize *v* to compact JSON (no whitespace after ``,`` or ``:``)."""
    compact_separators = (",", ":")
    return json.dumps(v, separators=compact_separators)
# ------------------------------------------------------------------ email_accounts
def list_sync_ready_accounts(conn: sqlite3.Connection) -> list[sqlite3.Row]:
    """Return every account that is enabled and in a syncable state.

    Selects rows with ``sync_enabled = 1`` and ``sync_status`` of
    ``'pending'`` or ``'active'``. Accounts whose ``last_synced_at`` is NULL
    sort first, followed by the rest in ascending ``last_synced_at`` order.
    """
    query = (
        "SELECT * FROM email_accounts "
        "WHERE sync_enabled = 1 AND sync_status IN ('pending','active') "
        "ORDER BY last_synced_at IS NOT NULL, last_synced_at"
    )
    return conn.execute(query).fetchall()
def get_account_by_email(conn: sqlite3.Connection, email_address: str) -> Optional[sqlite3.Row]:
    """Return the ``email_accounts`` row matching *email_address*, or None.

    The address is compared with SQL ``=`` exactly as given; no
    normalization is applied here.
    """
    lookup = conn.execute(
        "SELECT * FROM email_accounts WHERE email_address = ?",
        (email_address,),
    )
    return lookup.fetchone()
def upsert_account(conn: sqlite3.Connection, *, user_id: str, email_address: str,
                   auth_method: str) -> str:
    """Return the id of the account for *email_address*, creating it if absent.

    When a row already exists it is returned untouched: ``user_id`` and
    ``auth_method`` are NOT updated. Otherwise a new row is inserted with a
    generated id. The caller commits.
    """
    found = get_account_by_email(conn, email_address)
    if not found:
        new_id = _uuid()
        row_values = (new_id, user_id, email_address, auth_method)
        conn.execute(
            "INSERT INTO email_accounts (id, user_id, email_address, auth_method) "
            "VALUES (?, ?, ?, ?)",
            row_values,
        )
        return new_id
    return found["id"]
def set_account_status(conn: sqlite3.Connection, account_id: str, *,
                       status: str, error: Optional[str] = None) -> None:
    """Overwrite an account's ``sync_status`` and ``sync_error``.

    ``sync_error`` is always written, so omitting *error* clears any
    previous error to NULL. ``updated_at`` is set to ``datetime('now')``.
    """
    statement = (
        "UPDATE email_accounts SET sync_status = ?, sync_error = ?, "
        "updated_at = datetime('now') WHERE id = ?"
    )
    bindings = (status, error, account_id)
    conn.execute(statement, bindings)
def set_account_checkpoint(conn: sqlite3.Connection, account_id: str, *,
                           history_id: Optional[str] = None,
                           backfill_cursor: Optional[str] = None,
                           backfill_complete: Optional[bool] = None,
                           last_synced_at: Optional[str] = None) -> None:
    """Update whichever sync-checkpoint columns were supplied.

    An argument left as None is skipped, so this function cannot reset a
    column to NULL. When no argument is supplied nothing is executed;
    otherwise ``updated_at`` is also set to ``datetime('now')``.
    ``backfill_complete`` is stored as 1 or 0.
    """
    if backfill_complete is None:
        complete_flag = None
    else:
        complete_flag = 1 if backfill_complete else 0
    # (column, value) pairs in the order they appear in the SET clause.
    candidates = (
        ("last_history_id", history_id),
        ("backfill_cursor", backfill_cursor),
        ("backfill_complete", complete_flag),
        ("last_synced_at", last_synced_at),
    )
    provided = [(column, value) for column, value in candidates if value is not None]
    if not provided:
        return
    # Column names come only from the fixed tuple above, never from callers,
    # so interpolating them into the statement is safe.
    assignments = [f"{column} = ?" for column, _ in provided]
    assignments.append("updated_at = datetime('now')")
    bindings = [value for _, value in provided]
    bindings.append(account_id)
    conn.execute(f"UPDATE email_accounts SET {', '.join(assignments)} WHERE id = ?", bindings)
# ------------------------------------------------------------------ emails
def find_email_by_rfc_id(conn: sqlite3.Connection, rfc_message_id: str) -> Optional[sqlite3.Row]:
    """Return the ``emails`` row whose ``rfc_message_id`` matches, or None."""
    lookup = conn.execute(
        "SELECT * FROM emails WHERE rfc_message_id = ?",
        (rfc_message_id,),
    )
    return lookup.fetchone()
def find_email_id_by_any_rfc_id(conn: sqlite3.Connection,
                                rfc_ids: Iterable[str]) -> Optional[str]:
    """Return the id of the earliest-sent email matching any of *rfc_ids*.

    Falsy entries (None, empty string) are dropped first. Returns None when
    nothing usable remains or no stored email matches. Among matches, the
    row with the smallest ``sent_at`` wins.
    """
    candidates = list(filter(None, rfc_ids))
    if len(candidates) == 0:
        return None
    # One "?" per candidate; the values themselves are always bound.
    marks = ",".join(["?"] * len(candidates))
    hit = conn.execute(
        f"SELECT id FROM emails WHERE rfc_message_id IN ({marks}) "
        "ORDER BY sent_at ASC LIMIT 1",
        candidates,
    ).fetchone()
    if not hit:
        return None
    return hit["id"]
def insert_email(conn: sqlite3.Connection, *, parsed: dict, match_status: str) -> str:
    """Insert a fresh ``emails`` row plus one ``email_recipients`` row per address.

    The caller must ensure no row exists for ``parsed['rfc_message_id']``;
    use find_email_by_rfc_id first. The caller also commits.

    Args:
        conn: Open connection.
        parsed: Parsed message. ``rfc_message_id``, ``from_email`` and
            ``sent_at`` are required keys; all others are optional. Entries
            of ``to`` / ``cc`` / ``bcc`` may be plain address strings or
            dicts with ``email`` and ``name`` keys.
        match_status: Stored verbatim; ``is_matched`` is 1 only when this
            equals ``"matched"``.

    Returns:
        The generated id of the new ``emails`` row.

    Raises:
        KeyError: if a required key is missing from *parsed*.
    """
    email_id = _uuid()
    raw_headers = parsed.get("raw_headers")
    conn.execute(
        """INSERT INTO emails
        (id, rfc_message_id, gmail_thread_id, rfc_thread_root_id, subject,
        from_email, from_name, to_emails_json, cc_emails_json, bcc_emails_json,
        reply_to, sent_at, body_text, body_html, snippet, in_reply_to,
        references_json, has_attachments, size_estimate, is_matched,
        match_status, raw_headers_json)
        VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)""",
        (
            email_id,
            parsed["rfc_message_id"],
            parsed.get("gmail_thread_id"),
            parsed.get("rfc_thread_root_id"),
            parsed.get("subject"),
            parsed["from_email"],
            parsed.get("from_name"),
            _json(parsed.get("to", [])),
            _json(parsed.get("cc", [])),
            _json(parsed.get("bcc", [])),
            parsed.get("reply_to"),
            parsed["sent_at"],
            parsed.get("body_text"),
            parsed.get("body_html"),
            parsed.get("snippet"),
            parsed.get("in_reply_to"),
            _json(parsed.get("references", [])),
            1 if parsed.get("attachments") else 0,
            parsed.get("size_estimate"),
            1 if match_status == "matched" else 0,
            match_status,
            # Empty or missing headers are stored as NULL rather than "{}".
            _json(raw_headers) if raw_headers else None,
        ),
    )
    # recipients
    sender = parsed.get("from_email")
    reply_to = parsed.get("reply_to")
    for kind in ("from", "to", "cc", "bcc", "reply_to"):
        if kind == "from" and sender:
            addrs = [(sender, parsed.get("from_name"))]
        elif kind == "reply_to" and reply_to:
            addrs = [(reply_to, None)]
        else:
            # ``or ()`` matters: ``parsed.get(kind, [])`` returns None when
            # the key is present with a None value (e.g. ``reply_to: None``
            # or ``cc: None``), and iterating None raised TypeError after the
            # emails row had already been inserted.
            addrs = [
                (a.get("email"), a.get("name")) if isinstance(a, dict) else (a, None)
                for a in (parsed.get(kind) or ())
            ]
        for address, name in addrs:
            if not address:
                continue
            conn.execute(
                "INSERT INTO email_recipients (id, email_id, address, display_name, kind) "
                "VALUES (?, ?, ?, ?, ?)",
                (_uuid(), email_id, address.lower().strip(), name, kind),
            )
    return email_id
def set_email_thread(conn: sqlite3.Connection, email_id: str, thread_id: str) -> None:
    """Attach an email to a thread and set its ``updated_at`` to ``datetime('now')``."""
    statement = "UPDATE emails SET thread_id = ?, updated_at = datetime('now') WHERE id = ?"
    conn.execute(statement, (thread_id, email_id))
# ------------------------------------------------------------------ sightings
def upsert_sighting(conn: sqlite3.Connection, *, email_id: str, account_id: str,
                    gmail_message_id: str, gmail_thread_id: str,
                    labels: list[str], is_sent: bool) -> None:
    """Record that *account_id* holds a copy of *email_id*, if not already recorded.

    Uses ``INSERT OR IGNORE``: when the insert conflicts with an existing
    row, the existing row is left unchanged (labels are not refreshed; use
    update_sighting_labels for that). Which constraint defines a conflict
    is set by the table schema, which is not visible in this module.
    """
    record = (
        _uuid(),
        email_id,
        account_id,
        gmail_message_id,
        gmail_thread_id,
        _json(labels),
        int(bool(is_sent)),
    )
    conn.execute(
        """INSERT OR IGNORE INTO email_account_messages
        (id, email_id, account_id, gmail_message_id, gmail_thread_id,
        labels_json, is_sent)
        VALUES (?, ?, ?, ?, ?, ?, ?)""",
        record,
    )
def update_sighting_labels(conn: sqlite3.Connection, *, account_id: str,
                           gmail_message_id: str, labels: list[str]) -> None:
    """Replace the stored label list for one account's copy of a message.

    The row is addressed by (account_id, gmail_message_id); *labels* is
    stored as compact JSON in ``labels_json``.
    """
    encoded_labels = _json(labels)
    statement = (
        "UPDATE email_account_messages SET labels_json = ? "
        "WHERE account_id = ? AND gmail_message_id = ?"
    )
    conn.execute(statement, (encoded_labels, account_id, gmail_message_id))
def tombstone_sighting(conn: sqlite3.Connection, *, account_id: str,
                       gmail_message_id: str) -> None:
    """Soft-delete one account's copy of a message.

    Sets ``deleted_at`` to ``datetime('now')`` on the row addressed by
    (account_id, gmail_message_id); the row itself is kept.
    """
    statement = (
        "UPDATE email_account_messages SET deleted_at = datetime('now') "
        "WHERE account_id = ? AND gmail_message_id = ?"
    )
    conn.execute(statement, (account_id, gmail_message_id))
# ------------------------------------------------------------------ attachments
def insert_attachment_stub(conn: sqlite3.Connection, *, email_id: str,
                           gmail_attachment_id: str, filename: str,
                           sanitized_filename: str, mime_type: Optional[str],
                           size_bytes: Optional[int], storage_path: str) -> str:
    """Insert an ``email_attachments`` row for *email_id* and return its new id.

    Only the metadata passed in is written; download-related columns are
    not set here and take whatever defaults the schema defines.
    """
    att_id = _uuid()
    record = (
        att_id,
        email_id,
        gmail_attachment_id,
        filename,
        sanitized_filename,
        mime_type,
        size_bytes,
        storage_path,
    )
    conn.execute(
        """INSERT INTO email_attachments
        (id, email_id, gmail_attachment_id, filename, sanitized_filename,
        mime_type, size_bytes, storage_path)
        VALUES (?, ?, ?, ?, ?, ?, ?, ?)""",
        record,
    )
    return att_id
def mark_attachment_downloaded(conn: sqlite3.Connection, attachment_id: str, *,
                               sha256_hex: str, size_bytes: int) -> None:
    """Mark an attachment as downloaded.

    Sets ``download_status`` to ``'downloaded'``, stores the given digest
    and size, and sets ``downloaded_at`` to ``datetime('now')``.
    """
    statement = (
        "UPDATE email_attachments SET download_status = 'downloaded', "
        "sha256_hex = ?, size_bytes = ?, downloaded_at = datetime('now') "
        "WHERE id = ?"
    )
    bindings = (sha256_hex, size_bytes, attachment_id)
    conn.execute(statement, bindings)
def mark_attachment_failed(conn: sqlite3.Connection, attachment_id: str, *,
                           error: str) -> None:
    """Record a failed download attempt for an attachment.

    Sets ``download_status`` to ``'failed'``, increments
    ``download_attempts`` by one, and stores *error* in ``download_error``.
    """
    statement = (
        "UPDATE email_attachments SET download_status = 'failed', "
        "download_attempts = download_attempts + 1, download_error = ? "
        "WHERE id = ?"
    )
    conn.execute(statement, (error, attachment_id))
def pending_attachments(conn: sqlite3.Connection, limit: int = 50) -> list[sqlite3.Row]:
    """Return up to *limit* attachments still waiting to be downloaded.

    Selects attachments with ``download_status = 'pending'`` and fewer than
    5 recorded attempts, joined to ``email_account_messages`` so each row
    also carries ``gmail_message_id`` and ``account_id``. Attachments whose
    email has no ``email_account_messages`` row are excluded by the join.

    NOTE(review): the join yields one row per matching
    ``email_account_messages`` row, so an attachment can appear more than
    once, and with no ORDER BY the rows kept by LIMIT are unspecified.
    Confirm callers tolerate both.
    """
    query = (
        "SELECT a.*, eam.gmail_message_id, eam.account_id "
        "FROM email_attachments a "
        "JOIN email_account_messages eam ON eam.email_id = a.email_id "
        "WHERE a.download_status = 'pending' AND a.download_attempts < 5 "
        "LIMIT ?"
    )
    return conn.execute(query, (limit,)).fetchall()
# ------------------------------------------------------------------ threads
def find_thread_by_gmail_id(conn: sqlite3.Connection, gmail_thread_id: str) -> Optional[sqlite3.Row]:
    """Return the ``email_threads`` row with this ``gmail_thread_id``, or None."""
    query = "SELECT * FROM email_threads WHERE gmail_thread_id = ?"
    return conn.execute(query, (gmail_thread_id,)).fetchone()
def find_thread_by_rfc_root(conn: sqlite3.Connection, rfc_root: str) -> Optional[sqlite3.Row]:
    """Return the ``email_threads`` row whose ``rfc_thread_root_id`` is *rfc_root*, or None."""
    query = "SELECT * FROM email_threads WHERE rfc_thread_root_id = ?"
    return conn.execute(query, (rfc_root,)).fetchone()
def create_thread(conn: sqlite3.Connection, *, gmail_thread_id: Optional[str],
                  rfc_thread_root_id: Optional[str], subject_normalized: Optional[str],
                  first_message_at: Optional[str]) -> str:
    """Insert an empty ``email_threads`` row and return its generated id.

    ``message_count`` starts at 0, and ``last_message_at`` is initialized
    to the same value as ``first_message_at``.
    """
    thread_id = _uuid()
    record = (
        thread_id,
        gmail_thread_id,
        rfc_thread_root_id,
        subject_normalized,
        first_message_at,
        first_message_at,  # last_message_at starts equal to first_message_at
    )
    conn.execute(
        """INSERT INTO email_threads
        (id, gmail_thread_id, rfc_thread_root_id, subject_normalized,
        first_message_at, last_message_at, message_count)
        VALUES (?, ?, ?, ?, ?, ?, 0)""",
        record,
    )
    return thread_id
def rollup_thread(conn: sqlite3.Connection, thread_id: str) -> None:
    """Recompute a thread's aggregate columns from its member emails.

    Rewrites message_count, first/last_message_at, participant_count,
    participants_json and is_matched on the ``email_threads`` row. A thread
    with no member emails is left untouched.

    Cheap at 5-person team volumes. For larger deployments swap to triggers.
    """
    summary = conn.execute(
        "SELECT COUNT(*) AS n, MIN(sent_at) AS first, MAX(sent_at) AS last, "
        "MAX(is_matched) AS matched FROM emails WHERE thread_id = ?",
        (thread_id,),
    ).fetchone()
    if not summary or summary["n"] == 0:
        return
    address_rows = conn.execute(
        "SELECT DISTINCT address FROM email_recipients er "
        "JOIN emails e ON e.id = er.email_id WHERE e.thread_id = ?",
        (thread_id,),
    ).fetchall()
    participants = [entry["address"] for entry in address_rows]
    # The thread counts as matched when any member email is matched.
    matched_flag = int(summary["matched"] or 0)
    conn.execute(
        "UPDATE email_threads SET message_count = ?, first_message_at = ?, "
        "last_message_at = ?, participant_count = ?, participants_json = ?, "
        "is_matched = ?, updated_at = datetime('now') WHERE id = ?",
        (
            summary["n"],
            summary["first"],
            summary["last"],
            len(participants),
            _json(participants),
            matched_flag,
            thread_id,
        ),
    )
# ------------------------------------------------------------------ investor links
def insert_investor_link(conn: sqlite3.Connection, *, email_id: str,
                         link: dict) -> None:
    """Insert one ``email_investor_links`` row tying *email_id* to CRM entities.

    *link* must contain ``matched_address`` and ``match_kind`` (KeyError
    otherwise). The four entity-id keys are optional and stored as NULL
    when absent. ``match_confidence`` defaults to 1.0 when the key is
    missing and is coerced with ``float()``.
    """
    record = (
        _uuid(),
        email_id,
        link.get("fundraising_investor_id"),
        link.get("fundraising_contact_id"),
        link.get("contact_id"),
        link.get("organization_id"),
        link["matched_address"],
        link["match_kind"],
        float(link.get("match_confidence", 1.0)),
    )
    conn.execute(
        """INSERT INTO email_investor_links
        (id, email_id, fundraising_investor_id, fundraising_contact_id,
        contact_id, organization_id, matched_address, match_kind,
        match_confidence)
        VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?)""",
        record,
    )
# ------------------------------------------------------------------ sync runs
def start_sync_run(conn: sqlite3.Connection, *, account_id: str, kind: str) -> str:
    """Open an ``email_sync_runs`` row in status ``'running'`` and return its id.

    ``started_at`` is set to the current UTC time from _now_iso().
    """
    run_id = _uuid()
    record = (run_id, account_id, kind, _now_iso())
    conn.execute(
        "INSERT INTO email_sync_runs (id, account_id, kind, started_at, status) "
        "VALUES (?, ?, ?, ?, 'running')",
        record,
    )
    return run_id
def finish_sync_run(conn: sqlite3.Connection, run_id: str, *, status: str,
                    stats: Optional[dict] = None, error: Optional[str] = None) -> None:
    """Close a sync run: write finish time, final status, counters and error.

    Counters are read from *stats* under the keys ``messages_seen``,
    ``messages_stored``, ``attachments_saved``, ``api_calls`` and
    ``retries``. A missing key counts as 0, and each value is coerced with
    ``int()``. ``finished_at`` is the current UTC time from _now_iso().
    """
    counters = stats or {}
    finished_at = _now_iso()
    counter_keys = (
        "messages_seen",
        "messages_stored",
        "attachments_saved",
        "api_calls",
        "retries",
    )
    counter_values = [int(counters.get(key, 0)) for key in counter_keys]
    conn.execute(
        """UPDATE email_sync_runs
        SET finished_at = ?, status = ?, messages_seen = ?, messages_stored = ?,
        attachments_saved = ?, api_calls = ?, retries = ?, error = ?
        WHERE id = ?""",
        (finished_at, status, *counter_values, error, run_id),
    )