Files
Keysat aec2b7775b Harden privacy boundary and asset serving (v0.1.0:74)
Fixes from the 2026-06-12 full-eval (P0 + two P1s); code-only, no schema
change. Without these the "private CRM" premise was breachable on the LAN:

- P0: the /assets/ route joined the request path onto FRONTEND_DIR without
  normalizing '..' (get_path/urlparse pass it through), so an unauthenticated
  GET /assets/../../data/crm.db read any file the process could — the LP DB,
  the JWT signing secret (-> admin-token forgery), the Gmail key. Add a realpath
  containment check that 404s anything resolving outside FRONTEND_ROOT.
- P1: the LP-outreach drafter built its redaction Boundary with no ner_fn, so
  unknown people/firms in raw email bodies reached Claude in the clear. Pass the
  local-Qwen NER backstop (ner_fn=_ner_local), matching architect_grounding;
  fails closed via the existing scrub_unavailable path if the local model is down.
- P1: get-by-id handlers leaked soft-deleted records by direct ID. Add
  deleted_at IS NULL to every get-by-id path — contacts, organizations,
  opportunities, lp_profiles — and to the nested related-data sub-selects in
  the contact/opportunity detail payloads, matching the list-handler convention.

Bumps the package to v0.1.0:74 (utils.ts + versions/v0.1.0.74.ts + graph).
Full report in EVALUATION.md; remaining P2/P3 triaged in AGENTS.md Current state.
2026-06-12 18:01:48 -05:00

259 lines
13 KiB
Python

"""Outreach drafting agent — tailored LP outreach in Ten31's voice, grounded in the
thesis + the LP's DE-IDENTIFIED context, through the redaction boundary.
Draft-only: a human reviews, edits, and sends (guardrails #4 and #6 — no auto-send,
no cold/outbound automation until counsel defines the solicitation posture). Sovereignty:
the thesis is Ten31's own non-sensitive messaging and goes to Claude as-is; the LP's
context (CRM notes + email history) is scrubbed first, so the LP list never reaches the
API in the clear, and the draft is re-hydrated locally for the human.
"""
import json
import os
import sys
# Absolute path of the directory that contains this module. Also reused further
# down (in draft_outreach) to derive the parent directory.
_HERE = os.path.dirname(os.path.abspath(__file__))
# Import-time side effect: put this module's own directory at the front of the
# import search path so sibling modules can be imported by bare name.
sys.path.insert(0, _HERE) # backend/mcp on path for sibling imports (architect_grounding, architect_agent)
# outreach_type -> human description woven into the prompt.
# Keys are the outreach_type identifiers accepted by draft_outreach (which falls
# back to the "follow_up" description for an unknown key) and are also the values
# follow_up_radar emits as "suggested_type".
OUTREACH_TYPES = {
    "intro": "a first introduction to Ten31 and the fund",
    "follow_up": "a warm follow-up that moves the conversation forward",
    "fund_update": "a fund update / progress note",
    "meeting_follow_up": "a follow-up after a recent meeting or call",
    "nurture": "a light-touch note to stay in contact",
}
def _days_between(then_iso, now_iso):
    """Return the whole-day gap from ``then_iso`` to ``now_iso``.

    Only the first ten characters of each value (its ``YYYY-MM-DD`` part) are
    parsed, so any time-of-day or zone suffix is ignored. The result is negative
    when ``then_iso`` is the later date. Returns None when either value cannot
    be parsed as a date.
    """
    from datetime import datetime

    def _date_part(value):
        # str() lets None / non-string inputs fail inside strptime rather than on slicing.
        return datetime.strptime(str(value)[:10], "%Y-%m-%d")

    try:
        earlier = _date_part(then_iso)
        later = _date_part(now_iso)
    except Exception:
        return None
    return (later - earlier).days
