Files
ten31-database/backend/email_integration/db.py
T
Keysat c7b74a2704 Email search/query + windowed digest preview (v0.1.0:83)
Communications tab (search/query roadmap items 1 & 2):
- Fix the investor dropdown: the facet only listed grid investors, so it
  came back empty whenever email matched a classic contact or org domain
  (no grid id — the common case). It now mirrors the email list, resolving
  each link to a typed identity (fund:/org:/contact:/addr:) with precedence
  grid -> org -> contact -> address; investor_id accepts the typed key
  (bare id = fund: for back-compat) and an unknown prefix matches nothing.
- Add a date-range filter and a click-to-expand full-body view
  (GET /api/email/detail, admin, soft-delete-gated; body_text only, never
  raw remote HTML).
- Add a "Search content" mode: GET /api/email/search wraps the ingest
  hybrid_search over the Qdrant email index (doc_type=email), hydrated and
  soft-delete-filtered against SQLite (canonical), 503 if Spark/Qdrant down.

Daily digest:
- Settings -> Admin builds a digest over a chosen window (last 24h or since
  a date) as an in-app preview before sending (POST /api/admin/digest/preview),
  so the local-Spark summarizer can be verified on demand even on a quiet day.
  Manual send uses the same window; neither advances the daily cursor, so a
  preview never suppresses the scheduled digest.

Code-only, migrations no-op. 22/22 backend tests, render-smoke pass.
2026-06-16 20:46:15 -05:00

689 lines
29 KiB
Python

