c7b74a2704
Communications tab (search/query roadmap items 1 & 2): - Fix the investor dropdown: the facet only listed grid investors, so it came back empty whenever email matched a classic contact or org domain (no grid id — the common case). It now mirrors the email list, resolving each link to a typed identity (fund:/org:/contact:/addr:) with precedence grid -> org -> contact -> address; investor_id accepts the typed key (bare id = fund: for back-compat) and an unknown prefix matches nothing. - Add a date-range filter and a click-to-expand full-body view (GET /api/email/detail, admin, soft-delete-gated; body_text only, never raw remote HTML). - Add a "Search content" mode: GET /api/email/search wraps the ingest hybrid_search over the Qdrant email index (doc_type=email), hydrated and soft-delete-filtered against SQLite (canonical), 503 if Spark/Qdrant down. Daily digest: - Settings -> Admin builds a digest over a chosen window (last 24h or since a date) as an in-app preview before sending (POST /api/admin/digest/preview), so the local-Spark summarizer can be verified on demand even on a quiet day. Manual send uses the same window; neither advances the daily cursor, so a preview never suppresses the scheduled digest. Code-only, migrations no-op. 22/22 backend tests, render-smoke pass.
689 lines
29 KiB
Python
689 lines
29 KiB
Python
"""
|
|
Data-access layer for the email_integration module.
|
|
|
|
All SQL touching emails_* tables lives here. Other modules call named
|
|
helpers — they never write SQL inline. This keeps schema changes contained.
|
|
|
|
Connection pattern matches server.py get_db():
|
|
- WAL mode, foreign keys on, busy_timeout
|
|
- sqlite3.Row row_factory
|
|
The caller is responsible for committing / closing.
|
|
"""
|
|
|
|
import json
|
|
import os
|
|
import sqlite3
|
|
import uuid
|
|
from datetime import datetime, timezone
|
|
from typing import Iterable, Optional
|
|
|
|
|
|
# ------------------------------------------------------------------ migrations
|
|
|
|
def apply_migrations(cursor: sqlite3.Cursor) -> None:
    """Apply all .sql migration files in migrations/ in lexicographic order.

    Called from server.init_db(). No applied-migrations log is kept yet: every
    file is re-run on every call, so each file must guard itself (CREATE ...
    IF NOT EXISTS etc.) to stay idempotent. If we ever need more complex
    migrations, add a schema_migrations table.

    Note: sqlite3's ``Cursor.executescript`` commits any pending transaction
    before running the script.
    """
    here = os.path.dirname(os.path.abspath(__file__))
    mdir = os.path.join(here, "migrations")
    if not os.path.isdir(mdir):
        return
    for name in sorted(os.listdir(mdir)):
        path = os.path.join(mdir, name)
        # Only regular *.sql files: a stray directory named "*.sql" would
        # otherwise make open() raise and abort the whole init.
        if not name.endswith(".sql") or not os.path.isfile(path):
            continue
        # Explicit UTF-8: the default text encoding is locale-dependent, so a
        # migration containing non-ASCII text could decode differently per host.
        with open(path, "r", encoding="utf-8") as f:
            sql = f.read()
        cursor.executescript(sql)
|
|
|
|
|
|
# ------------------------------------------------------------------ utils
|
|
|
|
def _uuid() -> str:
|
|
return str(uuid.uuid4())
|
|
|
|
|
|
def _now_iso() -> str:
|
|
return datetime.now(tz=timezone.utc).strftime("%Y-%m-%dT%H:%M:%SZ")
|
|
|
|
|
|
def _json(v) -> str:
|
|
return json.dumps(v, separators=(",", ":"))
|
|
|
|
|
|
# ------------------------------------------------------------------ email_accounts
|
|
|
|
def list_sync_ready_accounts(conn: sqlite3.Connection) -> list[sqlite3.Row]:
    """Accounts eligible for a sync pass: enabled and in 'pending' or 'active' state.

    Never-synced accounts (NULL last_synced_at) come first, followed by the rest
    from least- to most-recently synced.
    """
    query = (
        "SELECT * FROM email_accounts "
        "WHERE sync_enabled = 1 AND sync_status IN ('pending','active') "
        "ORDER BY last_synced_at IS NOT NULL, last_synced_at"
    )
    return conn.execute(query).fetchall()
|
|
|
|
|
|
def get_account_by_email(conn: sqlite3.Connection, email_address: str) -> Optional[sqlite3.Row]:
    """Return the email_accounts row whose email_address equals *email_address*
    (compared as stored, no case folding here), or None when there is none."""
    return conn.execute(
        "SELECT * FROM email_accounts WHERE email_address = ?", (email_address,)
    ).fetchone()
|
|
|
|
|
|
def upsert_account(conn: sqlite3.Connection, *, user_id: str, email_address: str,
                   auth_method: str) -> str:
    """Return the id of the account for *email_address*, creating it if absent.

    An existing account is returned untouched: user_id / auth_method are only
    used when a new row is inserted. Look-up and insert are two separate
    statements. Does not commit.
    """
    found = get_account_by_email(conn, email_address)
    if not found:
        new_id = _uuid()
        conn.execute(
            "INSERT INTO email_accounts (id, user_id, email_address, auth_method) "
            "VALUES (?, ?, ?, ?)",
            (new_id, user_id, email_address, auth_method),
        )
        return new_id
    return found["id"]