def follow_up_radar(conn, our_addresses, now_iso, warm_days=45, limit=60):
    """Deterministic scan: list investors who need attention, most urgent first.

    Every surfaced investor carries a concrete, checkable reason (no LLM is
    involved in the surfacing). Tiers, most urgent first:
      0  you owe a reply (their email is the most recent one and is >= 3 days old)
      1  flagged for follow-up and quiet for >= 14 days
      2  lead gone quiet (no contact in >= warm_days, with at least two emails on file)

    Args:
        conn: database connection; rows must be convertible with ``dict(row)`` and
            indexable by column name (e.g. sqlite3 with ``sqlite3.Row`` — assumption,
            confirm against the caller).
        our_addresses: iterable of our own email addresses (None allowed); matched
            case-insensitively to decide who sent the latest email.
        now_iso: ISO-style date/datetime string treated as "today".
        warm_days: quiet-period threshold, in days, for tier 2.
        limit: maximum number of items returned.

    Returns:
        A list of dicts with keys investor_id, name, reason, days_since,
        suggested_type and tier, sorted by tier and then by longest silence.
        Returns [] when the investor table cannot be read.
    """

    def _is_on(value):
        # A flag column counts as set unless it is empty or spelled as an "off" value.
        return bool(value) and str(value).strip().lower() not in ("", "0", "false", "no")

    own_addresses = {(addr or "").lower() for addr in (our_addresses or [])}
    try:
        investors = conn.execute("SELECT * FROM fundraising_investors").fetchall()
    except Exception:
        return []

    history_sql = (
        "SELECT e.from_email, e.sent_at FROM emails e "
        "JOIN email_investor_links l ON l.email_id = e.id "
        "WHERE l.fundraising_investor_id = ? AND e.is_matched = 1 "
        "ORDER BY e.sent_at DESC LIMIT 50"
    )

    radar = []
    for investor_row in investors:
        record = dict(investor_row)
        investor_id = record.get("id")
        if not investor_id:
            continue
        if _is_on(record.get("graveyard")):
            continue  # buried leads are out of scope

        try:
            history = conn.execute(history_sql, (investor_id,)).fetchall()
        except Exception:
            history = []
        if not history:
            continue  # no email history -> nothing to base a nudge on

        latest = history[0]
        quiet_days = _days_between(latest["sent_at"], now_iso)
        if quiet_days is None:
            continue

        # The latest email is inbound when its sender is not one of our addresses.
        they_wrote_last = (latest["from_email"] or "").lower() not in own_addresses
        flagged = _is_on(record.get("follow_up"))

        if they_wrote_last and quiet_days >= 3:
            tier, suggested = 0, "follow_up"
            reason = f"You owe a reply — they emailed {quiet_days} days ago"
        elif flagged and quiet_days >= 14:
            tier, suggested = 1, "follow_up"
            reason = f"Flagged for follow-up, quiet {quiet_days} days"
        elif quiet_days >= warm_days and len(history) >= 2:
            tier, suggested = 2, "nurture"
            reason = f"No contact in {quiet_days} days"
        else:
            continue

        # Tier 1 already says "flagged" in its reason; annotate the other tiers.
        if flagged and tier != 1:
            reason += " · flagged"

        radar.append({
            "investor_id": investor_id,
            "name": record.get("investor_name"),
            "reason": reason,
            "days_since": quiet_days,
            "suggested_type": suggested,
            "tier": tier,
        })

    radar.sort(key=lambda entry: (entry["tier"], -entry["days_since"]))
    return radar[:limit]
