"""Outreach drafting agent — tailored LP outreach in Ten31's voice, grounded in
the thesis + the LP's DE-IDENTIFIED context, through the redaction boundary.

Draft-only: a human reviews, edits, and sends (guardrails #4 and #6 — no
auto-send, no cold/outbound automation until counsel defines the solicitation
posture).

Sovereignty: the thesis is Ten31's own non-sensitive messaging and goes to
Claude as-is; the LP's context (CRM notes + email history) is scrubbed first,
so the LP list never reaches the API in the clear, and the draft is re-hydrated
locally for the human.
"""

import json
import os
import sys
from datetime import datetime

_HERE = os.path.dirname(os.path.abspath(__file__))

# outreach_type -> human description woven into the prompt
OUTREACH_TYPES = {
    "intro": "a first introduction to Ten31 and the fund",
    "follow_up": "a warm follow-up that moves the conversation forward",
    "fund_update": "a fund update / progress note",
    "meeting_follow_up": "a follow-up after a recent meeting or call",
    "nurture": "a light-touch note to stay in contact",
}

# String spellings of a CRM flag column that mean "not set".
_UNSET_FLAG_VALUES = ("", "0", "false", "no")


def _days_between(then_iso, now_iso):
    """Return the whole days from ``then_iso`` to ``now_iso``.

    Only the leading ``YYYY-MM-DD`` of each value is used, so any time-of-day
    suffix is ignored. Returns ``None`` when either value cannot be parsed.
    """
    try:
        then = datetime.strptime(str(then_iso)[:10], "%Y-%m-%d")
        now = datetime.strptime(str(now_iso)[:10], "%Y-%m-%d")
    except (TypeError, ValueError):
        return None
    return (now - then).days


def _flag_is_set(value):
    """Return True when a CRM flag column holds a truthy value.

    Falsy Python values and the strings "", "0", "false", "no" (any case,
    surrounding whitespace ignored) all count as unset.
    """
    return bool(value) and str(value).strip().lower() not in _UNSET_FLAG_VALUES


def follow_up_radar(conn, our_addresses, now_iso, warm_days=45, limit=60):
    """Deterministic scan: surface investors who need attention, each with a
    concrete, checkable reason (no LLM guesswork in the *surfacing*).

    Tiers, most urgent first:
      0  you owe a reply (their email is the most recent, unanswered)
      1  flagged for follow-up and quiet
      2  warm lead gone quiet (no contact in >= warm_days)

    Args:
        conn: DB connection whose rows support both ``dict(row)`` and
            ``row["column"]`` access.
        our_addresses: Iterable of the firm's own email addresses; matching is
            case-insensitive. Any sender NOT in this set counts as inbound.
        now_iso: Reference date (ISO string; only ``YYYY-MM-DD`` is used).
        warm_days: Quiet period after which a lead with at least two matched
            emails is surfaced as tier 2.
        limit: Maximum number of items returned.

    Returns:
        A list of dicts with keys ``investor_id``, ``name``, ``reason``,
        ``days_since``, ``suggested_type`` and ``tier``, sorted by tier and
        then by longest silence first. Returns ``[]`` if the investor table
        cannot be read.
    """
    own = {(a or "").lower() for a in (our_addresses or [])}
    try:
        rows = conn.execute("SELECT * FROM fundraising_investors").fetchall()
    except Exception:
        # Best-effort: the table may not exist yet.
        return []

    items = []
    for r in rows:
        d = dict(r)
        inv_id, name = d.get("id"), d.get("investor_name")
        if not inv_id:
            continue
        if _flag_is_set(d.get("graveyard")):
            continue  # buried leads are out of scope
        try:
            erows = conn.execute(
                "SELECT e.from_email, e.sent_at FROM emails e "
                "JOIN email_investor_links l ON l.email_id = e.id "
                "WHERE l.fundraising_investor_id = ? AND e.is_matched = 1 "
                "ORDER BY e.sent_at DESC LIMIT 50", (inv_id,)).fetchall()
        except Exception:
            erows = []
        if not erows:
            continue  # no email history -> nothing to base a nudge on

        last = erows[0]
        days = _days_between(last["sent_at"], now_iso)
        if days is None:
            continue
        inbound_last = (last["from_email"] or "").lower() not in own  # they emailed last
        flagged = _flag_is_set(d.get("follow_up"))

        reason, tier, suggested = None, None, "follow_up"
        if inbound_last and days >= 3:
            reason, tier, suggested = f"You owe a reply — they emailed {days} days ago", 0, "follow_up"
        elif flagged and days >= 14:
            reason, tier, suggested = f"Flagged for follow-up, quiet {days} days", 1, "follow_up"
        elif days >= warm_days and len(erows) >= 2:
            reason, tier, suggested = f"No contact in {days} days", 2, "nurture"
        if reason is None:
            continue
        if flagged and tier != 1:
            # Tier 1's reason already mentions the flag.
            reason += " · flagged"
        items.append({"investor_id": inv_id, "name": name, "reason": reason,
                      "days_since": days, "suggested_type": suggested, "tier": tier})

    items.sort(key=lambda x: (x["tier"], -x["days_since"]))
    return items[:limit]