|
|
|
|
|
|
def set_account_status(conn: sqlite3.Connection, account_id: str, *,
                       status: str, error: Optional[str] = None) -> None:
    """Set an account's sync_status and sync_error and bump updated_at.

    Passing no *error* (None) clears any previous sync_error. Does not commit.
    """
    sql = ("UPDATE email_accounts SET sync_status = ?, sync_error = ?, "
           "updated_at = datetime('now') WHERE id = ?")
    conn.execute(sql, (status, error, account_id))
|
|
|
|
|
|
def set_account_checkpoint(conn: sqlite3.Connection, account_id: str, *,
                           history_id: Optional[str] = None,
                           backfill_cursor: Optional[str] = None,
                           backfill_complete: Optional[bool] = None,
                           last_synced_at: Optional[str] = None) -> None:
    """Persist whichever sync checkpoint fields were supplied.

    A field left as None is not touched (so None cannot be used to clear a
    column). When at least one field is given, updated_at is bumped too; when
    none is given, no statement is issued at all. Does not commit.
    """
    requested = (
        ("last_history_id", history_id),
        ("backfill_cursor", backfill_cursor),
        ("backfill_complete",
         None if backfill_complete is None else int(bool(backfill_complete))),
        ("last_synced_at", last_synced_at),
    )
    changes = [(column, value) for column, value in requested if value is not None]
    if not changes:
        return
    assignments = [f"{column} = ?" for column, _ in changes]
    assignments.append("updated_at = datetime('now')")
    values = [value for _, value in changes] + [account_id]
    conn.execute(f"UPDATE email_accounts SET {', '.join(assignments)} WHERE id = ?", values)
|
|
|
|
|
|
# ------------------------------------------------------------------ emails
|
|
|
|
def find_email_by_rfc_id(conn: sqlite3.Connection, rfc_message_id: str) -> Optional[sqlite3.Row]:
    """Return the full emails row with this rfc_message_id, or None if not stored."""
    return conn.execute(
        "SELECT * FROM emails WHERE rfc_message_id = ?", (rfc_message_id,)
    ).fetchone()
|
|
|
|
|
|
def find_email_id_by_any_rfc_id(conn: sqlite3.Connection,
                                rfc_ids: Iterable[str]) -> Optional[str]:
    """Id of the earliest-sent stored email whose rfc_message_id is any of *rfc_ids*.

    Falsy entries (None, "") are ignored. If nothing is left to match, the
    database is not queried and None is returned; None also means no match.
    """
    wanted = list(filter(None, rfc_ids))
    if not wanted:
        return None
    marks = ",".join(["?"] * len(wanted))
    row = conn.execute(
        f"SELECT id FROM emails WHERE rfc_message_id IN ({marks}) "
        "ORDER BY sent_at ASC LIMIT 1",
        wanted,
    ).fetchone()
    return None if row is None else row["id"]
|
|
|
|
|
|
def _recipient_rows(parsed: dict) -> list[tuple[str, Optional[str], str]]:
    """Flatten a parsed message's address headers into (address, display_name, kind).

    kind is one of 'from', 'to', 'cc', 'bcc', 'reply_to'. Addresses are
    lower-cased and stripped; any that are empty after that (None, "", or
    whitespace-only) are dropped, so a blank header value is never stored as an
    empty-string recipient (which would otherwise surface as a "" participant).
    """
    rows: list[tuple[str, Optional[str], str]] = []
    for kind in ("from", "to", "cc", "bcc", "reply_to"):
        if kind == "from" and parsed.get("from_email"):
            addrs = [(parsed["from_email"], parsed.get("from_name"))]
        elif kind == "reply_to" and parsed.get("reply_to"):
            addrs = [(parsed["reply_to"], None)]
        else:
            # `or []` (not get(kind, [])): the key is often present with value None
            # (e.g. reply_to when there is no Reply-To header), and `for a in None`
            # would raise TypeError and abort the whole backfill.
            addrs = [(a.get("email"), a.get("name")) if isinstance(a, dict) else (a, None)
                     for a in (parsed.get(kind) or [])]
        for address, name in addrs:
            if not address:
                continue
            normalized = address.lower().strip()
            if normalized:
                rows.append((normalized, name, kind))
    return rows


