Email search/query + windowed digest preview (v0.1.0:83)

Communications tab (search/query roadmap items 1 & 2):
- Fix the investor dropdown: the facet only listed grid investors, so it
  came back empty whenever email matched a classic contact or org domain
  (no grid id — the common case). It now mirrors the email list, resolving
  each link to a typed identity (fund:/org:/contact:/addr:) with precedence
  grid -> org -> contact -> address; investor_id accepts the typed key
  (bare id = fund: for back-compat) and an unknown prefix matches nothing.
- Add a date-range filter and a click-to-expand full-body view
  (GET /api/email/detail, admin, soft-delete-gated; body_text only, never
  raw remote HTML).
- Add a "Search content" mode: GET /api/email/search wraps the ingest
  hybrid_search over the Qdrant email index (doc_type=email), hydrated and
  soft-delete-filtered against SQLite (canonical), 503 if Spark/Qdrant down.

Daily digest:
- Settings -> Admin builds a digest over a chosen window (last 24h or since
  a date) as an in-app preview before sending (POST /api/admin/digest/preview),
  so the local-Spark summarizer can be verified on demand even on a quiet day.
  Manual send uses the same window; neither advances the daily cursor, so a
  preview never suppresses the scheduled digest.

Code-only, migrations no-op. 22/22 backend tests, render-smoke pass.
This commit is contained in:
Keysat
2026-06-16 20:46:15 -05:00
parent c29ac2f2ee
commit c7b74a2704
14 changed files with 989 additions and 138 deletions
+193 -35
View File
@@ -398,17 +398,63 @@ def start_sync_run(conn: sqlite3.Connection, *, account_id: str, kind: str) -> s
return run_id
def _resolve_entity(row) -> tuple:
"""Reduce one email_investor_links hydration row to a (key, name) identity for
the matched investor, with the same precedence the digest uses:
grid investor -> organization -> contact -> raw matched address. The key is
*typed* (`fund:`/`org:`/`contact:`/`addr:`) so the Communications filter can
target the right column. Soft-deleted org/contact rows arrive as NULL (filtered
in the join) and fall through to the next tier."""
if row["fund_id"] and (row["fund_name"] or "").strip():
return f"fund:{row['fund_id']}", row["fund_name"].strip()
if row["org_id"] and (row["org_name"] or "").strip():
return f"org:{row['org_id']}", row["org_name"].strip()
if row["contact_id"] and (row["contact_name"] or "").strip():
return f"contact:{row['contact_id']}", row["contact_name"].strip()
addr = (row["addr"] or "").strip()
return (f"addr:{addr.lower()}", addr) if addr else (None, None)
# Hydration of an email_investor_links row up to the resolvable investor identity,
# shared by the per-email tags and the facet dropdown. Soft-deleted org/contact
# rows are dropped in the join so they fall through to the next identity tier.
_LINK_IDENTITY_JOINS = """
LEFT JOIN fundraising_investors fi ON fi.id = l.fundraising_investor_id
LEFT JOIN fundraising_contacts fic ON fic.id = l.fundraising_contact_id
LEFT JOIN fundraising_investors fic_inv ON fic_inv.id = fic.investor_id
LEFT JOIN organizations o ON o.id = l.organization_id AND o.deleted_at IS NULL
LEFT JOIN contacts c ON c.id = l.contact_id AND c.deleted_at IS NULL
"""
_LINK_IDENTITY_COLS = """
l.matched_address AS addr,
COALESCE(fi.id, fic_inv.id) AS fund_id,
COALESCE(fi.investor_name, fic_inv.investor_name) AS fund_name,
COALESCE(fi.graveyard, fic_inv.graveyard) AS fund_graveyard,
o.id AS org_id, o.name AS org_name,
c.id AS contact_id,
NULLIF(TRIM(COALESCE(c.first_name,'') || ' ' || COALESCE(c.last_name,'')), '') AS contact_name
"""
def query_email_activity(conn: sqlite3.Connection, *, investor_id: Optional[str] = None,
account_id: Optional[str] = None, search: Optional[str] = None,
direction: Optional[str] = None, limit: int = 100) -> dict:
direction: Optional[str] = None, since: Optional[str] = None,
until: Optional[str] = None, limit: int = 100) -> dict:
"""Captured-Gmail activity for the admin Communications panel, filterable by
investor (matched fundraising investor) and/or mailbox, with free-text search
over subject/snippet/sender. Returns the email rows plus the filter facets.
matched investor entity, mailbox, direction and date range, with free-text
search over subject/snippet/sender. Returns the email rows plus the filter facets.
Matched-only: the panel shows ONLY email that links to a known investor/contact
(an `email_investor_links` row exists). Unmatched cold/unknown-sender email is
still captured for completeness but is never surfaced here.
Investor identity is *typed* (`fund:`/`org:`/`contact:`/`addr:`) and resolved with
the digest's precedence (grid investor -> organization -> contact -> raw address),
so an email matched only to a classic contact or an org domain — not yet wired to a
grid investor — still shows a real name and is selectable in the dropdown, instead
of the facet coming back empty. `investor_id` accepts a typed key (a bare id is
treated as `fund:` for backward compatibility).
Soft-delete: an email is live only if it still has a non-tombstoned per-mailbox
sighting (`email_account_messages.deleted_at IS NULL`) — the `emails` row itself
carries no deleted_at, so deletion lives on the sighting. Direction is decided at
@@ -433,15 +479,42 @@ def query_email_activity(conn: sqlite3.Connection, *, investor_id: Optional[str]
"AND eam.deleted_at IS NULL)")
params.append(account_id)
if investor_id:
where.append("EXISTS (SELECT 1 FROM email_investor_links l WHERE l.email_id = e.id "
"AND (l.fundraising_investor_id = ? OR l.fundraising_contact_id IN "
"(SELECT id FROM fundraising_contacts WHERE investor_id = ?)))")
params.extend([investor_id, investor_id])
kind, _, val = str(investor_id).partition(":")
if not val: # bare id (legacy) -> grid investor
kind, val = "fund", str(investor_id)
if kind == "fund":
where.append("EXISTS (SELECT 1 FROM email_investor_links l WHERE l.email_id = e.id "
"AND (l.fundraising_investor_id = ? OR l.fundraising_contact_id IN "
"(SELECT id FROM fundraising_contacts WHERE investor_id = ?)))")
params.extend([val, val])
elif kind == "org":
where.append("EXISTS (SELECT 1 FROM email_investor_links l "
"WHERE l.email_id = e.id AND l.organization_id = ?)")
params.append(val)
elif kind == "contact":
where.append("EXISTS (SELECT 1 FROM email_investor_links l "
"WHERE l.email_id = e.id AND l.contact_id = ?)")
params.append(val)
elif kind == "addr":
where.append("EXISTS (SELECT 1 FROM email_investor_links l "
"WHERE l.email_id = e.id AND LOWER(l.matched_address) = ?)")
params.append(val.lower())
else:
# Unknown key prefix (malformed input) -> match nothing, never silently
# fall through to an unfiltered list.
where.append("1 = 0")
if search:
like = f"%{search.strip()}%"
where.append("(e.subject LIKE ? OR e.snippet LIKE ? "
"OR e.from_email LIKE ? OR e.from_name LIKE ?)")
params.extend([like, like, like, like])
# Date range over the send time (ISO-8601 strings sort lexically). [since, until).
if since:
where.append("e.sent_at >= ?")
params.append(since)
if until:
where.append("e.sent_at < ?")
params.append(until)
direction = (direction or "").strip().lower()
if direction in ("inbound", "outbound") and own:
marks = ",".join("?" for _ in own)
@@ -459,8 +532,7 @@ def query_email_activity(conn: sqlite3.Connection, *, investor_id: Optional[str]
for r in rows:
r["direction"] = "outbound" if (r["from_email"] or "").lower().strip() in own else "inbound"
r["mailboxes"] = []
r["investors"] = []
r["investor_labels"] = []
r["investors"] = [] # [{id: typed-key, name}] — resolved identities
ids = list(by_id)
if ids:
@@ -474,41 +546,127 @@ def query_email_activity(conn: sqlite3.Connection, *, investor_id: Optional[str]
if s["addr"] and s["addr"] not in mb:
mb.append(s["addr"])
for lnk in cur.execute(
"SELECT l.email_id AS eid, l.matched_address AS addr, "
"COALESCE(fi.id, fic_inv.id) AS inv_id, "
"COALESCE(fi.investor_name, fic_inv.investor_name) AS inv_name "
"FROM email_investor_links l "
"LEFT JOIN fundraising_investors fi ON fi.id = l.fundraising_investor_id "
"LEFT JOIN fundraising_contacts fic ON fic.id = l.fundraising_contact_id "
"LEFT JOIN fundraising_investors fic_inv ON fic_inv.id = fic.investor_id "
f"SELECT l.email_id AS eid, {_LINK_IDENTITY_COLS} "
f"FROM email_investor_links l {_LINK_IDENTITY_JOINS} "
f"WHERE l.email_id IN ({marks})", ids):
row = by_id[lnk["eid"]]
if lnk["inv_id"] and lnk["inv_name"]:
if not any(iv["id"] == lnk["inv_id"] for iv in row["investors"]):
row["investors"].append({"id": lnk["inv_id"], "name": lnk["inv_name"]})
elif lnk["addr"] and lnk["addr"] not in row["investor_labels"]:
row["investor_labels"].append(lnk["addr"])
# No graveyard filter here on purpose: a graveyarded investor's *email*
# still shows in the list with its chip (audit completeness, direct or
# via-contact); only the facet dropdown below hides graveyard from the picker.
key, name = _resolve_entity(lnk)
if not key:
continue
invs = by_id[lnk["eid"]]["investors"]
if not any(iv["id"] == key for iv in invs):
invs.append({"id": key, "name": name})
accounts = [dict(r) for r in cur.execute(
"SELECT id, email_address FROM email_accounts ORDER BY email_address")]
# Facet dropdown = live investor relationships only (graveyard = 0, the CRM-wide
# convention). Graveyarded investors are excluded from the *picker*, but their
# captured email still shows in the unfiltered list and stays findable by free-text
# search — this is an audit surface, so history is never hidden, only the picker is.
investors = [dict(r) for r in cur.execute(
"SELECT id, investor_name AS name FROM fundraising_investors WHERE graveyard = 0 AND id IN ("
" SELECT fundraising_investor_id FROM email_investor_links "
" WHERE fundraising_investor_id IS NOT NULL"
" UNION"
" SELECT investor_id FROM fundraising_contacts WHERE id IN ("
" SELECT fundraising_contact_id FROM email_investor_links "
" WHERE fundraising_contact_id IS NOT NULL)"
") ORDER BY investor_name")]
# Facet dropdown mirrors what the list resolves: one entry per distinct matched
# entity (grid investor / org / contact), across all live matched email — not just
# the current page — so the picker is stable under filtering. Excluded from the
# picker: graveyarded grid investors (CRM-wide convention) and raw-address-only
# matches (too many, too noisy). Both still appear in the list and remain findable
# by free-text search — this is an audit surface, so history is never hidden, only
# the picker is.
facet: dict[str, str] = {}
for r in cur.execute(
f"SELECT DISTINCT {_LINK_IDENTITY_COLS} "
f"FROM email_investor_links l {_LINK_IDENTITY_JOINS} "
"WHERE EXISTS (SELECT 1 FROM email_account_messages eam "
"WHERE eam.email_id = l.email_id AND eam.deleted_at IS NULL)"):
key, name = _resolve_entity(r)
if not key or key.startswith("addr:"):
continue
if key.startswith("fund:") and (r["fund_graveyard"] or 0):
continue
facet.setdefault(key, name)
investors = [{"id": k, "name": v}
for k, v in sorted(facet.items(), key=lambda kv: kv[1].lower())]
return {"emails": rows, "accounts": accounts, "investors": investors,
"count": len(rows), "truncated": truncated}
def search_hit_emails(conn: sqlite3.Connection, email_ids) -> dict:
"""Display fields for the given email ids that are still live (have a
non-tombstoned sighting), keyed by id, with email-level direction.
Used to hydrate + soft-delete-filter semantic-search hits: the Qdrant index can
lag a deletion, and SQLite is canonical (never trust the derived index), so a hit
whose email no longer has a live sighting is dropped here rather than shown."""
ids = [i for i in dict.fromkeys(email_ids) if i]
if not ids:
return {}
cur = conn.cursor()
own = {(r["email_address"] or "").lower().strip()
for r in cur.execute("SELECT email_address FROM email_accounts")}
own.discard("")
marks = ",".join("?" for _ in ids)
out: dict = {}
for e in cur.execute(
"SELECT e.id, e.subject, e.from_name, e.from_email, e.sent_at, e.has_attachments "
f"FROM emails e WHERE e.id IN ({marks}) AND EXISTS (SELECT 1 FROM email_account_messages eam "
"WHERE eam.email_id = e.id AND eam.deleted_at IS NULL)", ids):
d = dict(e)
d["direction"] = "outbound" if (d["from_email"] or "").lower().strip() in own else "inbound"
out[d["id"]] = d
return out
def query_email_detail(conn: sqlite3.Connection, email_id: str) -> Optional[dict]:
"""Full record for one captured email — the Communications detail view (full
body + recipients + matched investor identities + mailboxes + attachments).
Returns None if the email doesn't exist or has no live (non-tombstoned) sighting:
soft-delete lives on the per-mailbox `email_account_messages` row, not on `emails`,
so an email is only "live" while at least one sighting survives. Direction is set
at the email level (outbound if the sender is one of our mailboxes), matching the
list. The raw remote `body_html` is NOT returned (XSS); the response carries the
plain-text `body_text` plus a `has_html` flag so the UI can note an HTML-only email."""
cur = conn.cursor()
e = cur.execute(
"SELECT e.id, e.subject, e.from_name, e.from_email, e.sent_at, e.snippet, "
"e.body_text, e.body_html, e.has_attachments, e.match_status, e.gmail_thread_id "
"FROM emails e WHERE e.id = ? AND EXISTS (SELECT 1 FROM email_account_messages eam "
"WHERE eam.email_id = e.id AND eam.deleted_at IS NULL)", (email_id,)).fetchone()
if not e:
return None
row = dict(e)
# Don't ship the raw remote HTML to the client (XSS if any consumer ever renders
# it); the UI shows the plain-text body and only needs to know HTML exists.
row["has_html"] = bool((row.pop("body_html", None) or "").strip())
own = {(r["email_address"] or "").lower().strip()
for r in cur.execute("SELECT email_address FROM email_accounts")}
own.discard("")
row["direction"] = "outbound" if (row["from_email"] or "").lower().strip() in own else "inbound"
row["mailboxes"] = [r["addr"] for r in cur.execute(
"SELECT DISTINCT ea.email_address AS addr FROM email_account_messages eam "
"JOIN email_accounts ea ON ea.id = eam.account_id "
"WHERE eam.email_id = ? AND eam.deleted_at IS NULL ORDER BY ea.email_address", (email_id,))]
row["recipients"] = [dict(r) for r in cur.execute(
"SELECT address, display_name, kind FROM email_recipients "
"WHERE email_id = ? AND kind IN ('to','cc') "
"ORDER BY CASE kind WHEN 'to' THEN 0 ELSE 1 END, address", (email_id,))]
row["attachments"] = [dict(r) for r in cur.execute(
"SELECT filename, mime_type, size_bytes, download_status FROM email_attachments "
"WHERE email_id = ? ORDER BY filename", (email_id,))]
investors: dict[str, str] = {}
for lnk in cur.execute(
f"SELECT {_LINK_IDENTITY_COLS} FROM email_investor_links l {_LINK_IDENTITY_JOINS} "
"WHERE l.email_id = ?", (email_id,)):
key, name = _resolve_entity(lnk)
if key:
investors.setdefault(key, name)
row["investors"] = [{"id": k, "name": v} for k, v in investors.items()]
return row
def finish_sync_run(conn: sqlite3.Connection, run_id: str, *, status: str,
stats: Optional[dict] = None, error: Optional[str] = None) -> None:
stats = stats or {}
+26 -5
View File
@@ -100,15 +100,36 @@ def _build_and_send(conn, since_iso, until_iso, *, build_fn=None, send_fn=None):
}
def send_digest_window(conn_factory=None, *, since_iso, until_iso,
build_fn=None, send_fn=None):
"""Build the digest for an explicit (since_iso, until_iso] window and send it
to the active-admin set now, WITHOUT advancing the daily cursor — a manual or
preview send must never suppress the scheduled daily digest. Same transport +
recipient rules as the daily path (raises digest_mailer.NoTransport when none
is configured / no admin has an address). Backs the admin 'send now' endpoint.
No DB writes happen here (the cursor is deliberately untouched), so the connection
is opened and closed without a commit — don't add one without revisiting that."""
factory = conn_factory or _conn_factory_from_env()
conn = factory()
try:
result = _build_and_send(conn, since_iso, until_iso,
build_fn=build_fn, send_fn=send_fn)
return {"status": "sent", **result}
finally:
conn.close()
def maybe_send_digest(conn_factory=None, *, force=False,
now_local=None, now_utc=None, build_fn=None, send_fn=None):
"""Send the daily digest if it is due (or unconditionally when force=True).
Daily path: skips before the send hour and if already sent today; content
window runs from the last send to now and the cursor advances on success.
force path (the admin 'send now' endpoint): ignores the policy and the guards,
uses a fixed last-24h window, and does NOT advance the daily cursor — so an
on-demand preview never suppresses the scheduled send."""
Daily path (the scheduler loop): skips before the send hour and if already sent
today; content window runs from the last send to now and the cursor advances on
success. force path: ignores the policy and the guards, uses a fixed last-24h
window, and does NOT advance the daily cursor. (The admin 'send now' / preview
endpoints now use send_digest_window for an arbitrary window; force is retained
for the fixed last-24h case and its tests.)"""
import digest_builder
factory = conn_factory or _conn_factory_from_env()
+95
View File
@@ -34,6 +34,8 @@ _GET_ROUTES = {
"/api/email/status": "status",
"/api/email/accounts": "list_accounts",
"/api/email/activity": "activity",
"/api/email/detail": "detail",
"/api/email/search": "search",
"/api/email/threads": "list_threads",
"/api/email/oauth/start": "oauth_start",
"/api/email/oauth/callback": "oauth_callback",
@@ -208,6 +210,8 @@ def _h_activity(handler):
account_id=(q.get("account_id") or "").strip() or None,
search=(q.get("q") or q.get("search") or "").strip() or None,
direction=(q.get("direction") or "").strip() or None,
since=(q.get("since") or "").strip() or None,
until=(q.get("until") or "").strip() or None,
limit=limit,
)
finally:
@@ -215,6 +219,97 @@ def _h_activity(handler):
handler.send_json(result)
def _h_detail(handler):
# Admin-only: the full body + recipients of a captured email is admin-scoped
# substance, same as the activity list it expands from.
user = _require_admin(handler)
if not user:
return
email_id = (handler.get_query_params().get("id") or "").strip()
if not email_id:
return handler.send_error_json("id required", 400)
conn = _conn()
try:
detail = _db.query_email_detail(conn, email_id)
finally:
conn.close()
if detail is None:
return handler.send_error_json("Not found", 404)
handler.send_json(detail)
def _semantic_email_search(query: str, top_k: int) -> list:
"""Hybrid (dense + BM25, reranked) retrieval over the email bodies indexed in
Qdrant, pre-filtered to doc_type='email'. Returns raw ranked hits (payload carries
source_id=email_id, lp_name, date_ts, text). The ingest stack (Spark Control +
Qdrant + the sparse encoder) lives in the Docker image, so it's imported lazily —
a bare CRM without it raises, and the caller maps that to a 503."""
import os
import sys
ingest_dir = os.path.join(os.path.dirname(os.path.dirname(os.path.abspath(__file__))), "ingest")
if ingest_dir not in sys.path:
sys.path.insert(0, ingest_dir)
import search as _ingest_search # ingest/search.py
filt = {"must": [{"key": "doc_type", "match": {"value": "email"}}]}
return _ingest_search.hybrid_search(query, top_k=top_k, rerank=True, filt=filt)
def _h_search(handler):
# Admin-only semantic search over captured email *content* (bodies), distinct from
# the structured subject/sender filters in _h_activity. Matched email bodies are the
# only email indexed in Qdrant (see ingest/chunking). Soft-delete-filtered + hydrated
# against SQLite (canonical) so a deleted email never surfaces from the stale index.
user = _require_admin(handler)
if not user:
return
q = handler.get_query_params()
query = (q.get("q") or q.get("query") or "").strip()
if not query:
return handler.send_json({"query": "", "results": []})
try:
top_k = min(50, max(1, int(q.get("top_k", 25))))
except (TypeError, ValueError):
top_k = 25
try:
hits = _semantic_email_search(query, top_k)
except Exception as e:
# Spark Control / Qdrant unreachable, or the ingest stack isn't installed.
# Log server-side (an error can carry a URL/host); give the UI a clean 503.
import sys
print(f"[email-search] retrieval failed: {type(e).__name__}: {e}", file=sys.stderr)
return handler.send_error_json("Content search is unavailable (Spark/Qdrant not reachable).", 503)
# Hydrate + soft-delete-filter against SQLite (canonical), preserving rank order.
payloads = [(h.get("payload", {}) or {}, h) for h in hits]
ids = [p.get("source_id") for p, _ in payloads]
conn = _conn()
try:
live = _db.search_hit_emails(conn, ids)
finally:
conn.close()
results = []
for p, h in payloads:
eid = p.get("source_id")
e = live.get(eid)
if not e:
continue # deleted since indexing, or not matched-resolvable -> drop
results.append({
"email_id": eid,
"subject": e["subject"],
"from_name": e["from_name"],
"from_email": e["from_email"],
"sent_at": e["sent_at"],
"direction": e["direction"],
"has_attachments": e["has_attachments"],
"lp_name": p.get("lp_name"),
"score": h.get("score"),
"excerpt": (h.get("text") or p.get("text") or "").replace("\n", " ").strip()[:300],
})
handler.send_json({"query": query, "results": results, "count": len(results)})
def _h_list_threads(handler):
user = _require_auth(handler)
if not user:
@@ -2,10 +2,13 @@
"""Test the admin-only email-activity panel (Communications tab, v0.1.0:80).
Covers the pure query (`db.query_email_activity`): matched-only scope (unmatched
cold/unknown-sender email is never surfaced), investor/mailbox/search/direction
filters, per-sighting soft-delete, direction at the email level, mailbox + investor
roll-ups, and the filter facets.
Also asserts the route handler enforces admin server-side. Synthetic data only.
cold/unknown-sender email is never surfaced), investor/mailbox/search/direction/
date-range filters, per-sighting soft-delete, direction at the email level, mailbox
roll-ups, and the *typed* investor facet (grid investor / org / contact), including
the v83 fix where an email matched only to a classic contact or org domain — not yet
wired to a grid investor — still resolves to a real name and appears in the dropdown
(previously the facet came back empty). Also asserts the route handler enforces admin
server-side. Synthetic data only.
Run: cd backend && python3 email_integration/test_email_activity_panel.py
"""
@@ -31,17 +34,28 @@ def make_db():
conn.executescript("""
CREATE TABLE email_accounts (id TEXT PRIMARY KEY, email_address TEXT);
CREATE TABLE emails (id TEXT PRIMARY KEY, subject TEXT, from_name TEXT, from_email TEXT,
sent_at TEXT, snippet TEXT, has_attachments INT DEFAULT 0, is_matched INT DEFAULT 0,
sent_at TEXT, snippet TEXT, body_text TEXT, body_html TEXT, gmail_thread_id TEXT,
has_attachments INT DEFAULT 0, is_matched INT DEFAULT 0,
match_status TEXT DEFAULT 'unmatched');
CREATE TABLE email_account_messages (id TEXT PRIMARY KEY, email_id TEXT, account_id TEXT,
is_sent INT DEFAULT 0, deleted_at TEXT);
CREATE TABLE email_recipients (id TEXT PRIMARY KEY, email_id TEXT, address TEXT,
display_name TEXT, kind TEXT);
CREATE TABLE email_attachments (id TEXT PRIMARY KEY, email_id TEXT, filename TEXT,
mime_type TEXT, size_bytes INTEGER, download_status TEXT);
CREATE TABLE email_investor_links (id TEXT PRIMARY KEY, email_id TEXT,
fundraising_investor_id TEXT, fundraising_contact_id TEXT, matched_address TEXT);
fundraising_investor_id TEXT, fundraising_contact_id TEXT,
organization_id TEXT, contact_id TEXT, matched_address TEXT);
CREATE TABLE fundraising_investors (id TEXT PRIMARY KEY, investor_name TEXT, graveyard INTEGER DEFAULT 0);
CREATE TABLE fundraising_contacts (id TEXT PRIMARY KEY, investor_id TEXT, full_name TEXT);
CREATE TABLE organizations (id TEXT PRIMARY KEY, name TEXT, deleted_at TEXT);
CREATE TABLE contacts (id TEXT PRIMARY KEY, first_name TEXT, last_name TEXT,
organization_id TEXT, deleted_at TEXT);
""")
# Two mailboxes (us), three investors (one reached only via a contact link;
# one graveyarded but still with captured email history).
# Two mailboxes (us); investors reached different ways: a grid investor directly,
# a grid investor only via a contact link, a graveyarded grid investor, an org-only
# (domain) match, and a classic-contact-only match (the case that left the dropdown
# empty before v83 — neither carries a grid id).
conn.executemany("INSERT INTO email_accounts VALUES (?,?)", [
("acc-grant", "grant@ten31.xyz"),
("acc-jon", "jonathan@ten31.xyz"),
@@ -52,12 +66,17 @@ def make_db():
("inv-dead", "Dead Deal LP", 1),
])
conn.execute("INSERT INTO fundraising_contacts VALUES ('fc-1','inv-pacific','Sarah Williams')")
conn.execute("INSERT INTO organizations VALUES ('org-bridge','Bridgewater',NULL)")
conn.execute("INSERT INTO contacts VALUES ('c-solo','Nina','Park',NULL,NULL)")
# Emails:
# e1 outbound (from us) -> Harbor, seen by grant
# e2 inbound (from LP) -> Harbor, seen by grant + jonathan
# e3 inbound (from LP) -> Pacific via contact link, seen by jonathan
# e4 inbound, UNMATCHED (no investor link), seen by grant -> must be excluded (matched-only)
# e5 inbound, only sighting is tombstoned -> must be excluded
# e1 outbound -> Harbor (grid), seen by grant
# e2 inbound -> Harbor (grid), seen by grant + jonathan
# e3 inbound -> Pacific via grid contact link, seen by jonathan
# e4 inbound, UNMATCHED -> excluded (matched-only)
# e5 inbound, only sighting tombstoned -> excluded
# e6 inbound -> Dead Deal LP (graveyard grid investor)
# e7 inbound -> Bridgewater via ORG-domain match (no grid id)
# e8 inbound -> Nina Park via CLASSIC-contact match (no grid id, no org)
conn.executemany(
"INSERT INTO emails (id,subject,from_name,from_email,sent_at,snippet,has_attachments,is_matched,match_status) VALUES (?,?,?,?,?,?,?,?,?)",
[
@@ -67,6 +86,8 @@ def make_db():
("e4", "Cold inbound", "Random", "noreply@spam.example", "2026-06-08T08:00:00", "buy now", 0, 0, "unmatched"),
("e5", "Deleted thread", "Ghost", "ghost@x.example", "2026-06-09T08:00:00", "gone", 0, 1, "matched"),
("e6", "Old dead-deal thread", "Dead LP", "lp@deaddeal.example", "2026-06-01T00:00:00", "we passed", 0, 1, "matched"),
("e7", "Macro view", "Ray", "ray@bridgewater.example", "2026-06-10T08:00:00", "rates outlook", 0, 1, "matched"),
("e8", "Coffee?", "Nina Park", "nina@solo.example", "2026-06-11T08:00:00", "in town next week", 0, 1, "matched"),
])
conn.executemany(
"INSERT INTO email_account_messages (id,email_id,account_id,is_sent,deleted_at) VALUES (?,?,?,?,?)",
@@ -78,16 +99,32 @@ def make_db():
("m5", "e4", "acc-grant", 0, None),
("m6", "e5", "acc-grant", 0, "2026-06-10T00:00:00"), # tombstoned
("m7", "e6", "acc-grant", 0, None),
("m8", "e7", "acc-grant", 0, None),
("m9", "e8", "acc-jon", 0, None),
])
conn.executemany(
"INSERT INTO email_investor_links (id,email_id,fundraising_investor_id,fundraising_contact_id,matched_address) VALUES (?,?,?,?,?)",
"INSERT INTO email_investor_links (id,email_id,fundraising_investor_id,fundraising_contact_id,organization_id,contact_id,matched_address) VALUES (?,?,?,?,?,?,?)",
[
("l1", "e1", "inv-harbor", None, "lp@harborvine.example"),
("l2", "e2", "inv-harbor", None, "lp@harborvine.example"),
("l3", "e3", None, "fc-1", "sarah@pacificcap.example"),
("l5", "e5", "inv-harbor", None, "lp@harborvine.example"),
("l6", "e6", "inv-dead", None, "lp@deaddeal.example"),
("l1", "e1", "inv-harbor", None, None, None, "lp@harborvine.example"),
("l2", "e2", "inv-harbor", None, None, None, "lp@harborvine.example"),
("l3", "e3", None, "fc-1", None, None, "sarah@pacificcap.example"),
("l5", "e5", "inv-harbor", None, None, None, "lp@harborvine.example"),
("l6", "e6", "inv-dead", None, None, None, "lp@deaddeal.example"),
("l7", "e7", None, None, "org-bridge", None, "ray@bridgewater.example"),
("l8", "e8", None, None, None, "c-solo", "nina@solo.example"),
])
# Full body + recipients + an attachment on e2, for the detail view.
conn.execute("UPDATE emails SET body_text = ?, gmail_thread_id = ?, has_attachments = 1 WHERE id = 'e2'",
("Thanks for the deck — one question on the carry.", "thr-harbor"))
conn.executemany(
"INSERT INTO email_recipients (id,email_id,address,display_name,kind) VALUES (?,?,?,?,?)",
[
("r1", "e2", "grant@ten31.xyz", "Grant", "to"),
("r2", "e2", "jonathan@ten31.xyz", "Jonathan", "cc"),
("r3", "e2", "lp@harborvine.example", "LP Harbor", "from"), # from -> not surfaced
])
conn.execute("INSERT INTO email_attachments (id,email_id,filename,mime_type,size_bytes,download_status) "
"VALUES ('a1','e2','term_sheet.pdf','application/pdf',20480,'downloaded')")
conn.commit()
return conn
@@ -101,9 +138,9 @@ def main():
# --- baseline: matched live emails only, newest first, tombstoned excluded ---
res = _db.query_email_activity(conn)
check(ids(res) == ["e3", "e2", "e1", "e6"],
check(ids(res) == ["e8", "e7", "e3", "e2", "e1", "e6"],
f"matched live emails newest-first; e5 (tombstoned) + e4 (unmatched) excluded; got {ids(res)}")
check(res["count"] == 4 and res["truncated"] is False, "count + not truncated")
check(res["count"] == 6 and res["truncated"] is False, "count + not truncated")
check("e4" not in ids(res), "unmatched email (no investor link) never surfaces in the panel")
# --- direction at the email level ---
@@ -114,25 +151,44 @@ def main():
check(_db.query_email_activity(conn, direction="outbound")["emails"][0]["id"] == "e1"
and len(_db.query_email_activity(conn, direction="outbound")["emails"]) == 1,
"direction=outbound returns only e1")
check(ids(_db.query_email_activity(conn, direction="inbound")) == ["e3", "e2", "e6"],
check(ids(_db.query_email_activity(conn, direction="inbound")) == ["e8", "e7", "e3", "e2", "e6"],
"direction=inbound excludes the outbound e1 (and unmatched e4)")
# --- mailbox roll-up + per-account filter ---
check(set(e2["mailboxes"]) == {"grant@ten31.xyz", "jonathan@ten31.xyz"}, "e2 seen by both mailboxes")
check(ids(_db.query_email_activity(conn, account_id="acc-jon")) == ["e3", "e2"],
check(ids(_db.query_email_activity(conn, account_id="acc-jon")) == ["e8", "e3", "e2"],
"account_id=acc-jon returns only emails that mailbox saw")
# --- investor filter: direct link and via-contact link ---
check(set(ids(_db.query_email_activity(conn, investor_id="inv-harbor"))) == {"e2", "e1"},
"investor_id=inv-harbor -> e1,e2")
check(ids(_db.query_email_activity(conn, investor_id="inv-pacific")) == ["e3"],
"investor_id=inv-pacific resolved through fundraising_contacts -> e3")
# --- date-range filter [since, until) over sent_at ---
check(ids(_db.query_email_activity(conn, since="2026-06-07T00:00:00", until="2026-06-11T00:00:00")) == ["e7", "e3"],
"date range [06-07, 06-11) -> e7,e3 (excludes 06-11 e8 and earlier e2/e1/e6)")
check(ids(_db.query_email_activity(conn, since="2026-06-10T00:00:00")) == ["e8", "e7"],
"since=06-10 -> e8,e7 only")
# --- investor name roll-up + unmatched fallback ---
check(e1["investors"] == [{"id": "inv-harbor", "name": "Harbor & Vine"}], "e1 investor resolved to name")
# --- investor filter: typed keys + legacy bare-id back-compat ---
check(set(ids(_db.query_email_activity(conn, investor_id="fund:inv-harbor"))) == {"e2", "e1"},
"investor_id=fund:inv-harbor -> e1,e2")
check(set(ids(_db.query_email_activity(conn, investor_id="inv-harbor"))) == {"e2", "e1"},
"legacy bare id treated as fund: -> e1,e2")
check(ids(_db.query_email_activity(conn, investor_id="fund:inv-pacific")) == ["e3"],
"fund:inv-pacific resolved through fundraising_contacts -> e3")
check(ids(_db.query_email_activity(conn, investor_id="org:org-bridge")) == ["e7"],
"org:org-bridge -> e7 (org-domain match)")
check(ids(_db.query_email_activity(conn, investor_id="contact:c-solo")) == ["e8"],
"contact:c-solo -> e8 (classic-contact match)")
check(_db.query_email_activity(conn, investor_id="bogus:x")["emails"] == [],
"unknown investor_id key prefix -> match nothing (never silently unfiltered)")
# --- investor identity roll-up, typed + resolved name ---
check(e1["investors"] == [{"id": "fund:inv-harbor", "name": "Harbor & Vine"}], "e1 grid investor resolved")
e3 = next(e for e in res["emails"] if e["id"] == "e3")
check(e3["investors"] == [{"id": "inv-pacific", "name": "Pacific Capital"}], "e3 investor resolved via contact")
check("e4" not in ids(res), "e4 unmatched -> excluded from the matched-only panel")
check(e3["investors"] == [{"id": "fund:inv-pacific", "name": "Pacific Capital"}], "e3 resolved via grid contact")
e7 = next(e for e in res["emails"] if e["id"] == "e7")
check(e7["investors"] == [{"id": "org:org-bridge", "name": "Bridgewater"}],
"e7 org-domain match resolves to the org name (not a raw address)")
e8 = next(e for e in res["emails"] if e["id"] == "e8")
check(e8["investors"] == [{"id": "contact:c-solo", "name": "Nina Park"}],
"e8 classic-contact match resolves to the contact name")
# --- free-text search over subject / snippet / sender ---
check(set(ids(_db.query_email_activity(conn, search="Fund III"))) == {"e1", "e2"}, "search subject")
@@ -141,18 +197,21 @@ def main():
check(ids(_db.query_email_activity(conn, search="buy now")) == [],
"unmatched email never surfaces, even by free-text search")
# --- facets ---
# --- facets: typed entries spanning grid / org / contact matches ---
check([a["email_address"] for a in res["accounts"]] == ["grant@ten31.xyz", "jonathan@ten31.xyz"],
"accounts facet sorted")
facet_inv = {i["id"] for i in res["investors"]}
check(facet_inv == {"inv-harbor", "inv-pacific"}, "investor facet covers direct + via-contact activity")
check(facet_inv == {"fund:inv-harbor", "fund:inv-pacific", "org:org-bridge", "contact:c-solo"},
f"investor facet now mirrors the list (grid + org + contact), not just grid; got {facet_inv}")
check([i["name"] for i in res["investors"]] == sorted(i["name"] for i in res["investors"]),
"facet sorted by display name")
# --- graveyard: hidden from the picker, but its email stays visible + findable ---
check("inv-dead" not in facet_inv, "graveyard investor excluded from the facet dropdown")
check("fund:inv-dead" not in facet_inv, "graveyard investor excluded from the facet dropdown")
check("e6" in ids(res), "graveyard investor's email still shows in the unfiltered list (audit completeness)")
e6 = next(e for e in res["emails"] if e["id"] == "e6")
check(e6["investors"] == [{"id": "inv-dead", "name": "Dead Deal LP"}], "graveyard email still shows its investor chip")
check(ids(_db.query_email_activity(conn, investor_id="inv-dead")) == ["e6"],
check(e6["investors"] == [{"id": "fund:inv-dead", "name": "Dead Deal LP"}], "graveyard email still shows its investor chip")
check(ids(_db.query_email_activity(conn, investor_id="fund:inv-dead")) == ["e6"],
"explicit investor_id filter still works for a graveyard investor")
check(ids(_db.query_email_activity(conn, search="deaddeal")) == ["e6"],
"graveyard email remains findable by free-text search")
@@ -161,10 +220,27 @@ def main():
tr = _db.query_email_activity(conn, limit=2)
check(tr["count"] == 2 and tr["truncated"] is True, "limit=2 -> truncated")
# --- detail view (full body + recipients + attachments + identity) ---
d = _db.query_email_detail(conn, "e2")
check(d is not None and d["body_text"] == "Thanks for the deck — one question on the carry.",
"detail returns the full body")
check(d["direction"] == "inbound" and set(d["mailboxes"]) == {"grant@ten31.xyz", "jonathan@ten31.xyz"},
"detail direction + mailboxes")
check([(r["address"], r["kind"]) for r in d["recipients"]] ==
[("grant@ten31.xyz", "to"), ("jonathan@ten31.xyz", "cc")],
"detail recipients = to/cc only (from is excluded)")
check([a["filename"] for a in d["attachments"]] == ["term_sheet.pdf"], "detail lists attachments")
check(d["investors"] == [{"id": "fund:inv-harbor", "name": "Harbor & Vine"}], "detail resolves investor identity")
check(_db.query_email_detail(conn, "e5") is None,
"detail of a tombstoned-only email -> None (soft-delete on the sighting)")
check(_db.query_email_detail(conn, "nope") is None, "detail of a missing id -> None")
conn.close()
# --- route enforces admin server-side ---
test_route_admin_only()
# --- semantic content-search route (hydrate + soft-delete + 503) ---
test_search_route()
if FAILS:
print(f"\nFAILED ({len(FAILS)})")
@@ -174,6 +250,28 @@ def main():
print("\nALL PASS (email-activity panel)")
class FakeHandler:
def __init__(self, user, params=None):
self._user = user
self._params = params or {}
self.json = None
self.err = None
self.code = None
def get_user(self):
return self._user
def get_query_params(self):
return self._params
def send_json(self, obj):
self.json = obj
def send_error_json(self, msg, code):
self.err = msg
self.code = code
def test_route_admin_only():
try:
from email_integration import routes
@@ -181,26 +279,6 @@ def test_route_admin_only():
print(f" SKIP route admin test (routes import failed: {e})")
return
class FakeHandler:
def __init__(self, user):
self._user = user
self.json = None
self.err = None
self.code = None
def get_user(self):
return self._user
def get_query_params(self):
return {}
def send_json(self, obj):
self.json = obj
def send_error_json(self, msg, code):
self.err = msg
self.code = code
h = FakeHandler(None)
routes._h_activity(h)
check(h.code == 401 and h.json is None, "route: no user -> 401")
@@ -210,5 +288,48 @@ def test_route_admin_only():
check(h.code == 403 and h.json is None, "route: member -> 403 (admin enforced server-side)")
def test_search_route():
try:
from email_integration import routes
except Exception as e: # pragma: no cover
print(f" SKIP search route test (routes import failed: {e})")
return
# Hydration source = a fresh fully-populated in-memory DB each call (the handler
# opens + closes its own conn). Retrieval is stubbed — no Spark/Qdrant in tests.
routes._conn = make_db
routes._semantic_email_search = lambda query, top_k: [
{"score": 0.91, "text": "carry discussion\nand terms", "payload": {"source_id": "e2", "lp_name": "Harbor & Vine"}},
{"score": 0.80, "text": "gone", "payload": {"source_id": "e5", "lp_name": "Ghost"}}, # tombstoned -> drop
{"score": 0.70, "text": "n/a", "payload": {"source_id": "missing", "lp_name": "Nobody"}}, # missing -> drop
]
h = FakeHandler({"role": "admin"}, {"q": "carry"})
routes._h_search(h)
check(h.json and [r["email_id"] for r in h.json["results"]] == ["e2"],
f"content search drops tombstoned + missing, keeps live e2; got {h.json and [r['email_id'] for r in h.json['results']]}")
top = h.json["results"][0]
check(top["lp_name"] == "Harbor & Vine" and top["score"] == 0.91 and top["subject"] == "Re: Fund III update",
"hit carries lp_name + score + hydrated subject")
check("\n" not in top["excerpt"], "excerpt is newline-flattened")
# empty query short-circuits (no retrieval call)
h = FakeHandler({"role": "admin"}, {"q": ""})
routes._h_search(h)
check(h.json == {"query": "", "results": []}, "empty query -> empty results")
# retrieval failure -> clean 503 (Spark/Qdrant down)
def _boom(query, top_k):
raise RuntimeError("spark down")
routes._semantic_email_search = _boom
h = FakeHandler({"role": "admin"}, {"q": "x"})
routes._h_search(h)
check(h.code == 503, f"retrieval failure -> 503, got {h.code}")
# admin enforced
h = FakeHandler({"role": "member"}, {"q": "x"})
routes._h_search(h)
check(h.code == 403, "content search admin-enforced server-side")
if __name__ == "__main__":
main()