def _context(conn, investor_id):
    """Assemble the recipient's context.

    Structured so the model replies to the ACTIVE conversation (the most recent
    email thread) while still having earlier emails as background. When the
    most recent email carries no thread id, that single email is treated as the
    active conversation so the newest message is never labelled "background".

    Returns (investor_name, context_text) or (None, None).
    """
    row = conn.execute("SELECT investor_name, notes FROM fundraising_investors WHERE id=?",
                       (investor_id,)).fetchone()
    if not row:
        return None, None
    name = row["investor_name"]
    header = [f"Investor: {name}"]
    notes = (row["notes"] or "").strip()
    if notes:
        header.append("CRM notes:\n" + notes)

    try:
        rows = conn.execute(
            "SELECT e.subject, e.body_text, e.snippet, e.sent_at, e.thread_id FROM emails e "
            "JOIN email_investor_links l ON l.email_id = e.id "
            "WHERE l.fundraising_investor_id = ? AND e.is_matched = 1 "
            "ORDER BY e.sent_at DESC LIMIT 20", (investor_id,)).fetchall()
    except Exception:
        rows = []  # email tables may be absent / not yet captured

    active, background = [], []
    if rows:
        active_thread = rows[0]["thread_id"]
        for idx, em in enumerate(rows):
            body = (em["body_text"] or em["snippet"] or "")[:1500].strip()
            block = f"({(em['sent_at'] or '')[:10]}) {em['subject'] or '(no subject)'}\n{body}"
            if active_thread is not None:
                in_active = em["thread_id"] == active_thread
            else:
                # No thread id on the newest email: it alone is the active
                # conversation (never lump every NULL-thread email together).
                in_active = idx == 0
            (active if in_active else background).append(block)

    sections = ["\n".join(header)]
    if active:
        # `active` is newest-first; keep the 6 newest and present them chronologically.
        sections.append("=== Active conversation (the most recent thread — this is what you are replying to) ===\n"
                        + "\n\n".join(reversed(active[:6])))
    if background:
        sections.append("=== Earlier emails (background only, not the active thread) ===\n"
                        + "\n\n".join(background[:4]))
    return name, "\n\n".join(sections)


# Keyword cues used to pick the sender's prior emails of the SAME PURPOSE as the draft
# (so the voice few-shot matches what they're writing, not just whatever is most recent).
PURPOSE_PATTERNS = {
    "intro": ["introduc", "nice to meet", "reaching out", "wanted to connect",
              "by way of introduction", "e-meet"],
    "follow_up": ["follow up", "following up", "circle back", "circling back", "checking in",
                  "wanted to revisit", "any thoughts", "wanted to follow", "touching base"],
    "fund_update": ["update", "progress", "quarter", "deployed", "portfolio", "milestone",
                    "closing", "fund iii"],
    "meeting_follow_up": ["great to meet", "great speaking", "thanks for the call",
                          "thanks for your time", "after our", "following our", "enjoyed our",
                          "great to connect", "great chatting"],
    "nurture": ["checking in", "hope you", "thinking of you", "stay in touch", "wanted to share",
                "thought you"],
}