def insert_email(conn: sqlite3.Connection, *, parsed: dict, match_status: str) -> str:
    """Insert a fresh emails row plus its email_recipients rows. Returns email_id.

    Caller must ensure no row exists for parsed['rfc_message_id']; use
    find_email_by_rfc_id first.

    `parsed` must carry 'rfc_message_id', 'from_email' and 'sent_at' (KeyError
    otherwise); all other fields are optional. has_attachments is derived from a
    non-empty parsed['attachments'] and is_matched from match_status == "matched".
    Does not commit.
    """
    email_id = _uuid()
    conn.execute(
        """INSERT INTO emails
            (id, rfc_message_id, gmail_thread_id, rfc_thread_root_id, subject,
             from_email, from_name, to_emails_json, cc_emails_json, bcc_emails_json,
             reply_to, sent_at, body_text, body_html, snippet, in_reply_to,
             references_json, has_attachments, size_estimate, is_matched,
             match_status, raw_headers_json)
           VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)""",
        (
            email_id,
            parsed["rfc_message_id"],
            parsed.get("gmail_thread_id"),
            parsed.get("rfc_thread_root_id"),
            parsed.get("subject"),
            parsed["from_email"],
            parsed.get("from_name"),
            _json(parsed.get("to", [])),
            _json(parsed.get("cc", [])),
            _json(parsed.get("bcc", [])),
            parsed.get("reply_to"),
            parsed["sent_at"],
            parsed.get("body_text"),
            parsed.get("body_html"),
            parsed.get("snippet"),
            parsed.get("in_reply_to"),
            _json(parsed.get("references", [])),
            1 if parsed.get("attachments") else 0,
            parsed.get("size_estimate"),
            1 if match_status == "matched" else 0,
            match_status,
            _json(parsed.get("raw_headers", {})) if parsed.get("raw_headers") else None,
        ),
    )
    # One email_recipients row per (normalized address, kind), inserted in a batch.
    recipients = [(_uuid(), email_id, address, name, kind)
                  for address, name, kind in _recipient_rows(parsed)]
    if recipients:
        conn.executemany(
            "INSERT INTO email_recipients (id, email_id, address, display_name, kind) "
            "VALUES (?, ?, ?, ?, ?)",
            recipients,
        )
    return email_id
|
|
|
|
|
|
def set_email_thread(conn: sqlite3.Connection, email_id: str, thread_id: str) -> None:
    """Point an emails row at its thread (emails.thread_id) and bump updated_at."""
    params = (thread_id, email_id)
    conn.execute(
        "UPDATE emails SET thread_id = ?, updated_at = datetime('now') WHERE id = ?",
        params,
    )
|
|
|
|
|
|
# ------------------------------------------------------------------ sightings
|
|
|
|
def upsert_sighting(conn: sqlite3.Connection, *, email_id: str, account_id: str,
                    gmail_message_id: str, gmail_thread_id: str,
                    labels: list[str], is_sent: bool) -> None:
    """Record one mailbox's copy ("sighting") of a stored email.

    Uses INSERT OR IGNORE: if the new row would violate a uniqueness constraint
    on email_account_messages it is silently skipped, and the existing row's
    labels / is_sent are left as they were (update_sighting_labels refreshes
    labels). Does not commit.
    """
    sighting = (
        _uuid(),
        email_id,
        account_id,
        gmail_message_id,
        gmail_thread_id,
        _json(labels),
        int(bool(is_sent)),
    )
    conn.execute(
        """INSERT OR IGNORE INTO email_account_messages
            (id, email_id, account_id, gmail_message_id, gmail_thread_id,
             labels_json, is_sent)
           VALUES (?, ?, ?, ?, ?, ?, ?)""",
        sighting,
    )
|
|
|
|
|
|
def update_sighting_labels(conn: sqlite3.Connection, *, account_id: str,
                           gmail_message_id: str, labels: list[str]) -> None:
    """Overwrite labels_json on the sighting identified by (account_id,
    gmail_message_id). No row is created if none matches. Does not commit."""
    encoded = _json(labels)
    conn.execute(
        "UPDATE email_account_messages SET labels_json = ? "
        "WHERE account_id = ? AND gmail_message_id = ?",
        (encoded, account_id, gmail_message_id),
    )
|
|
|
|
|
|
def tombstone_sighting(conn: sqlite3.Connection, *, account_id: str,
                       gmail_message_id: str) -> None:
    """Soft-delete one mailbox's sighting by stamping deleted_at with the current time.

    The row is kept; readers treat an email as live only while some sighting has
    deleted_at IS NULL. Re-tombstoning overwrites the timestamp. Does not commit.
    """
    key = (account_id, gmail_message_id)
    conn.execute(
        "UPDATE email_account_messages SET deleted_at = datetime('now') "
        "WHERE account_id = ? AND gmail_message_id = ?",
        key,
    )
|
|
|
|
|
|
# ------------------------------------------------------------------ attachments
|
|
|
|
def insert_attachment_stub(conn: sqlite3.Connection, *, email_id: str,
                           gmail_attachment_id: str, filename: str,
                           sanitized_filename: str, mime_type: Optional[str],
                           size_bytes: Optional[int], storage_path: str) -> str:
    """Insert an email_attachments row describing an attachment before its bytes
    are fetched; download_status etc. are left to the column defaults. Returns the
    new attachment id. Does not commit."""
    new_id = _uuid()
    values = (new_id, email_id, gmail_attachment_id, filename, sanitized_filename,
              mime_type, size_bytes, storage_path)
    conn.execute(
        """INSERT INTO email_attachments
            (id, email_id, gmail_attachment_id, filename, sanitized_filename,
             mime_type, size_bytes, storage_path)
           VALUES (?, ?, ?, ?, ?, ?, ?, ?)""",
        values,
    )
    return new_id