def _context(conn, investor_id):
    """Build the recipient context text for one investor.

    The text is laid out so the model answers the ACTIVE conversation (the thread
    of the most recent linked email) while older emails remain available as
    background. It contains, in order: the investor name and CRM notes, up to six
    emails of the active thread in chronological order, and up to four emails
    from other threads (newest first).

    Returns:
        (investor_name, context_text), or (None, None) when no investor row
        exists for ``investor_id``.
    """
    investor = conn.execute(
        "SELECT investor_name, notes FROM fundraising_investors WHERE id=?",
        (investor_id,)).fetchone()
    if not investor:
        return None, None

    name = investor["investor_name"]
    profile = [f"Investor: {name}"]
    crm_notes = (investor["notes"] or "").strip()
    if crm_notes:
        profile.append("CRM notes:\n" + crm_notes)

    try:
        emails = conn.execute(
            "SELECT e.subject, e.body_text, e.snippet, e.sent_at, e.thread_id FROM emails e "
            "JOIN email_investor_links l ON l.email_id = e.id "
            "WHERE l.fundraising_investor_id = ? AND e.is_matched = 1 "
            "ORDER BY e.sent_at DESC LIMIT 20", (investor_id,)).fetchall()
    except Exception:
        emails = []  # email tables may be absent / not yet captured

    # Rows arrive newest first, so the first row identifies the active thread.
    newest_thread = emails[0]["thread_id"] if emails else None
    active, background = [], []
    for email in emails:
        excerpt = (email["body_text"] or email["snippet"] or "")[:1500].strip()
        entry = f"({(email['sent_at'] or '')[:10]}) {email['subject'] or '(no subject)'}\n{excerpt}"
        # An email with no thread id never counts as part of the active thread.
        if newest_thread is not None and email["thread_id"] == newest_thread:
            active.append(entry)
        else:
            background.append(entry)

    sections = ["\n".join(profile)]
    if active:
        # Take the six newest, then flip them so the conversation reads oldest -> newest.
        sections.append(
            "=== Active conversation (the most recent thread — this is what you are replying to) ===\n"
            + "\n\n".join(active[:6][::-1]))
    if background:
        sections.append(
            "=== Earlier emails (background only, not the active thread) ===\n"
            + "\n\n".join(background[:4]))
    return name, "\n\n".join(sections)
# Keyword cues used to pick the sender's prior emails of the SAME PURPOSE as the draft
# (so the voice few-shot matches what they're writing, not just whatever is most recent).
# Keys mirror OUTREACH_TYPES. Each cue is matched as a lowercase substring of an
# email's subject + body by _voice_examples, and every cue that matches adds one
# point to that email's purpose score — so cues must be written in lowercase.
PURPOSE_PATTERNS = {
    "intro": ["introduc", "nice to meet", "reaching out", "wanted to connect", "by way of introduction", "e-meet"],
    "follow_up": ["follow up", "following up", "circle back", "circling back", "checking in",
                  "wanted to revisit", "any thoughts", "wanted to follow", "touching base"],
    "fund_update": ["update", "progress", "quarter", "deployed", "portfolio", "milestone", "closing", "fund iii"],
    "meeting_follow_up": ["great to meet", "great speaking", "thanks for the call", "thanks for your time",
                          "after our", "following our", "enjoyed our", "great to connect", "great chatting"],
    "nurture": ["checking in", "hope you", "thinking of you", "stay in touch", "wanted to share", "thought you"],
}
def _voice_examples(conn, sender_email, outreach_type=None, limit=8):
    """Collect the sender's OWN sent LP emails that best match the draft's purpose.

    The examples serve as voice few-shot for the model AND are surfaced to the UI
    for transparency (no black box). The 80 most recent matched emails from
    ``sender_email`` are scored by how many PURPOSE_PATTERNS cues for
    ``outreach_type`` they contain; the top ``limit`` are kept, ties broken in
    favour of the more recent email.

    Returns:
        (blocks_for_model, meta_for_ui). Each block is "Example — <subject>" plus
        up to 900 characters of body; each meta entry has subject, date, to and
        on_topic. Returns ([], []) when there is no sender or the query fails.
    """
    if not sender_email:
        return [], []
    try:
        sent = conn.execute(
            "SELECT subject, body_text, snippet, sent_at, to_emails_json FROM emails "
            "WHERE LOWER(from_email) = LOWER(?) AND is_matched = 1 "
            "AND body_text IS NOT NULL AND TRIM(body_text) <> '' "
            "ORDER BY sent_at DESC LIMIT 80", (sender_email,)).fetchall()
    except Exception:
        return [], []

    cues = PURPOSE_PATTERNS.get(outreach_type or "", [])

    def _purpose_hits(row):
        # Number of purpose cues found in the lower-cased subject + body.
        haystack = ((row["subject"] or "") + " " + (row["body_text"] or row["snippet"] or "")).lower()
        return sum(1 for cue in cues if cue in haystack)

    # Rows arrive newest first, so a smaller position means a more recent email.
    ranked = [(_purpose_hits(row), position, row) for position, row in enumerate(sent)]
    ranked.sort(key=lambda entry: (-entry[0], entry[1]))  # purpose match first, then more recent

    blocks, meta = [], []
    for hits, _position, row in ranked[:limit]:
        sample = (row["body_text"] or row["snippet"] or "")[:900].strip()
        if not sample:
            continue
        subject = row["subject"] or "(no subject)"
        blocks.append(f"Example — {subject}\n{sample}")

        # First recipient only; entries may be plain strings or {"email": ...} dicts.
        recipient = ""
        try:
            recipients = json.loads(row["to_emails_json"] or "[]")
            if recipients:
                first = recipients[0]
                recipient = first.get("email") if isinstance(first, dict) else first
        except Exception:
            recipient = ""

        meta.append({"subject": subject, "date": (row["sent_at"] or "")[:10],
                     "to": recipient, "on_topic": hits > 0})
    return blocks, meta