def _voice_examples(conn, sender_email, outreach_type=None, limit=8):
    """The sender's OWN sent LP emails OF THE SAME PURPOSE — used as voice few-shot
    AND surfaced for transparency (no black box). Larger sample, purpose-weighted
    (not just recent).

    Candidates are the sender's 80 most recent matched emails with a non-empty
    body; each is scored by how many PURPOSE_PATTERNS cues for ``outreach_type``
    appear in its subject + body, ties broken by recency.

    Returns (blocks_for_model, meta_for_ui); meta is the sender's own emails,
    each as ``{"subject", "date", "to", "on_topic"}``. Returns ``([], [])`` when
    no sender is given or the query fails.
    """
    if not sender_email:
        return [], []
    try:
        rows = conn.execute(
            "SELECT subject, body_text, snippet, sent_at, to_emails_json FROM emails "
            "WHERE LOWER(from_email) = LOWER(?) AND is_matched = 1 "
            "AND body_text IS NOT NULL AND TRIM(body_text) <> '' "
            "ORDER BY sent_at DESC LIMIT 80", (sender_email,)).fetchall()
    except Exception:
        return [], []

    pats = PURPOSE_PATTERNS.get(outreach_type or "", [])
    scored = []
    for idx, r in enumerate(rows):
        text = ((r["subject"] or "") + " " + (r["body_text"] or r["snippet"] or "")).lower()
        score = sum(1 for p in pats if p in text)
        scored.append((score, -idx, r))  # purpose match first, then more recent
    scored.sort(key=lambda x: (x[0], x[1]), reverse=True)

    blocks, meta = [], []
    for score, _neg_idx, r in scored[:limit]:
        body = (r["body_text"] or r["snippet"] or "")[:900].strip()
        if not body:
            continue
        blocks.append(f"Example — {r['subject'] or '(no subject)'}\n{body}")
        to = ""
        try:
            arr = json.loads(r["to_emails_json"] or "[]")
            if arr:
                first = arr[0]
                # Entries may be {"email": ...} dicts or bare strings; a dict
                # without "email" falls back to "" like every other failure.
                to = (first.get("email") if isinstance(first, dict) else first) or ""
        except Exception:
            # Recipient is display-only metadata: never fail the draft over it.
            to = ""
        meta.append({"subject": r["subject"] or "(no subject)",
                     "date": (r["sent_at"] or "")[:10],
                     "to": to,
                     "on_topic": score > 0})
    return blocks, meta


def _draft_with_claude(aa, thesis, type_desc, deident_context, deident_voice, guidance):
    """Ask Claude for one draft over de-identified inputs and return its text.

    Args:
        aa: The architect-agent module; this function uses its ``VOICE``,
            ``MODEL``, ``_render_thesis`` and ``_client``.
        thesis: Thesis object passed to ``aa._render_thesis``.
        type_desc: Human description of the outreach type (see OUTREACH_TYPES).
        deident_context: Scrubbed recipient context.
        deident_voice: Scrubbed examples of the sender's own emails (may be empty).
        guidance: Optional free-text guidance from the sender; it is inserted
            into the user message as given.

    Returns:
        The concatenated text blocks of the model response.
    """
    voice_block = ""
    if deident_voice:
        voice_block = ("\n\nHere are examples of how THIS sender actually writes (de-identified). Match their "
                       "voice, tone, sentence rhythm, openers, and sign-off — not just the rules above:\n\n"
                       + "\n\n---\n\n".join(deident_voice))
    system = (
        "You are Ten31's outreach copilot. Draft ONE ready-to-send LP outreach email in the SENDER's voice. "
        f"VOICE RULES (follow exactly): {aa.VOICE}" + voice_block + "\n\n"
        "Ten31 invests in critical infrastructure across bitcoin, AI, energy, and freedom technologies, "
        "with scarcity as the connecting idea. Current working thesis:\n" + aa._render_thesis(thesis) + "\n\n"
        "The recipient's context below is DE-IDENTIFIED: people, firms, and amounts appear as placeholders "
        "like [PERSON_1], [ORG_1], [AMOUNT_1]. Keep every placeholder EXACTLY as written and NEVER invent new "
        "ones — they are swapped back to real values after you reply. Reply to the ACTIVE conversation; use the "
        "earlier emails only as background. Output a subject line, then the email body. Do NOT fabricate facts, "
        "numbers, returns, or commitments that are not present in the context or the thesis.")
    user = (f"Outreach type: {type_desc}\n\n"
            f"Recipient context (de-identified):\n{deident_context}\n\n"
            + (f"Additional guidance from the sender: {guidance}\n\n" if (guidance or "").strip() else "")
            + "Draft the email now.")
    resp = aa._client().messages.create(
        model=aa.MODEL, max_tokens=1200,
        system=[{"type": "text", "text": system, "cache_control": {"type": "ephemeral"}}],
        messages=[{"role": "user", "content": user}])
    return "".join(b.text for b in resp.content if getattr(b, "type", None) == "text")