|
|
|
|
|
|
def mark_attachment_downloaded(conn: sqlite3.Connection, attachment_id: str, *,
                               sha256_hex: str, size_bytes: int) -> None:
    """Flag an attachment as 'downloaded', recording its hash, actual size and
    the download time. Does not commit."""
    statement = (
        "UPDATE email_attachments SET download_status = 'downloaded', "
        "sha256_hex = ?, size_bytes = ?, downloaded_at = datetime('now') "
        "WHERE id = ?"
    )
    conn.execute(statement, (sha256_hex, size_bytes, attachment_id))
|
|
|
|
|
|
def mark_attachment_failed(conn: sqlite3.Connection, attachment_id: str, *,
                           error: str) -> None:
    """Flag an attachment download as 'failed': bump download_attempts and keep
    the latest error message. Does not commit."""
    statement = (
        "UPDATE email_attachments SET download_status = 'failed', "
        "download_attempts = download_attempts + 1, download_error = ? "
        "WHERE id = ?"
    )
    conn.execute(statement, (error, attachment_id))
|
|
|
|
|
|
def pending_attachments(conn: sqlite3.Connection, limit: int = 50) -> list[sqlite3.Row]:
    """Attachments still awaiting download (status 'pending', fewer than 5 attempts),
    each paired with a sighting's gmail_message_id and account_id, at most *limit* rows.

    NOTE(review): the join is per sighting, so an email seen in several mailboxes
    (or with tombstoned sightings) yields one row per sighting for the same
    attachment, and those duplicates count toward *limit* — confirm the downloader
    tolerates that.
    """
    query = (
        "SELECT a.*, eam.gmail_message_id, eam.account_id "
        "FROM email_attachments a "
        "JOIN email_account_messages eam ON eam.email_id = a.email_id "
        "WHERE a.download_status = 'pending' AND a.download_attempts < 5 "
        "LIMIT ?"
    )
    return conn.execute(query, (limit,)).fetchall()
|
|
|
|
|
|
# ------------------------------------------------------------------ threads
|
|
|
|
def find_thread_by_gmail_id(conn: sqlite3.Connection, gmail_thread_id: str) -> Optional[sqlite3.Row]:
    """Return the email_threads row with this gmail_thread_id, or None."""
    lookup = "SELECT * FROM email_threads WHERE gmail_thread_id = ?"
    return conn.execute(lookup, (gmail_thread_id,)).fetchone()
|
|
|
|
|
|
def find_thread_by_rfc_root(conn: sqlite3.Connection, rfc_root: str) -> Optional[sqlite3.Row]:
    """Return the email_threads row whose rfc_thread_root_id is *rfc_root*, or None."""
    lookup = "SELECT * FROM email_threads WHERE rfc_thread_root_id = ?"
    return conn.execute(lookup, (rfc_root,)).fetchone()
|
|
|
|
|
|
def create_thread(conn: sqlite3.Connection, *, gmail_thread_id: Optional[str],
                  rfc_thread_root_id: Optional[str], subject_normalized: Optional[str],
                  first_message_at: Optional[str]) -> str:
    """Insert an empty email_threads row and return its new id.

    message_count starts at 0 and last_message_at is seeded with
    first_message_at; rollup_thread recomputes both from member emails.
    Does not commit.
    """
    new_id = _uuid()
    values = (new_id, gmail_thread_id, rfc_thread_root_id, subject_normalized,
              first_message_at, first_message_at)
    conn.execute(
        """INSERT INTO email_threads
            (id, gmail_thread_id, rfc_thread_root_id, subject_normalized,
             first_message_at, last_message_at, message_count)
           VALUES (?, ?, ?, ?, ?, ?, 0)""",
        values,
    )
    return new_id
|
|
|
|
|
|
def rollup_thread(conn: sqlite3.Connection, thread_id: str) -> None:
    """Recompute a thread's denormalized summary from its member emails.

    Updates message_count, first/last_message_at, participant_count,
    participants_json (sorted distinct recipient addresses) and is_matched on the
    email_threads row. Cheap at 5-person team volumes; for larger deployments
    swap to triggers.

    A thread with no member emails is left untouched (its previous summary is not
    reset). Every member email is counted, regardless of sighting tombstones.
    Does not commit.
    """
    cur = conn.cursor()
    cur.execute(
        "SELECT COUNT(*) AS n, MIN(sent_at) AS first, MAX(sent_at) AS last, "
        "MAX(is_matched) AS matched FROM emails WHERE thread_id = ?",
        (thread_id,),
    )
    row = cur.fetchone()
    if not row or row["n"] == 0:
        return
    # ORDER BY makes participants_json deterministic: DISTINCT alone leaves row
    # order unspecified, so the stored JSON could churn between rollups of an
    # unchanged thread. The explicit alias pins the result column name.
    cur.execute(
        "SELECT DISTINCT er.address AS address FROM email_recipients er "
        "JOIN emails e ON e.id = er.email_id WHERE e.thread_id = ? "
        "ORDER BY er.address",
        (thread_id,),
    )
    participants = [r["address"] for r in cur.fetchall()]
    conn.execute(
        "UPDATE email_threads SET message_count = ?, first_message_at = ?, "
        "last_message_at = ?, participant_count = ?, participants_json = ?, "
        "is_matched = ?, updated_at = datetime('now') WHERE id = ?",
        (row["n"], row["first"], row["last"], len(participants),
         _json(participants), int(row["matched"] or 0), thread_id),
    )
