Files
ten31-database/backend/mcp/outreach_agent.py
T
Keysat 49f84ca9a4 outreach: per-user voice from own emails + transparency; active-thread context (v0.1.0:70)
Voice upgrade. draft_outreach now learns the SENDER's voice: the codified rules PLUS a
few-shot of that user's own recent sent emails (_voice_examples; from_email = the
sender, de-identified in the same scrub batch as the recipient context, reference-only).
The response returns which of the sender's emails were used (subject + date + recipient),
shown in the UI as "Voice based on: …" — transparency to avoid the black-box problem.
Falls back to rules-only with a clear note when the user has no captured sent email.

Context restructured: _context groups the investor's email by thread and labels the most
recent thread as the "Active conversation (what you are replying to)" with earlier emails
as background, so replies stay on-topic instead of dredging old threads.

Sender email resolved in handle_outreach_draft (users table by user_id). Test extended
(active/background split, voice examples + meta, no-sender fallback). Fixed a UI bug the
preview caught: the manual Draft button was onClick={draft}, which passed the click event
as the investor arg after draft() gained params -> circular-JSON error; now onClick={()=>draft()}.

Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
2026-06-08 22:06:38 -05:00

229 lines
11 KiB
Python

"""Outreach drafting agent — tailored LP outreach in Ten31's voice, grounded in the
thesis + the LP's DE-IDENTIFIED context, through the redaction boundary.
Draft-only: a human reviews, edits, and sends (guardrails #4 and #6 — no auto-send,
no cold/outbound automation until counsel defines the solicitation posture). Sovereignty:
the thesis is Ten31's own non-sensitive messaging and goes to Claude as-is; the LP's
context (CRM notes + email history) is scrubbed first, so the LP list never reaches the
API in the clear, and the draft is re-hydrated locally for the human.
"""
import json
import os
import sys
_HERE = os.path.dirname(os.path.abspath(__file__))
# outreach_type -> human description woven into the prompt
OUTREACH_TYPES = {
"intro": "a first introduction to Ten31 and the fund",
"follow_up": "a warm follow-up that moves the conversation forward",
"fund_update": "a fund update / progress note",
"meeting_follow_up": "a follow-up after a recent meeting or call",
"nurture": "a light-touch note to stay in contact",
}
def _days_between(then_iso, now_iso):
from datetime import datetime
try:
a = datetime.strptime(str(then_iso)[:10], "%Y-%m-%d")
b = datetime.strptime(str(now_iso)[:10], "%Y-%m-%d")
return (b - a).days
except Exception:
return None
def follow_up_radar(conn, our_addresses, now_iso, warm_days=45, limit=60):
"""Deterministic scan: surface investors who need attention, each with a concrete,
checkable reason (no LLM guesswork in the *surfacing*). Tiers, most urgent first:
0 you owe a reply (their email is the most recent, unanswered)
1 flagged for follow-up and quiet
2 warm lead gone quiet (no contact in >= warm_days)
"""
own = {(a or "").lower() for a in (our_addresses or [])}
try:
rows = conn.execute("SELECT * FROM fundraising_investors").fetchall()
except Exception:
return []
items = []
for r in rows:
d = dict(r)
inv_id, name = d.get("id"), d.get("investor_name")
if not inv_id:
continue
gv = d.get("graveyard")
if gv and str(gv).strip().lower() not in ("", "0", "false", "no"):
continue # buried leads are out of scope
try:
erows = conn.execute(
"SELECT e.from_email, e.sent_at FROM emails e "
"JOIN email_investor_links l ON l.email_id = e.id "
"WHERE l.fundraising_investor_id = ? AND e.is_matched = 1 "
"ORDER BY e.sent_at DESC LIMIT 50", (inv_id,)).fetchall()
except Exception:
erows = []
if not erows:
continue # no email history -> nothing to base a nudge on
last = erows[0]
days = _days_between(last["sent_at"], now_iso)
if days is None:
continue
inbound_last = (last["from_email"] or "").lower() not in own # they emailed last
ff = d.get("follow_up")
flagged = bool(ff) and str(ff).strip().lower() not in ("", "0", "false", "no")
reason, tier, suggested = None, None, "follow_up"
if inbound_last and days >= 3:
reason, tier, suggested = f"You owe a reply — they emailed {days} days ago", 0, "follow_up"
elif flagged and days >= 14:
reason, tier, suggested = f"Flagged for follow-up, quiet {days} days", 1, "follow_up"
elif days >= warm_days and len(erows) >= 2:
reason, tier, suggested = f"No contact in {days} days", 2, "nurture"
if reason is None:
continue
if flagged and tier != 1:
reason += " · flagged"
items.append({"investor_id": inv_id, "name": name, "reason": reason,
"days_since": days, "suggested_type": suggested, "tier": tier})
items.sort(key=lambda x: (x["tier"], -x["days_since"]))
return items[:limit]
def _context(conn, investor_id):
"""Assemble the recipient's context. Structured so the model replies to the ACTIVE
conversation (the most recent email thread) while still having earlier emails as
background. Returns (investor_name, context_text) or (None, None)."""
row = conn.execute("SELECT investor_name, notes FROM fundraising_investors WHERE id=?",
(investor_id,)).fetchone()
if not row:
return None, None
name = row["investor_name"]
header = [f"Investor: {name}"]
notes = (row["notes"] or "").strip()
if notes:
header.append("CRM notes:\n" + notes)
try:
rows = conn.execute(
"SELECT e.subject, e.body_text, e.snippet, e.sent_at, e.thread_id FROM emails e "
"JOIN email_investor_links l ON l.email_id = e.id "
"WHERE l.fundraising_investor_id = ? AND e.is_matched = 1 "
"ORDER BY e.sent_at DESC LIMIT 20", (investor_id,)).fetchall()
except Exception:
rows = [] # email tables may be absent / not yet captured
active, background = [], []
if rows:
active_thread = rows[0]["thread_id"]
for em in rows:
body = (em["body_text"] or em["snippet"] or "")[:1500].strip()
block = f"({(em['sent_at'] or '')[:10]}) {em['subject'] or '(no subject)'}\n{body}"
in_active = active_thread is not None and em["thread_id"] == active_thread
(active if in_active else background).append(block)
sections = ["\n".join(header)]
if active:
sections.append("=== Active conversation (the most recent thread — this is what you are replying to) ===\n"
+ "\n\n".join(reversed(active[:6])))
if background:
sections.append("=== Earlier emails (background only, not the active thread) ===\n"
+ "\n\n".join(background[:4]))
return name, "\n\n".join(sections)
def _voice_examples(conn, sender_email, limit=4):
"""The sender's OWN recent sent LP emails — used as voice few-shot AND surfaced for
transparency (no black box). Returns (blocks_for_model, meta_for_ui). meta is the
sender's own emails, safe to show them."""
if not sender_email:
return [], []
try:
rows = conn.execute(
"SELECT subject, body_text, snippet, sent_at, to_emails_json FROM emails "
"WHERE LOWER(from_email) = LOWER(?) AND is_matched = 1 "
"AND body_text IS NOT NULL AND TRIM(body_text) <> '' "
"ORDER BY sent_at DESC LIMIT ?", (sender_email, limit)).fetchall()
except Exception:
return [], []
blocks, meta = [], []
for r in rows:
body = (r["body_text"] or r["snippet"] or "")[:1200].strip()
if not body:
continue
blocks.append(f"Example — {r['subject'] or '(no subject)'}\n{body}")
to = ""
try:
arr = json.loads(r["to_emails_json"] or "[]")
if arr:
to = arr[0].get("email") if isinstance(arr[0], dict) else arr[0]
except Exception:
to = ""
meta.append({"subject": r["subject"] or "(no subject)", "date": (r["sent_at"] or "")[:10], "to": to})
return blocks, meta
def _draft_with_claude(aa, thesis, type_desc, deident_context, deident_voice, guidance):
voice_block = ""
if deident_voice:
voice_block = ("\n\nHere are examples of how THIS sender actually writes (de-identified). Match their "
"voice, tone, sentence rhythm, openers, and sign-off — not just the rules above:\n\n"
+ "\n\n---\n\n".join(deident_voice))
system = (
"You are Ten31's outreach copilot. Draft ONE ready-to-send LP outreach email in the SENDER's voice. "
f"VOICE RULES (follow exactly): {aa.VOICE}" + voice_block + "\n\n"
"Ten31 invests in critical infrastructure across bitcoin, AI, energy, and freedom technologies, "
"with scarcity as the connecting idea. Current working thesis:\n" + aa._render_thesis(thesis) + "\n\n"
"The recipient's context below is DE-IDENTIFIED: people, firms, and amounts appear as placeholders "
"like [PERSON_1], [ORG_1], [AMOUNT_1]. Keep every placeholder EXACTLY as written and NEVER invent new "
"ones — they are swapped back to real values after you reply. Reply to the ACTIVE conversation; use the "
"earlier emails only as background. Output a subject line, then the email body. Do NOT fabricate facts, "
"numbers, returns, or commitments that are not present in the context or the thesis.")
user = (f"Outreach type: {type_desc}\n\n"
f"Recipient context (de-identified):\n{deident_context}\n\n"
+ (f"Additional guidance from the sender: {guidance}\n\n" if (guidance or "").strip() else "")
+ "Draft the email now.")
resp = aa._client().messages.create(
model=aa.MODEL, max_tokens=1200,
system=[{"type": "text", "text": system, "cache_control": {"type": "ephemeral"}}],
messages=[{"role": "user", "content": user}])
return "".join(b.text for b in resp.content if getattr(b, "type", None) == "text")
def draft_outreach(conn, investor_id, outreach_type, guidance, db_path, sender_email=None):
"""Draft tailored outreach for one investor, in the SENDER's voice (few-shot from
their own prior emails). FAILS CLOSED: if the scrub can't be prepared or Claude
hallucinates a placeholder, no de-anonymized draft is returned."""
name, context = _context(conn, investor_id)
if not name:
return {"status": "not_found"}
type_desc = OUTREACH_TYPES.get(outreach_type, OUTREACH_TYPES["follow_up"])
voice_blocks, voice_meta = _voice_examples(conn, sender_email)
# 1) Scrub the sender's voice examples + the recipient context TOGETHER (shared token
# space). Nothing reaches Claude in the clear; the voice examples are reference only.
try:
sys.path.insert(0, os.path.dirname(_HERE)) # backend/ for the redaction package
from redaction.client import Boundary
boundary = Boundary(db_path=db_path, actor="closer")
scrubbed = boundary.scrub(list(voice_blocks) + [context], bucket=False, conn=conn)
except Exception as exc:
return {"status": "scrub_unavailable", "reason": str(exc)}
items = scrubbed["items"]
deident_voice, deident_target = items[:-1], items[-1]
handle = scrubbed["handle"]
# 2) Claude drafts over the de-identified context + voice + (non-sensitive) thesis.
try:
sys.path.insert(0, _HERE)
import architect_agent as aa
thesis = aa.at.get_thesis("core", db=db_path)
raw = _draft_with_claude(aa, thesis, type_desc, deident_target, deident_voice, guidance)
except Exception as exc:
boundary.forget(handle)
return {"status": "claude_not_configured", "reason": str(exc)}
# 3) Re-hydrate locally (strict: a hallucinated placeholder quarantines the draft).
rehy = boundary.rehydrate(raw, handle, strict=True, conn=conn)
boundary.forget(handle)
if rehy.get("error"):
return {"status": "rehydrate_failed"}
return {"status": "ok", "draft": rehy["text"], "investor_name": name,
"scrub_stats": scrubbed.get("stats", {}), "voice_examples": voice_meta}