def _ensure_on_sys_path(path):
    """Put ``path`` at the front of ``sys.path`` unless it is already listed."""
    if path not in sys.path:
        sys.path.insert(0, path)


def draft_outreach(conn, investor_id, outreach_type, guidance, db_path, sender_email=None):
    """Draft tailored outreach for one investor, in the SENDER's voice (few-shot
    from their own prior emails).

    FAILS CLOSED: if the scrub can't be prepared or Claude hallucinates a
    placeholder, no de-anonymized draft is returned.

    Args:
        conn: DB connection used for context lookup and passed to the boundary.
        investor_id: ``fundraising_investors.id`` of the recipient.
        outreach_type: Key of OUTREACH_TYPES; unknown values fall back to
            ``"follow_up"``.
        guidance: Optional free-text guidance from the sender.
        db_path: Database path handed to the redaction boundary and thesis lookup.
        sender_email: Sender whose prior emails seed the voice examples.

    Returns:
        A dict whose ``status`` is one of ``"not_found"``,
        ``"scrub_unavailable"``, ``"claude_not_configured"``,
        ``"rehydrate_failed"`` or ``"ok"``. On ``"ok"`` it also carries
        ``draft``, ``investor_name``, ``scrub_stats`` and ``voice_examples``.
    """
    name, context = _context(conn, investor_id)
    if not name:
        return {"status": "not_found"}
    type_desc = OUTREACH_TYPES.get(outreach_type, OUTREACH_TYPES["follow_up"])
    voice_blocks, voice_meta = _voice_examples(conn, sender_email, outreach_type)

    # 1) Scrub the sender's voice examples + the recipient context TOGETHER (shared token
    #    space). Nothing reaches Claude in the clear; the voice examples are reference only.
    try:
        _ensure_on_sys_path(os.path.dirname(_HERE))  # backend/ for the redaction package
        from redaction.client import Boundary
        boundary = Boundary(db_path=db_path, actor="closer")
        scrubbed = boundary.scrub(list(voice_blocks) + [context], bucket=False, conn=conn)
    except Exception as exc:
        return {"status": "scrub_unavailable", "reason": str(exc)}
    items = scrubbed["items"]
    # The context was appended last, so it is the final scrubbed item.
    deident_voice, deident_target = items[:-1], items[-1]
    handle = scrubbed["handle"]

    # 2) Claude drafts over the de-identified context + voice + (non-sensitive) thesis.
    try:
        _ensure_on_sys_path(_HERE)
        import architect_agent as aa
        thesis = aa.at.get_thesis("core", db=db_path)
        raw = _draft_with_claude(aa, thesis, type_desc, deident_target, deident_voice, guidance)
    except Exception as exc:
        boundary.forget(handle)
        return {"status": "claude_not_configured", "reason": str(exc)}

    # 3) Re-hydrate locally (strict: a hallucinated placeholder quarantines the draft).
    #    The handle is forgotten even if rehydration raises, so the placeholder
    #    mapping never outlives this call.
    try:
        rehy = boundary.rehydrate(raw, handle, strict=True, conn=conn)
    finally:
        boundary.forget(handle)
    if rehy.get("error"):
        return {"status": "rehydrate_failed"}
    return {"status": "ok", "draft": rehy["text"], "investor_name": name,
            "scrub_stats": scrubbed.get("stats", {}), "voice_examples": voice_meta}