|
|
|
|
|
|
# ------------------------------------------------------------------ investor links
|
|
|
|
def insert_investor_link(conn: sqlite3.Connection, *, email_id: str,
                         link: dict) -> None:
    """Insert one email_investor_links row tying an email to a matched entity.

    `link` must carry 'matched_address' and 'match_kind' (KeyError otherwise).
    The entity ids (fundraising_investor_id, fundraising_contact_id, contact_id,
    organization_id) are optional. match_confidence defaults to 1.0 when missing
    or None. Does not commit.
    """
    confidence = link.get("match_confidence")
    # Treat an explicit None like a missing key: float(None) would raise
    # TypeError, while an absent key already defaulted to 1.0.
    confidence = 1.0 if confidence is None else float(confidence)
    conn.execute(
        """INSERT INTO email_investor_links
            (id, email_id, fundraising_investor_id, fundraising_contact_id,
             contact_id, organization_id, matched_address, match_kind,
             match_confidence)
           VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?)""",
        (
            _uuid(),
            email_id,
            link.get("fundraising_investor_id"),
            link.get("fundraising_contact_id"),
            link.get("contact_id"),
            link.get("organization_id"),
            link["matched_address"],
            link["match_kind"],
            confidence,
        ),
    )
|
|
|
|
|
|
# ------------------------------------------------------------------ sync runs
|
|
|
|
def start_sync_run(conn: sqlite3.Connection, *, account_id: str, kind: str) -> str:
    """Open an email_sync_runs row in 'running' state, stamped with the current
    UTC time, and return its id (pass it to finish_sync_run). Does not commit."""
    new_id = _uuid()
    values = (new_id, account_id, kind, _now_iso())
    conn.execute(
        "INSERT INTO email_sync_runs (id, account_id, kind, started_at, status) "
        "VALUES (?, ?, ?, ?, 'running')",
        values,
    )
    return new_id
|
|
|
|
|
|
def _resolve_entity(row) -> tuple:
|
|
"""Reduce one email_investor_links hydration row to a (key, name) identity for
|
|
the matched investor, with the same precedence the digest uses:
|
|
grid investor -> organization -> contact -> raw matched address. The key is
|
|
*typed* (`fund:`/`org:`/`contact:`/`addr:`) so the Communications filter can
|
|
target the right column. Soft-deleted org/contact rows arrive as NULL (filtered
|
|
in the join) and fall through to the next tier."""
|
|
if row["fund_id"] and (row["fund_name"] or "").strip():
|
|
return f"fund:{row['fund_id']}", row["fund_name"].strip()
|
|
if row["org_id"] and (row["org_name"] or "").strip():
|
|
return f"org:{row['org_id']}", row["org_name"].strip()
|
|
if row["contact_id"] and (row["contact_name"] or "").strip():
|
|
return f"contact:{row['contact_id']}", row["contact_name"].strip()
|
|
addr = (row["addr"] or "").strip()
|
|
return (f"addr:{addr.lower()}", addr) if addr else (None, None)
|
|
|
|
|
|
# Hydration of an email_investor_links row (aliased `l` by the caller) up to the
# resolvable investor identity, shared by the per-email tags, the facet dropdown
# and the detail view. Soft-deleted org/contact rows are dropped in the join
# (their columns come back NULL) so they fall through to the next identity tier.
#
# A grid investor is reached either directly (l.fundraising_investor_id -> fi)
# or via a fundraising contact (l.fundraising_contact_id -> fic -> fic_inv).
# NOTE(review): no deleted_at filter is applied to fundraising_investors /
# fundraising_contacts here — confirm whether those tables soft-delete.
_LINK_IDENTITY_JOINS = """
LEFT JOIN fundraising_investors fi ON fi.id = l.fundraising_investor_id
LEFT JOIN fundraising_contacts fic ON fic.id = l.fundraising_contact_id
LEFT JOIN fundraising_investors fic_inv ON fic_inv.id = fic.investor_id
LEFT JOIN organizations o ON o.id = l.organization_id AND o.deleted_at IS NULL
LEFT JOIN contacts c ON c.id = l.contact_id AND c.deleted_at IS NULL
"""
# Columns selected alongside _LINK_IDENTITY_JOINS. The aliases are the keys read
# by _resolve_entity (addr, fund_id, fund_name, org_id, org_name, contact_id,
# contact_name) plus fund_graveyard for the facet's graveyard check. COALESCE
# order means the direct grid link wins over the via-contact one. contact_name is
# "first last" trimmed, and NULL when both parts are blank.
_LINK_IDENTITY_COLS = """
l.matched_address AS addr,
COALESCE(fi.id, fic_inv.id) AS fund_id,
COALESCE(fi.investor_name, fic_inv.investor_name) AS fund_name,
COALESCE(fi.graveyard, fic_inv.graveyard) AS fund_graveyard,
o.id AS org_id, o.name AS org_name,
c.id AS contact_id,
NULLIF(TRIM(COALESCE(c.first_name,'') || ' ' || COALESCE(c.last_name,'')), '') AS contact_name
"""
|
|
|
|
|
|
def _escape_like(text: str) -> str:
    """Escape SQL LIKE metacharacters so *text* matches literally.

    Intended for a LIKE clause whose ESCAPE character is a backslash: the
    backslash itself is escaped first, then `%` and `_` (which LIKE would
    otherwise treat as wildcards).
    """
    return text.replace("\\", "\\\\").replace("%", "\\%").replace("_", "\\_")