def _draft_with_claude(aa, thesis, type_desc, deident_context, deident_voice, guidance):
    """Ask Claude for one outreach email draft and return its text.

    Args:
        aa: the architect_agent module; this function reads ``aa.VOICE`` and
            ``aa.MODEL`` and calls ``aa._render_thesis`` and ``aa._client``.
        thesis: thesis object handed to ``aa._render_thesis`` for the system prompt.
        type_desc: human description of the outreach type (an OUTREACH_TYPES value).
        deident_context: de-identified recipient context (placeholders, not real names).
        deident_voice: de-identified examples of the sender's own emails; may be empty.
        guidance: optional free-text guidance from the sender; skipped when blank.

    Returns:
        The concatenated text of every "text" content block in the response.
    """
    # Optional few-shot section: only present when voice examples were supplied.
    voice_block = (
        "\n\nHere are examples of how THIS sender actually writes (de-identified). Match their "
        "voice, tone, sentence rhythm, openers, and sign-off — not just the rules above:\n\n"
        + "\n\n---\n\n".join(deident_voice)
    ) if deident_voice else ""

    persona = (
        "You are Ten31's outreach copilot. Draft ONE ready-to-send LP outreach email in the SENDER's voice. "
        f"VOICE RULES (follow exactly): {aa.VOICE}")
    house_view = (
        "\n\n"
        "Ten31 invests in critical infrastructure across bitcoin, AI, energy, and freedom technologies. "
        "The spine: fiat is being debased while AI drives the marginal cost of the reproducible toward "
        "zero, so durable value accrues to what stays provably scarce, and the monetary premium accrues "
        "to bitcoin as the apex non-debasable reserve asset. AI is the abundance engine and bitcoin is "
        "the scarcity anchor. Ten31 owns the scarce links of that one supply chain. This is an "
        "asset-value and capital-flow conviction, not a claim that the world transacts or settles in "
        "bitcoin. Current working thesis:\n")
    placeholder_rules = (
        "\n\n"
        "The recipient's context below is DE-IDENTIFIED: people, firms, and amounts appear as placeholders "
        "like [PERSON_1], [ORG_1], [AMOUNT_1]. Keep every placeholder EXACTLY as written and NEVER invent new "
        "ones — they are swapped back to real values after you reply. Reply to the ACTIVE conversation; use the "
        "earlier emails only as background. Output a subject line, then the email body. Do NOT fabricate facts, "
        "numbers, returns, or commitments that are not present in the context or the thesis.")
    system = persona + voice_block + house_view + aa._render_thesis(thesis) + placeholder_rules

    user = "".join([
        f"Outreach type: {type_desc}\n\n",
        f"Recipient context (de-identified):\n{deident_context}\n\n",
        f"Additional guidance from the sender: {guidance}\n\n" if (guidance or "").strip() else "",
        "Draft the email now.",
    ])

    response = aa._client().messages.create(
        model=aa.MODEL, max_tokens=1200,
        system=[{"type": "text", "text": system, "cache_control": {"type": "ephemeral"}}],
        messages=[{"role": "user", "content": user}])

    # Keep only text blocks; anything else in the response content is ignored.
    pieces = []
    for part in response.content:
        if getattr(part, "type", None) == "text":
            pieces.append(part.text)
    return "".join(pieces)