"""
Data-access layer for the email_integration module.
All SQL touching emails_* tables lives here. Other modules call named
helpers — they never write SQL inline. This keeps schema changes contained.
Connection pattern matches server.py get_db():
- WAL mode, foreign keys on, busy_timeout
- sqlite3.Row row_factory
The caller is responsible for committing / closing.
"""
import json
import os
import sqlite3
import uuid
from datetime import datetime, timezone
from typing import Iterable, Optional
# ------------------------------------------------------------------ migrations
def apply_migrations(cursor: sqlite3.Cursor) -> None:
"""Apply all .sql migration files in migrations/ in lexicographic order.
Called from server.init_db(). Idempotent. Does not log past migrations in
a table yet — each file is guarded by CREATE ... IF NOT EXISTS etc. If
we ever need more complex migrations, add a schema_migrations table.
"""
here = os.path.dirname(os.path.abspath(__file__))
mdir = os.path.join(here, "migrations")
if not os.path.isdir(mdir):
return
for name in sorted(os.listdir(mdir)):
if not name.endswith(".sql"):
continue
path = os.path.join(mdir, name)
with open(path, "r") as f:
sql = f.read()
cursor.executescript(sql)
# ------------------------------------------------------------------ utils
def _uuid() -> str:
return str(uuid.uuid4())
def _now_iso() -> str:
return datetime.now(tz=timezone.utc).strftime("%Y-%m-%dT%H:%M:%SZ")
def _json(v) -> str:
return json.dumps(v, separators=(",", ":"))
# ------------------------------------------------------------------ email_accounts
def list_sync_ready_accounts(conn: sqlite3.Connection) -> list[sqlite3.Row]:
cur = conn.cursor()
cur.execute(
"SELECT * FROM email_accounts "
"WHERE sync_enabled = 1 AND sync_status IN ('pending','active') "
"ORDER BY last_synced_at IS NOT NULL, last_synced_at"
)
return cur.fetchall()
def get_account_by_email(conn: sqlite3.Connection, email_address: str) -> Optional[sqlite3.Row]:
cur = conn.cursor()
cur.execute("SELECT * FROM email_accounts WHERE email_address = ?", (email_address,))
return cur.fetchone()
def upsert_account(conn: sqlite3.Connection, *, user_id: str, email_address: str,
auth_method: str) -> str:
existing = get_account_by_email(conn, email_address)
if existing:
return existing["id"]
account_id = _uuid()
conn.execute(
"INSERT INTO email_accounts (id, user_id, email_address, auth_method) "
"VALUES (?, ?, ?, ?)",
(account_id, user_id, email_address, auth_method),
)
return account_id
def set_account_status(conn: sqlite3.Connection, account_id: str, *,
status: str, error: Optional[str] = None) -> None:
conn.execute(
"UPDATE email_accounts SET sync_status = ?, sync_error = ?, "
"updated_at = datetime('now') WHERE id = ?",
(status, error, account_id),
)
def set_account_checkpoint(conn: sqlite3.Connection, account_id: str, *,
history_id: Optional[str] = None,
backfill_cursor: Optional[str] = None,
backfill_complete: Optional[bool] = None,
last_synced_at: Optional[str] = None) -> None:
sets, params = [], []
if history_id is not None:
sets.append("last_history_id = ?"); params.append(history_id)
if backfill_cursor is not None:
sets.append("backfill_cursor = ?"); params.append(backfill_cursor)
if backfill_complete is not None:
sets.append("backfill_complete = ?"); params.append(1 if backfill_complete else 0)
if last_synced_at is not None:
sets.append("last_synced_at = ?"); params.append(last_synced_at)
if not sets:
return
sets.append("updated_at = datetime('now')")
params.append(account_id)
conn.execute(f"UPDATE email_accounts SET {', '.join(sets)} WHERE id = ?", params)
# ------------------------------------------------------------------ emails
def find_email_by_rfc_id(conn: sqlite3.Connection, rfc_message_id: str) -> Optional[sqlite3.Row]:
cur = conn.cursor()
cur.execute("SELECT * FROM emails WHERE rfc_message_id = ?", (rfc_message_id,))
return cur.fetchone()
def find_email_id_by_any_rfc_id(conn: sqlite3.Connection,
rfc_ids: Iterable[str]) -> Optional[str]:
ids = [r for r in rfc_ids if r]
if not ids:
return None
placeholders = ",".join("?" for _ in ids)
cur = conn.cursor()
cur.execute(
f"SELECT id FROM emails WHERE rfc_message_id IN ({placeholders}) "
"ORDER BY sent_at ASC LIMIT 1",
ids,
)
row = cur.fetchone()
return row["id"] if row else None
def insert_email(conn: sqlite3.Connection, *, parsed: dict, match_status: str) -> str:
"""Insert a fresh emails row. Returns email_id.
Caller must ensure no row exists for parsed['rfc_message_id']; use
find_email_by_rfc_id first.
"""
email_id = _uuid()
conn.execute(
"""INSERT INTO emails
(id, rfc_message_id, gmail_thread_id, rfc_thread_root_id, subject,
from_email, from_name, to_emails_json, cc_emails_json, bcc_emails_json,
reply_to, sent_at, body_text, body_html, snippet, in_reply_to,
references_json, has_attachments, size_estimate, is_matched,
match_status, raw_headers_json)
VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)""",
(
email_id,
parsed["rfc_message_id"],
parsed.get("gmail_thread_id"),
parsed.get("rfc_thread_root_id"),
parsed.get("subject"),
parsed["from_email"],
parsed.get("from_name"),
_json(parsed.get("to", [])),
_json(parsed.get("cc", [])),
_json(parsed.get("bcc", [])),
parsed.get("reply_to"),
parsed["sent_at"],
parsed.get("body_text"),
parsed.get("body_html"),
parsed.get("snippet"),
parsed.get("in_reply_to"),
_json(parsed.get("references", [])),
1 if parsed.get("attachments") else 0,
parsed.get("size_estimate"),
1 if match_status == "matched" else 0,
match_status,
_json(parsed.get("raw_headers", {})) if parsed.get("raw_headers") else None,
),
)
# recipients
for kind in ("from", "to", "cc", "bcc", "reply_to"):
addrs = []
if kind == "from" and parsed.get("from_email"):
addrs = [(parsed["from_email"], parsed.get("from_name"))]
elif kind == "reply_to" and parsed.get("reply_to"):
addrs = [(parsed["reply_to"], None)]
else:
# `or []` (not get(kind, [])): the key is often present with value None
# (e.g. reply_to when there is no Reply-To header), and `for a in None`
# would raise TypeError and abort the whole backfill.
for a in (parsed.get(kind) or []):
if isinstance(a, dict):
addrs.append((a.get("email"), a.get("name")))
else:
addrs.append((a, None))
for address, name in addrs:
if not address:
continue
conn.execute(
"INSERT INTO email_recipients (id, email_id, address, display_name, kind) "
"VALUES (?, ?, ?, ?, ?)",
(_uuid(), email_id, address.lower().strip(), name, kind),
)
return email_id
def set_email_thread(conn: sqlite3.Connection, email_id: str, thread_id: str) -> None:
conn.execute(
"UPDATE emails SET thread_id = ?, updated_at = datetime('now') WHERE id = ?",
(thread_id, email_id),
)
# ------------------------------------------------------------------ sightings
def upsert_sighting(conn: sqlite3.Connection, *, email_id: str, account_id: str,
gmail_message_id: str, gmail_thread_id: str,
labels: list[str], is_sent: bool) -> None:
conn.execute(
"""INSERT OR IGNORE INTO email_account_messages
(id, email_id, account_id, gmail_message_id, gmail_thread_id,
labels_json, is_sent)
VALUES (?, ?, ?, ?, ?, ?, ?)""",
(_uuid(), email_id, account_id, gmail_message_id, gmail_thread_id,
_json(labels), 1 if is_sent else 0),
)
def update_sighting_labels(conn: sqlite3.Connection, *, account_id: str,
gmail_message_id: str, labels: list[str]) -> None:
conn.execute(
"UPDATE email_account_messages SET labels_json = ? "
"WHERE account_id = ? AND gmail_message_id = ?",
(_json(labels), account_id, gmail_message_id),
)
def tombstone_sighting(conn: sqlite3.Connection, *, account_id: str,
gmail_message_id: str) -> None:
conn.execute(
"UPDATE email_account_messages SET deleted_at = datetime('now') "
"WHERE account_id = ? AND gmail_message_id = ?",
(account_id, gmail_message_id),
)
# ------------------------------------------------------------------ attachments
def insert_attachment_stub(conn: sqlite3.Connection, *, email_id: str,
gmail_attachment_id: str, filename: str,
sanitized_filename: str, mime_type: Optional[str],
size_bytes: Optional[int], storage_path: str) -> str:
att_id = _uuid()
conn.execute(
"""INSERT INTO email_attachments
(id, email_id, gmail_attachment_id, filename, sanitized_filename,
mime_type, size_bytes, storage_path)
VALUES (?, ?, ?, ?, ?, ?, ?, ?)""",
(att_id, email_id, gmail_attachment_id, filename, sanitized_filename,
mime_type, size_bytes, storage_path),
)
return att_id
def mark_attachment_downloaded(conn: sqlite3.Connection, attachment_id: str, *,
sha256_hex: str, size_bytes: int) -> None:
conn.execute(
"UPDATE email_attachments SET download_status = 'downloaded', "
"sha256_hex = ?, size_bytes = ?, downloaded_at = datetime('now') "
"WHERE id = ?",
(sha256_hex, size_bytes, attachment_id),
)
def mark_attachment_failed(conn: sqlite3.Connection, attachment_id: str, *,
error: str) -> None:
conn.execute(
"UPDATE email_attachments SET download_status = 'failed', "
"download_attempts = download_attempts + 1, download_error = ? "
"WHERE id = ?",
(error, attachment_id),
)
def pending_attachments(conn: sqlite3.Connection, limit: int = 50) -> list[sqlite3.Row]:
cur = conn.cursor()
cur.execute(
"SELECT a.*, eam.gmail_message_id, eam.account_id "
"FROM email_attachments a "
"JOIN email_account_messages eam ON eam.email_id = a.email_id "
"WHERE a.download_status = 'pending' AND a.download_attempts < 5 "
"LIMIT ?",
(limit,),
)
return cur.fetchall()
# ------------------------------------------------------------------ threads
def find_thread_by_gmail_id(conn: sqlite3.Connection, gmail_thread_id: str) -> Optional[sqlite3.Row]:
cur = conn.cursor()
cur.execute(
"SELECT * FROM email_threads WHERE gmail_thread_id = ?",
(gmail_thread_id,),
)
return cur.fetchone()
def find_thread_by_rfc_root(conn: sqlite3.Connection, rfc_root: str) -> Optional[sqlite3.Row]:
cur = conn.cursor()
cur.execute(
"SELECT * FROM email_threads WHERE rfc_thread_root_id = ?",
(rfc_root,),
)
return cur.fetchone()
def create_thread(conn: sqlite3.Connection, *, gmail_thread_id: Optional[str],
rfc_thread_root_id: Optional[str], subject_normalized: Optional[str],
first_message_at: Optional[str]) -> str:
thread_id = _uuid()
conn.execute(
"""INSERT INTO email_threads
(id, gmail_thread_id, rfc_thread_root_id, subject_normalized,
first_message_at, last_message_at, message_count)
VALUES (?, ?, ?, ?, ?, ?, 0)""",
(thread_id, gmail_thread_id, rfc_thread_root_id, subject_normalized,
first_message_at, first_message_at),
)
return thread_id
def rollup_thread(conn: sqlite3.Connection, thread_id: str) -> None:
"""Recompute count / last_message_at / participants from member emails.
Cheap at 5-person team volumes. For larger deployments swap to triggers.
"""
cur = conn.cursor()
cur.execute(
"SELECT COUNT(*) AS n, MIN(sent_at) AS first, MAX(sent_at) AS last, "
"MAX(is_matched) AS matched FROM emails WHERE thread_id = ?",
(thread_id,),
)
row = cur.fetchone()
if not row or row["n"] == 0:
return
cur.execute(
"SELECT DISTINCT address FROM email_recipients er "
"JOIN emails e ON e.id = er.email_id WHERE e.thread_id = ?",
(thread_id,),
)
participants = [r["address"] for r in cur.fetchall()]
conn.execute(
"UPDATE email_threads SET message_count = ?, first_message_at = ?, "
"last_message_at = ?, participant_count = ?, participants_json = ?, "
"is_matched = ?, updated_at = datetime('now') WHERE id = ?",
(row["n"], row["first"], row["last"], len(participants),
_json(participants), int(row["matched"] or 0), thread_id),
)
# ------------------------------------------------------------------ investor links
def insert_investor_link(conn: sqlite3.Connection, *, email_id: str,
link: dict) -> None:
conn.execute(
"""INSERT INTO email_investor_links
(id, email_id, fundraising_investor_id, fundraising_contact_id,
contact_id, organization_id, matched_address, match_kind,
match_confidence)
VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?)""",
(
_uuid(),
email_id,
link.get("fundraising_investor_id"),
link.get("fundraising_contact_id"),
link.get("contact_id"),
link.get("organization_id"),
link["matched_address"],
link["match_kind"],
float(link.get("match_confidence", 1.0)),
),
)
# ------------------------------------------------------------------ sync runs
def start_sync_run(conn: sqlite3.Connection, *, account_id: str, kind: str) -> str:
run_id = _uuid()
conn.execute(
"INSERT INTO email_sync_runs (id, account_id, kind, started_at, status) "
"VALUES (?, ?, ?, ?, 'running')",
(run_id, account_id, kind, _now_iso()),
)
return run_id
def _resolve_entity(row) -> tuple:
"""Reduce one email_investor_links hydration row to a (key, name) identity for
the matched investor, with the same precedence the digest uses:
grid investor -> organization -> contact -> raw matched address. The key is
*typed* (`fund:`/`org:`/`contact:`/`addr:`) so the Communications filter can
target the right column. Soft-deleted org/contact rows arrive as NULL (filtered
in the join) and fall through to the next tier."""
if row["fund_id"] and (row["fund_name"] or "").strip():
return f"fund:{row['fund_id']}", row["fund_name"].strip()
if row["org_id"] and (row["org_name"] or "").strip():
return f"org:{row['org_id']}", row["org_name"].strip()
if row["contact_id"] and (row["contact_name"] or "").strip():
return f"contact:{row['contact_id']}", row["contact_name"].strip()
addr = (row["addr"] or "").strip()
return (f"addr:{addr.lower()}", addr) if addr else (None, None)
# Hydration of an email_investor_links row up to the resolvable investor identity,
# shared by the per-email tags and the facet dropdown. Soft-deleted org/contact
# rows are dropped in the join so they fall through to the next identity tier.
_LINK_IDENTITY_JOINS = """
LEFT JOIN fundraising_investors fi ON fi.id = l.fundraising_investor_id
LEFT JOIN fundraising_contacts fic ON fic.id = l.fundraising_contact_id
LEFT JOIN fundraising_investors fic_inv ON fic_inv.id = fic.investor_id
LEFT JOIN organizations o ON o.id = l.organization_id AND o.deleted_at IS NULL
LEFT JOIN contacts c ON c.id = l.contact_id AND c.deleted_at IS NULL
"""
_LINK_IDENTITY_COLS = """
l.matched_address AS addr,
COALESCE(fi.id, fic_inv.id) AS fund_id,
COALESCE(fi.investor_name, fic_inv.investor_name) AS fund_name,
COALESCE(fi.graveyard, fic_inv.graveyard) AS fund_graveyard,
o.id AS org_id, o.name AS org_name,
c.id AS contact_id,
NULLIF(TRIM(COALESCE(c.first_name,'') || ' ' || COALESCE(c.last_name,'')), '') AS contact_name
"""
def query_email_activity(conn: sqlite3.Connection, *, investor_id: Optional[str] = None,
account_id: Optional[str] = None, search: Optional[str] = None,
direction: Optional[str] = None, since: Optional[str] = None,
until: Optional[str] = None, limit: int = 100) -> dict:
"""Captured-Gmail activity for the admin Communications panel, filterable by
matched investor entity, mailbox, direction and date range, with free-text
search over subject/snippet/sender. Returns the email rows plus the filter facets.
Matched-only: the panel shows ONLY email that links to a known investor/contact
(an `email_investor_links` row exists). Unmatched cold/unknown-sender email is
still captured for completeness but is never surfaced here.
Investor identity is *typed* (`fund:`/`org:`/`contact:`/`addr:`) and resolved with
the digest's precedence (grid investor -> organization -> contact -> raw address),
so an email matched only to a classic contact or an org domain — not yet wired to a
grid investor — still shows a real name and is selectable in the dropdown, instead
of the facet coming back empty. `investor_id` accepts a typed key (a bare id is
treated as `fund:` for backward compatibility).
Soft-delete: an email is live only if it still has a non-tombstoned per-mailbox
sighting (`email_account_messages.deleted_at IS NULL`) — the `emails` row itself
carries no deleted_at, so deletion lives on the sighting. Direction is decided at
the email level (outbound if the sender is one of our mailboxes), mirroring the
digest builder, so a thread reads consistently regardless of which mailbox saw it.
"""
limit = max(1, min(int(limit or 100), 500))
cur = conn.cursor()
own = {(r["email_address"] or "").lower().strip()
for r in cur.execute("SELECT email_address FROM email_accounts")}
own.discard("")
where = ["EXISTS (SELECT 1 FROM email_account_messages eam "
"WHERE eam.email_id = e.id AND eam.deleted_at IS NULL)",
# Matched-only: surface email that links to a known investor/contact.
# Unmatched (unknown-sender) email is captured but never shown here.
"EXISTS (SELECT 1 FROM email_investor_links l WHERE l.email_id = e.id)"]
params: list = []
if account_id:
where.append("EXISTS (SELECT 1 FROM email_account_messages eam "
"WHERE eam.email_id = e.id AND eam.account_id = ? "
"AND eam.deleted_at IS NULL)")
params.append(account_id)
if investor_id:
kind, _, val = str(investor_id).partition(":")
if not val: # bare id (legacy) -> grid investor
kind, val = "fund", str(investor_id)
if kind == "fund":
where.append("EXISTS (SELECT 1 FROM email_investor_links l WHERE l.email_id = e.id "
"AND (l.fundraising_investor_id = ? OR l.fundraising_contact_id IN "
"(SELECT id FROM fundraising_contacts WHERE investor_id = ?)))")
params.extend([val, val])
elif kind == "org":
where.append("EXISTS (SELECT 1 FROM email_investor_links l "
"WHERE l.email_id = e.id AND l.organization_id = ?)")
params.append(val)
elif kind == "contact":
where.append("EXISTS (SELECT 1 FROM email_investor_links l "
"WHERE l.email_id = e.id AND l.contact_id = ?)")
params.append(val)
elif kind == "addr":
where.append("EXISTS (SELECT 1 FROM email_investor_links l "
"WHERE l.email_id = e.id AND LOWER(l.matched_address) = ?)")
params.append(val.lower())
else:
# Unknown key prefix (malformed input) -> match nothing, never silently
# fall through to an unfiltered list.
where.append("1 = 0")
if search:
like = f"%{search.strip()}%"
where.append("(e.subject LIKE ? OR e.snippet LIKE ? "
"OR e.from_email LIKE ? OR e.from_name LIKE ?)")
params.extend([like, like, like, like])
# Date range over the send time (ISO-8601 strings sort lexically). [since, until).
if since:
where.append("e.sent_at >= ?")
params.append(since)
if until:
where.append("e.sent_at < ?")
params.append(until)
direction = (direction or "").strip().lower()
if direction in ("inbound", "outbound") and own:
marks = ",".join("?" for _ in own)
op = "IN" if direction == "outbound" else "NOT IN"
where.append(f"LOWER(e.from_email) {op} ({marks})")
params.extend(sorted(own))
sql = ("SELECT e.id, e.subject, e.from_name, e.from_email, e.sent_at, e.snippet, "
"e.has_attachments, e.is_matched, e.match_status FROM emails e WHERE "
+ " AND ".join(where) + " ORDER BY e.sent_at DESC LIMIT ?")
rows = [dict(r) for r in cur.execute(sql, params + [limit + 1])]
truncated = len(rows) > limit
rows = rows[:limit]
by_id = {r["id"]: r for r in rows}
for r in rows:
r["direction"] = "outbound" if (r["from_email"] or "").lower().strip() in own else "inbound"
r["mailboxes"] = []
r["investors"] = [] # [{id: typed-key, name}] — resolved identities
ids = list(by_id)
if ids:
marks = ",".join("?" for _ in ids)
for s in cur.execute(
"SELECT eam.email_id AS eid, ea.email_address AS addr "
"FROM email_account_messages eam JOIN email_accounts ea ON ea.id = eam.account_id "
f"WHERE eam.deleted_at IS NULL AND eam.email_id IN ({marks}) "
"ORDER BY ea.email_address", ids):
mb = by_id[s["eid"]]["mailboxes"]
if s["addr"] and s["addr"] not in mb:
mb.append(s["addr"])
for lnk in cur.execute(
f"SELECT l.email_id AS eid, {_LINK_IDENTITY_COLS} "
f"FROM email_investor_links l {_LINK_IDENTITY_JOINS} "
f"WHERE l.email_id IN ({marks})", ids):
# No graveyard filter here on purpose: a graveyarded investor's *email*
# still shows in the list with its chip (audit completeness, direct or
# via-contact); only the facet dropdown below hides graveyard from the picker.
key, name = _resolve_entity(lnk)
if not key:
continue
invs = by_id[lnk["eid"]]["investors"]
if not any(iv["id"] == key for iv in invs):
invs.append({"id": key, "name": name})
accounts = [dict(r) for r in cur.execute(
"SELECT id, email_address FROM email_accounts ORDER BY email_address")]
# Facet dropdown mirrors what the list resolves: one entry per distinct matched
# entity (grid investor / org / contact), across all live matched email — not just
# the current page — so the picker is stable under filtering. Excluded from the
# picker: graveyarded grid investors (CRM-wide convention) and raw-address-only
# matches (too many, too noisy). Both still appear in the list and remain findable
# by free-text search — this is an audit surface, so history is never hidden, only
# the picker is.
facet: dict[str, str] = {}
for r in cur.execute(
f"SELECT DISTINCT {_LINK_IDENTITY_COLS} "
f"FROM email_investor_links l {_LINK_IDENTITY_JOINS} "
"WHERE EXISTS (SELECT 1 FROM email_account_messages eam "
"WHERE eam.email_id = l.email_id AND eam.deleted_at IS NULL)"):
key, name = _resolve_entity(r)
if not key or key.startswith("addr:"):
continue
if key.startswith("fund:") and (r["fund_graveyard"] or 0):
continue
facet.setdefault(key, name)
investors = [{"id": k, "name": v}
for k, v in sorted(facet.items(), key=lambda kv: kv[1].lower())]
return {"emails": rows, "accounts": accounts, "investors": investors,
"count": len(rows), "truncated": truncated}
def search_hit_emails(conn: sqlite3.Connection, email_ids) -> dict:
"""Display fields for the given email ids that are still live (have a
non-tombstoned sighting), keyed by id, with email-level direction.
Used to hydrate + soft-delete-filter semantic-search hits: the Qdrant index can
lag a deletion, and SQLite is canonical (never trust the derived index), so a hit
whose email no longer has a live sighting is dropped here rather than shown."""
ids = [i for i in dict.fromkeys(email_ids) if i]
if not ids:
return {}
cur = conn.cursor()
own = {(r["email_address"] or "").lower().strip()
for r in cur.execute("SELECT email_address FROM email_accounts")}
own.discard("")
marks = ",".join("?" for _ in ids)
out: dict = {}
for e in cur.execute(
"SELECT e.id, e.subject, e.from_name, e.from_email, e.sent_at, e.has_attachments "
f"FROM emails e WHERE e.id IN ({marks}) AND EXISTS (SELECT 1 FROM email_account_messages eam "
"WHERE eam.email_id = e.id AND eam.deleted_at IS NULL)", ids):
d = dict(e)
d["direction"] = "outbound" if (d["from_email"] or "").lower().strip() in own else "inbound"
out[d["id"]] = d
return out
def query_email_detail(conn: sqlite3.Connection, email_id: str) -> Optional[dict]:
"""Full record for one captured email — the Communications detail view (full
body + recipients + matched investor identities + mailboxes + attachments).
Returns None if the email doesn't exist or has no live (non-tombstoned) sighting:
soft-delete lives on the per-mailbox `email_account_messages` row, not on `emails`,
so an email is only "live" while at least one sighting survives. Direction is set
at the email level (outbound if the sender is one of our mailboxes), matching the
list. The raw remote `body_html` is NOT returned (XSS); the response carries the
plain-text `body_text` plus a `has_html` flag so the UI can note an HTML-only email."""
cur = conn.cursor()
e = cur.execute(
"SELECT e.id, e.subject, e.from_name, e.from_email, e.sent_at, e.snippet, "
"e.body_text, e.body_html, e.has_attachments, e.match_status, e.gmail_thread_id "
"FROM emails e WHERE e.id = ? AND EXISTS (SELECT 1 FROM email_account_messages eam "
"WHERE eam.email_id = e.id AND eam.deleted_at IS NULL)", (email_id,)).fetchone()
if not e:
return None
row = dict(e)
# Don't ship the raw remote HTML to the client (XSS if any consumer ever renders
# it); the UI shows the plain-text body and only needs to know HTML exists.
row["has_html"] = bool((row.pop("body_html", None) or "").strip())
own = {(r["email_address"] or "").lower().strip()
for r in cur.execute("SELECT email_address FROM email_accounts")}
own.discard("")
row["direction"] = "outbound" if (row["from_email"] or "").lower().strip() in own else "inbound"
row["mailboxes"] = [r["addr"] for r in cur.execute(
"SELECT DISTINCT ea.email_address AS addr FROM email_account_messages eam "
"JOIN email_accounts ea ON ea.id = eam.account_id "
"WHERE eam.email_id = ? AND eam.deleted_at IS NULL ORDER BY ea.email_address", (email_id,))]
row["recipients"] = [dict(r) for r in cur.execute(
"SELECT address, display_name, kind FROM email_recipients "
"WHERE email_id = ? AND kind IN ('to','cc') "
"ORDER BY CASE kind WHEN 'to' THEN 0 ELSE 1 END, address", (email_id,))]
row["attachments"] = [dict(r) for r in cur.execute(
"SELECT filename, mime_type, size_bytes, download_status FROM email_attachments "
"WHERE email_id = ? ORDER BY filename", (email_id,))]
investors: dict[str, str] = {}
for lnk in cur.execute(
f"SELECT {_LINK_IDENTITY_COLS} FROM email_investor_links l {_LINK_IDENTITY_JOINS} "
"WHERE l.email_id = ?", (email_id,)):
key, name = _resolve_entity(lnk)
if key:
investors.setdefault(key, name)
row["investors"] = [{"id": k, "name": v} for k, v in investors.items()]
return row
def finish_sync_run(conn: sqlite3.Connection, run_id: str, *, status: str,
stats: Optional[dict] = None, error: Optional[str] = None) -> None:
stats = stats or {}
conn.execute(
"""UPDATE email_sync_runs
SET finished_at = ?, status = ?, messages_seen = ?, messages_stored = ?,
attachments_saved = ?, api_calls = ?, retries = ?, error = ?
WHERE id = ?""",
(
_now_iso(), status,
int(stats.get("messages_seen", 0)),
int(stats.get("messages_stored", 0)),
int(stats.get("attachments_saved", 0)),
int(stats.get("api_calls", 0)),
int(stats.get("retries", 0)),
error,
run_id,
),
)