def _investor_filter(investor_id) -> tuple[str, list]:
    """Return (WHERE clause, params) restricting emails `e` to one typed investor key.

    Keys are `fund:`/`org:`/`contact:`/`addr:` + value; a key with no prefix is a
    legacy grid-investor id (treated as `fund:`). An unknown prefix yields a
    clause that matches nothing, so malformed input never widens the list.
    """
    kind, _, val = str(investor_id).partition(":")
    if not val:  # bare id (legacy) -> grid investor
        kind, val = "fund", str(investor_id)
    if kind == "fund":
        return ("EXISTS (SELECT 1 FROM email_investor_links l WHERE l.email_id = e.id "
                "AND (l.fundraising_investor_id = ? OR l.fundraising_contact_id IN "
                "(SELECT id FROM fundraising_contacts WHERE investor_id = ?)))", [val, val])
    if kind == "org":
        return ("EXISTS (SELECT 1 FROM email_investor_links l "
                "WHERE l.email_id = e.id AND l.organization_id = ?)", [val])
    if kind == "contact":
        return ("EXISTS (SELECT 1 FROM email_investor_links l "
                "WHERE l.email_id = e.id AND l.contact_id = ?)", [val])
    if kind == "addr":
        # TRIM + LOWER mirrors how _resolve_entity builds the addr: key (strip,
        # lower), so a key taken from a chip round-trips to its own emails.
        return ("EXISTS (SELECT 1 FROM email_investor_links l "
                "WHERE l.email_id = e.id AND LOWER(TRIM(l.matched_address)) = ?)",
                [val.lower()])
    # Unknown key prefix (malformed input) -> match nothing, never silently
    # fall through to an unfiltered list.
    return "1 = 0", []