def draft_outreach(conn, investor_id, outreach_type, guidance, db_path, sender_email=None):
    """Draft tailored outreach for one investor, in the SENDER's voice.

    The voice is few-shot from the sender's own prior emails. FAILS CLOSED: if
    the scrub can't be prepared or Claude hallucinates a placeholder, no
    de-anonymized draft is returned.

    Args:
        conn: database connection used for context lookup and by the boundary.
        investor_id: id of the fundraising_investors row to write to.
        outreach_type: an OUTREACH_TYPES key; unknown values fall back to "follow_up".
        guidance: optional free-text guidance from the sender.
        db_path: database path handed to the redaction Boundary and the thesis lookup.
        sender_email: address whose sent emails supply the voice examples (optional).

    Returns:
        A dict whose "status" is one of:
          "not_found"             - no investor (or no investor name) for the id
          "scrub_unavailable"     - the boundary could not be built or the scrub failed
                                    (with "reason")
          "claude_not_configured" - loading the agent/thesis or the model call raised
                                    (with "reason")
          "rehydrate_failed"      - strict re-hydration reported an error
          "ok"                    - with "draft", "investor_name", "scrub_stats" and
                                    "voice_examples"
    """
    name, context = _context(conn, investor_id)
    if not name:
        return {"status": "not_found"}
    type_desc = OUTREACH_TYPES.get(outreach_type, OUTREACH_TYPES["follow_up"])
    voice_blocks, voice_meta = _voice_examples(conn, sender_email, outreach_type)

    # 1) Scrub the sender's voice examples + the recipient context TOGETHER (shared token
    #    space). The recipient context is free-prose email bodies, so the dictionary+regex
    #    floor is NOT enough — pass the local-Qwen NER backstop (as architect_grounding does)
    #    to tokenize unknown people/firms not in the CRM. FAILS CLOSED: any failure in this
    #    step (including the NER backstop being unreachable) returns scrub_unavailable.
    try:
        backend_dir = os.path.dirname(_HERE)  # backend/ for the redaction package
        # Guarded so a long-running process does not grow sys.path by one duplicate
        # entry on every draft request.
        if backend_dir not in sys.path:
            sys.path.insert(0, backend_dir)
        from redaction.client import Boundary
        from architect_grounding import _ner_local  # local-Qwen NER backstop (sibling module)
        boundary = Boundary(db_path=db_path, actor="closer", ner_fn=_ner_local)
        scrubbed = boundary.scrub(list(voice_blocks) + [context], bucket=False, conn=conn)
    except Exception as exc:
        return {"status": "scrub_unavailable", "reason": str(exc)}

    handle = scrubbed["handle"]
    # From here on the scrub handle exists; the finally clause releases it on EVERY
    # exit path, including an exception from rehydrate() or from unpacking the scrub
    # result (previously those paths skipped forget()).
    try:
        items = scrubbed["items"]
        # The recipient context was appended last, so everything before it is voice.
        deident_voice, deident_target = items[:-1], items[-1]

        # 2) Claude drafts over the de-identified context + voice + (non-sensitive) thesis.
        try:
            import architect_agent as aa
            thesis = aa.at.get_thesis("core", db=db_path)
            raw = _draft_with_claude(aa, thesis, type_desc, deident_target, deident_voice, guidance)
        except Exception as exc:
            return {"status": "claude_not_configured", "reason": str(exc)}

        # 3) Re-hydrate locally (strict: a hallucinated placeholder quarantines the draft).
        rehy = boundary.rehydrate(raw, handle, strict=True, conn=conn)
    finally:
        boundary.forget(handle)

    if rehy.get("error"):
        return {"status": "rehydrate_failed"}
    return {"status": "ok", "draft": rehy["text"], "investor_name": name,
            "scrub_stats": scrubbed.get("stats", {}), "voice_examples": voice_meta}