def query_email_activity(conn: sqlite3.Connection, *, investor_id: Optional[str] = None,
                         account_id: Optional[str] = None, search: Optional[str] = None,
                         direction: Optional[str] = None, since: Optional[str] = None,
                         until: Optional[str] = None, limit: int = 100) -> dict:
    """Captured-Gmail activity for the admin Communications panel, filterable by
    matched investor entity, mailbox, direction and date range, with free-text
    search over subject/snippet/sender. Returns the email rows plus the filter facets.

    Matched-only: the panel shows ONLY email that links to a known investor/contact
    (an `email_investor_links` row exists). Unmatched cold/unknown-sender email is
    still captured for completeness but is never surfaced here.

    Investor identity is *typed* (`fund:`/`org:`/`contact:`/`addr:`) and resolved with
    the digest's precedence (grid investor -> organization -> contact -> raw address),
    so an email matched only to a classic contact or an org domain — not yet wired to a
    grid investor — still shows a real name and is selectable in the dropdown, instead
    of the facet coming back empty. `investor_id` accepts a typed key (a bare id is
    treated as `fund:` for backward compatibility).

    Soft-delete: an email is live only if it still has a non-tombstoned per-mailbox
    sighting (`email_account_messages.deleted_at IS NULL`) — the `emails` row itself
    carries no deleted_at, so deletion lives on the sighting. Direction is decided at
    the email level (outbound if the sender is one of our mailboxes), mirroring the
    digest builder, so a thread reads consistently regardless of which mailbox saw it.

    Free-text search is literal: `%`, `_` and backslashes in the search text are
    escaped instead of acting as LIKE wildcards, and a blank/whitespace-only search
    is treated as no search. `since`/`until` bound sent_at as the half-open range
    [since, until). At most `limit` emails (clamped to 1..500) are returned, newest
    first; `truncated` is True when more matched.

    Returns {"emails", "accounts", "investors", "count", "truncated"}.
    """
    limit = max(1, min(int(limit or 100), 500))
    cur = conn.cursor()
    own = {(r["email_address"] or "").lower().strip()
           for r in cur.execute("SELECT email_address FROM email_accounts")}
    own.discard("")

    where = [
        # Live only: at least one non-tombstoned sighting.
        "EXISTS (SELECT 1 FROM email_account_messages eam "
        "WHERE eam.email_id = e.id AND eam.deleted_at IS NULL)",
        # Matched-only: surface email that links to a known investor/contact.
        # Unmatched (unknown-sender) email is captured but never shown here.
        "EXISTS (SELECT 1 FROM email_investor_links l WHERE l.email_id = e.id)",
    ]
    params: list = []
    if account_id:
        where.append("EXISTS (SELECT 1 FROM email_account_messages eam "
                     "WHERE eam.email_id = e.id AND eam.account_id = ? "
                     "AND eam.deleted_at IS NULL)")
        params.append(account_id)
    if investor_id:
        clause, clause_params = _investor_filter(investor_id)
        where.append(clause)
        params.extend(clause_params)
    term = (search or "").strip()
    if term:
        like = f"%{_escape_like(term)}%"
        where.append("(e.subject LIKE ? ESCAPE '\\' OR e.snippet LIKE ? ESCAPE '\\' "
                     "OR e.from_email LIKE ? ESCAPE '\\' OR e.from_name LIKE ? ESCAPE '\\')")
        params.extend([like] * 4)
    # Date range over the send time (ISO-8601 strings sort lexically). [since, until).
    if since:
        where.append("e.sent_at >= ?")
        params.append(since)
    if until:
        where.append("e.sent_at < ?")
        params.append(until)
    direction = (direction or "").strip().lower()
    if direction in ("inbound", "outbound") and own:
        marks = ",".join("?" for _ in own)
        op = "IN" if direction == "outbound" else "NOT IN"
        # TRIM so a stray space around the stored sender can't make the filter
        # disagree with the per-row direction label computed below.
        where.append(f"LOWER(TRIM(e.from_email)) {op} ({marks})")
        params.extend(sorted(own))

    sql = ("SELECT e.id, e.subject, e.from_name, e.from_email, e.sent_at, e.snippet, "
           "e.has_attachments, e.is_matched, e.match_status FROM emails e WHERE "
           + " AND ".join(where) + " ORDER BY e.sent_at DESC LIMIT ?")
    # Fetch one extra row to detect truncation without a separate COUNT query.
    rows = [dict(r) for r in cur.execute(sql, params + [limit + 1])]
    truncated = len(rows) > limit
    rows = rows[:limit]
    by_id = {r["id"]: r for r in rows}
    for r in rows:
        r["direction"] = "outbound" if (r["from_email"] or "").lower().strip() in own else "inbound"
        r["mailboxes"] = []
        r["investors"] = []  # [{id: typed-key, name}] — resolved identities

    ids = list(by_id)
    if ids:
        marks = ",".join("?" for _ in ids)
        for s in cur.execute(
                "SELECT eam.email_id AS eid, ea.email_address AS addr "
                "FROM email_account_messages eam JOIN email_accounts ea ON ea.id = eam.account_id "
                f"WHERE eam.deleted_at IS NULL AND eam.email_id IN ({marks}) "
                "ORDER BY ea.email_address", ids):
            mb = by_id[s["eid"]]["mailboxes"]
            if s["addr"] and s["addr"] not in mb:
                mb.append(s["addr"])
        for lnk in cur.execute(
                f"SELECT l.email_id AS eid, {_LINK_IDENTITY_COLS} "
                f"FROM email_investor_links l {_LINK_IDENTITY_JOINS} "
                f"WHERE l.email_id IN ({marks})", ids):
            # No graveyard filter here on purpose: a graveyarded investor's *email*
            # still shows in the list with its chip (audit completeness, direct or
            # via-contact); only the facet dropdown below hides graveyard from the picker.
            key, name = _resolve_entity(lnk)
            if not key:
                continue
            invs = by_id[lnk["eid"]]["investors"]
            if not any(iv["id"] == key for iv in invs):
                invs.append({"id": key, "name": name})

    accounts = [dict(r) for r in cur.execute(
        "SELECT id, email_address FROM email_accounts ORDER BY email_address")]

    # Facet dropdown mirrors what the list resolves: one entry per distinct matched
    # entity (grid investor / org / contact), across all live matched email — not just
    # the current page — so the picker is stable under filtering. Excluded from the
    # picker: graveyarded grid investors (CRM-wide convention) and raw-address-only
    # matches (too many, too noisy). Both still appear in the list and remain findable
    # by free-text search — this is an audit surface, so history is never hidden, only
    # the picker is.
    facet: dict[str, str] = {}
    for r in cur.execute(
            f"SELECT DISTINCT {_LINK_IDENTITY_COLS} "
            f"FROM email_investor_links l {_LINK_IDENTITY_JOINS} "
            "WHERE EXISTS (SELECT 1 FROM email_account_messages eam "
            "WHERE eam.email_id = l.email_id AND eam.deleted_at IS NULL)"):
        key, name = _resolve_entity(r)
        if not key or key.startswith("addr:"):
            continue
        if key.startswith("fund:") and (r["fund_graveyard"] or 0):
            continue
        facet.setdefault(key, name)
    investors = [{"id": k, "name": v}
                 for k, v in sorted(facet.items(), key=lambda kv: kv[1].lower())]

    return {"emails": rows, "accounts": accounts, "investors": investors,
            "count": len(rows), "truncated": truncated}
|
|
|
|
|
|
def search_hit_emails(conn: sqlite3.Connection, email_ids) -> dict:
    """Map each still-live email id among *email_ids* to its display fields.

    "Live" means the email has at least one non-tombstoned sighting. Duplicates
    and falsy ids are ignored. Each value is the email's id/subject/from_name/
    from_email/sent_at/has_attachments plus an email-level "direction"
    ("outbound" when the sender is one of our mailboxes, else "inbound").

    Used to hydrate + soft-delete-filter semantic-search hits: the Qdrant index can
    lag a deletion, and SQLite is canonical (never trust the derived index), so a hit
    whose email no longer has a live sighting is dropped here rather than shown."""
    unique_ids = [eid for eid in dict.fromkeys(email_ids) if eid]
    if not unique_ids:
        return {}
    cur = conn.cursor()
    mailbox_rows = cur.execute("SELECT email_address FROM email_accounts").fetchall()
    own = {(m["email_address"] or "").lower().strip() for m in mailbox_rows} - {""}
    marks = ",".join(["?"] * len(unique_ids))
    hits = cur.execute(
        "SELECT e.id, e.subject, e.from_name, e.from_email, e.sent_at, e.has_attachments "
        f"FROM emails e WHERE e.id IN ({marks}) AND EXISTS (SELECT 1 FROM email_account_messages eam "
        "WHERE eam.email_id = e.id AND eam.deleted_at IS NULL)",
        unique_ids,
    ).fetchall()
    result: dict = {}
    for hit in hits:
        record = dict(hit)
        sender = (record["from_email"] or "").lower().strip()
        record["direction"] = "outbound" if sender in own else "inbound"
        result[record["id"]] = record
    return result
|
|
|
|
|
|
def query_email_detail(conn: sqlite3.Connection, email_id: str) -> Optional[dict]:
    """Full record for one captured email, for the Communications detail view.

    Returns None when the email doesn't exist or has no live (non-tombstoned)
    sighting — soft-delete lives on the per-mailbox `email_account_messages` row,
    not on `emails`. Otherwise returns the email's columns plus:
      * has_html   — whether a non-blank body_html exists; the raw remote HTML
                     itself is NOT returned (XSS), only plain-text body_text;
      * direction  — "outbound" when the sender is one of our mailboxes, matching
                     the list;
      * mailboxes  — our addresses holding a live sighting, sorted;
      * recipients — To then Cc rows (address, display_name, kind);
      * attachments— filename, mime_type, size_bytes, download_status;
      * investors  — distinct resolved identities [{id: typed-key, name}].
    """
    cur = conn.cursor()
    found = cur.execute(
        "SELECT e.id, e.subject, e.from_name, e.from_email, e.sent_at, e.snippet, "
        "e.body_text, e.body_html, e.has_attachments, e.match_status, e.gmail_thread_id "
        "FROM emails e WHERE e.id = ? AND EXISTS (SELECT 1 FROM email_account_messages eam "
        "WHERE eam.email_id = e.id AND eam.deleted_at IS NULL)",
        (email_id,),
    ).fetchone()
    if found is None:
        return None

    detail = dict(found)
    # Drop the raw remote HTML before it can reach the client; keep only a flag.
    html = detail.pop("body_html", None)
    detail["has_html"] = bool((html or "").strip())

    mailbox_rows = cur.execute("SELECT email_address FROM email_accounts").fetchall()
    own = {(m["email_address"] or "").lower().strip() for m in mailbox_rows} - {""}
    sender = (detail["from_email"] or "").lower().strip()
    detail["direction"] = "outbound" if sender in own else "inbound"

    sightings = cur.execute(
        "SELECT DISTINCT ea.email_address AS addr FROM email_account_messages eam "
        "JOIN email_accounts ea ON ea.id = eam.account_id "
        "WHERE eam.email_id = ? AND eam.deleted_at IS NULL ORDER BY ea.email_address",
        (email_id,),
    ).fetchall()
    detail["mailboxes"] = [s["addr"] for s in sightings]

    recipient_rows = cur.execute(
        "SELECT address, display_name, kind FROM email_recipients "
        "WHERE email_id = ? AND kind IN ('to','cc') "
        "ORDER BY CASE kind WHEN 'to' THEN 0 ELSE 1 END, address",
        (email_id,),
    ).fetchall()
    detail["recipients"] = list(map(dict, recipient_rows))

    attachment_rows = cur.execute(
        "SELECT filename, mime_type, size_bytes, download_status FROM email_attachments "
        "WHERE email_id = ? ORDER BY filename",
        (email_id,),
    ).fetchall()
    detail["attachments"] = list(map(dict, attachment_rows))

    identities: dict[str, str] = {}
    link_rows = cur.execute(
        f"SELECT {_LINK_IDENTITY_COLS} FROM email_investor_links l {_LINK_IDENTITY_JOINS} "
        "WHERE l.email_id = ?",
        (email_id,),
    ).fetchall()
    for link in link_rows:
        key, name = _resolve_entity(link)
        if key and key not in identities:
            identities[key] = name
    detail["investors"] = [{"id": k, "name": v} for k, v in identities.items()]
    return detail
|
|
|
|
|
|
def finish_sync_run(conn: sqlite3.Connection, run_id: str, *, status: str,
                    stats: Optional[dict] = None, error: Optional[str] = None) -> None:
    """Close an email_sync_runs row: stamp finished_at (UTC now), set the final
    status and error, and store the run counters from *stats*. Missing counters
    are recorded as 0. Does not commit."""
    finished = _now_iso()
    counters = stats or {}
    counts = [int(counters.get(name, 0)) for name in (
        "messages_seen", "messages_stored", "attachments_saved", "api_calls", "retries")]
    conn.execute(
        """UPDATE email_sync_runs
           SET finished_at = ?, status = ?, messages_seen = ?, messages_stored = ?,
               attachments_saved = ?, api_calls = ?, retries = ?, error = ?
           WHERE id = ?""",
        (finished, status, *counts, error, run_id),
    )
|