Email-proposal review over Matrix + a bot role (v0.1.0:89)

The email-capture "proposed grid notes" gain two review surfaces:

1. Inline source email — each proposed-note card on the Email Capture page
   gets a "View email" toggle that lazily fetches the existing
   GET /api/email/detail and shows from/to/cc/date/subject + scrollable body,
   so a reviewer can judge the note against the email it was drafted from.

2. CRM->Matrix review bridge — the CRM (box, stdlib, no matrix-nio) can't post
   to Matrix, so the intake bot (Spark) PULLS: GET /api/intake/email-proposals
   returns to_post/open/to_close work-lists; the bot posts a review card
   (metadata + snippet + draft note) to a dedicated review room
   (MATRIX_EMAIL_REVIEW_ROOM) and relays in-thread yes / no / NL-edit
   (POST .../{id}/decide, note revised via local Qwen). Decisions sync both
   ways: web decide -> bot announces + closes the thread; Matrix decide -> the
   web panel's ~25s poll clears the card. State lives CRM-side in the new
   email_proposal_matrix side row (email-integration migration 0003, additive
   + idempotent CREATE TABLE IF NOT EXISTS), so it survives a bot restart.

Adds a 'bot' role (authenticated, never admin; require_bot_or_admin) to gate
the email-proposal endpoints rather than handing the bot full admin — the
principled base for the coming agentic capabilities. Role controls reach;
the draft->approve gate still controls autonomy (a human approves every write).

Deploy split: endpoints + migration + role + frontend ship in the s9pk; the
bot poll loop + review-room handling ship on the Spark. The bot's CRM user
must be flipped member->bot and joined to the review room (one-time).

Tests: backend/test_email_proposal_matrix.py + matrix_intake/test_email_proposals.py
(30/30 suite green, render-smoke green, migration verified twice on a DB copy).
This commit is contained in:
Keysat
2026-06-18 09:51:41 -05:00
parent 41def0f014
commit 5faa5ae4d6
16 changed files with 783 additions and 17 deletions
+100 -3
View File
@@ -14,6 +14,7 @@ import asyncio
from nio import AsyncClient, MatrixRoom, RoomMessageText
import crm_client
import email_proposals
import matrix_io
import parse
import proposals
@@ -25,6 +26,8 @@ UNCLEAR_HELP = (
"or a note like `Note for Acme Capital: wants the Q3 deck, follow up next week`."
)
EMAIL_POLL_SEC = 20 # how often the bot polls the CRM for new/decided email-activity proposals
async def main():
mx = settings.matrix_settings()
@@ -37,6 +40,8 @@ async def main():
roster = settings.team_roster() # frames the parse: teammates do outreach, aren't prospects
if roster:
print(f"matrix-intake: team roster loaded ({len(roster)} names)", flush=True)
review_room = settings.email_review_room() # CRM-drafted email proposals (empty → feature off)
email_threads = {} # Matrix thread-root event_id -> {id, investor_name, note} for an email proposal
async def handle_intake(room_id, root, text):
# A bare yes/no/approve typed in the MAIN timeline (not inside a proposal's thread) is
@@ -157,15 +162,103 @@ async def main():
store.put(root, proposal)
await say(room_id, "I didn't catch that.\n\n" + proposals.render_disambiguation(proposal), root)
async def handle_email_reply(room_id, root, text):
"""An in-thread reply to a CRM-drafted email-proposal card: yes commits, no dismisses, and
anything else is a natural-language revision of the note (re-drafted by local Qwen; the
human still approves the revised note, so the draft→approve gate holds)."""
item = email_threads.get(root)
if item is None:
return # a threaded reply we don't own (or already resolved)
decision = email_proposals.interpret(text)
if decision == "approve":
# Claim before the await (double-approve guard, like the intake commit path).
email_threads.pop(root, None)
try:
await asyncio.to_thread(crm_client.decide_email_proposal, item["id"], "approve", item.get("note"))
except Exception as exc:
email_threads[root] = item # restore for retry
await say(room_id, f"⚠️ couldn't add it ({str(exc)[:200]}). Reply **yes** to retry, **no** to dismiss.", root)
return
await say(room_id, f"✅ Added to the grid for **{item.get('investor_name') or 'the investor'}**.", root)
elif decision == "reject":
email_threads.pop(root, None)
try:
await asyncio.to_thread(crm_client.decide_email_proposal, item["id"], "dismiss")
except Exception as exc:
email_threads[root] = item
await say(room_id, f"⚠️ couldn't dismiss it ({str(exc)[:200]}). Try again.", root)
return
await say(room_id, "🗑️ Dismissed — nothing added to the grid.", root)
else:
try:
new_note = await asyncio.to_thread(email_proposals.revise_note, item.get("note") or "", text)
except Exception as exc:
await say(room_id, f"⚠️ couldn't revise that ({str(exc)[:200]}). Reply **yes** to add as-is, "
"**no** to dismiss, or rephrase.", root)
return
if not new_note:
await say(room_id, "I didn't catch a change. Reply **yes** to add the note as-is, **no** to "
"dismiss, or tell me how to change it.", root)
return
item["note"] = new_note
email_threads[root] = item
await say(room_id, f"✏️ Updated draft note:\n\n{new_note}\n\nReply **yes** to add it, **no** to "
"dismiss, or refine again.", root)
async def poll_email_proposals():
"""Poll the CRM for email-activity proposals: post a review card for each new one, rebuild
the reply-routing map from already-posted threads (so replies still route after a restart),
and announce+close any decided on the web. One failing cycle logs and retries next tick."""
while True:
try:
lists = await asyncio.to_thread(crm_client.list_email_proposals)
for it in lists["open"]: # rebuild routing for threads posted before (e.g. a restart)
ev = it.get("event_id")
if ev and ev not in email_threads:
email_threads[ev] = {"id": it["id"], "investor_name": it.get("investor_name"),
"note": it.get("proposed_note") or ""}
for it in lists["to_post"]:
try:
resp = await client.room_send(
review_room, "m.room.message",
matrix_io.thread_content(email_proposals.render_card(it), None))
ev = getattr(resp, "event_id", None)
if not ev:
print(f"matrix-intake: card send returned no event_id for {it['id']}", flush=True)
continue
await asyncio.to_thread(crm_client.mark_email_proposal_posted, it["id"], ev)
email_threads[ev] = {"id": it["id"], "investor_name": it.get("investor_name"),
"note": it.get("proposed_note") or ""}
except Exception as exc:
print(f"matrix-intake: failed to post email proposal {it.get('id')}: {exc}", flush=True)
for it in lists["to_close"]: # decided on the web → announce in-thread, then close
ev = it.get("event_id")
if not ev:
continue
try:
await say(review_room, email_proposals.closure_line(it.get("status")), ev)
await asyncio.to_thread(crm_client.mark_email_proposal_closed, it["id"])
email_threads.pop(ev, None)
except Exception as exc:
print(f"matrix-intake: failed to close email proposal {it.get('id')}: {exc}", flush=True)
except Exception as exc:
print(f"matrix-intake: email-proposal poll error: {exc}", flush=True)
await asyncio.sleep(EMAIL_POLL_SEC)
async def on_message(room: MatrixRoom, event: RoomMessageText):
if event.sender == mx["user_id"]:
return # never react to our own messages (we post in-thread — this prevents loops)
if room.room_id != intake_room:
return
text = (event.body or "").strip()
if not text:
return
root = matrix_io.thread_root_of(event)
# Email-proposal review room: only a threaded reply to a card we posted is actionable.
if review_room and room.room_id == review_room:
if root and root in email_threads:
await handle_email_reply(room.room_id, root, text)
return
if room.room_id != intake_room:
return
if root and store.has(root):
await handle_reply(room.room_id, root, text)
elif root:
@@ -180,8 +273,12 @@ async def main():
client.add_event_callback(on_message, RoomMessageText)
who = await client.whoami()
print(f"matrix-intake: listening as {who.user_id} in room {intake_room}", flush=True)
tasks = [asyncio.create_task(client.sync_forever(timeout=30000))]
if review_room:
tasks.append(asyncio.create_task(poll_email_proposals()))
print(f"matrix-intake: reviewing email proposals in room {review_room} (every {EMAIL_POLL_SEC}s)", flush=True)
try:
await client.sync_forever(timeout=30000)
await asyncio.gather(*tasks)
finally:
await client.close()
+41
View File
@@ -98,6 +98,47 @@ def match(proposal):
return {"match": match_out, "candidates": candidates}
def list_email_proposals():
"""Pull the email-activity review work-lists for the poll loop: {to_post, open, to_close}.
to_post = pending, un-posted (post a card); open = posted, awaiting a decision (rebuild the
reply-routing map after a restart); to_close = decided on the web (announce in-thread + close)."""
status, data = _authed("GET", "/api/intake/email-proposals")
if status != 200:
raise RuntimeError(f"email-proposals list failed ({status}): {data.get('error') or data}")
payload = data.get("data") or {}
return {k: (payload.get(k) or []) for k in ("to_post", "open", "to_close")}
def mark_email_proposal_posted(proposal_id, event_id):
"""Record the Matrix thread-root event id so the proposal's review state survives a restart."""
status, data = _authed("POST", f"/api/intake/email-proposals/{proposal_id}/matrix",
{"event_id": event_id})
if status != 200:
raise RuntimeError(f"mark posted failed ({status}): {data.get('error') or data}")
return data.get("data") or {}
def mark_email_proposal_closed(proposal_id):
"""Mark the review thread resolved after announcing a web-side decision in it."""
status, data = _authed("POST", f"/api/intake/email-proposals/{proposal_id}/matrix",
{"closed": True})
if status != 200:
raise RuntimeError(f"mark closed failed ({status}): {data.get('error') or data}")
return data.get("data") or {}
def decide_email_proposal(proposal_id, decision, note=None):
"""Relay an in-thread approve/dismiss (with the possibly-revised note) to the CRM. The server
appends the note to the grid on approve, tags source='matrix', and closes the thread."""
body = {"decision": decision}
if note is not None:
body["note"] = note
status, data = _authed("POST", f"/api/intake/email-proposals/{proposal_id}/decide", body)
if status not in (200, 201):
raise RuntimeError(f"email-proposal decide failed ({status}): {data.get('error') or data}")
return data.get("data") or {}
def build_commit_payload(proposal):
"""Pure: map a proposal to the /api/fundraising/log-communication request body.
+85
View File
@@ -0,0 +1,85 @@
"""Email-activity proposal review over Matrix — the CRM→Matrix leg of the email-capture flow.
The CRM (on the box) drafts a proposed grid note per newly-matched email (local model, no Claude)
and queues it for human review. The CRM is stdlib-only and can't post to Matrix itself, so this
bot PULLS the pending proposals (crm_client.list_email_proposals), posts a review card to the
dedicated review room, and relays the human's in-thread reply back to the CRM. Same draft→approve
discipline as the intake bot: nothing is appended to the grid until a human approves — here OR on
the web Email Capture panel, the two surfaces kept in sync via the CRM's email_proposal_matrix row.
This module is the PURE logic (card rendering, reply grammar, note revision) so it's unit-tested
offline; the async poll/post/reply wiring lives in bot.py (network + Matrix, live-smoke only).
"""
import spark
_YES = {"yes", "y", "approve", "approved", "ok", "confirm", "go", "add", "👍", ""}
_NO = {"no", "n", "cancel", "discard", "reject", "skip", "stop", "👎", ""}
_SNIPPET_MAX = 400 # email snippet shown on the card; the full body is in the web popup
def _truncate(s, n):
s = (s or "").strip()
return s if len(s) <= n else s[:n].rstrip() + ""
def render_card(item):
"""The review card posted to the Matrix review room: who/when + a short email snippet + the
drafted note. Deliberately compact for mobile — the full scrollable body is in the web Email
Capture popup (this is the metadata+snippet+note choice)."""
name = item.get("investor_name") or "Unknown investor"
direction = "Sent" if item.get("direction") == "sent" else "Received"
frm = item.get("from_name") or item.get("from_email") or "?"
lines = [f"📧 Proposed **grid note** for **{name}** ({direction})"]
if item.get("email_subject"):
lines.append(f"· Subject: {item['email_subject']}")
if item.get("email_date"):
lines.append(f"· Date: {item['email_date']}")
lines.append(f"· From: {frm}")
snippet = _truncate(item.get("snippet"), _SNIPPET_MAX)
if snippet:
lines.append(f"· Email: {snippet}")
lines.append("")
lines.append(f"📝 Draft note: {item.get('proposed_note') or '(empty)'}")
lines.append("")
lines.append("Reply **yes** to add it to the grid, **no** to dismiss, or just tell me how to "
"change the note (e.g. *say we discussed the Q3 raise*).")
return "\n".join(lines)
def closure_line(status):
"""Posted in-thread when a proposal was decided on the WEB while its Matrix thread was open."""
verb = "approved ✅ and added to the grid" if status == "approved" else "dismissed 🗑️"
return f"This was {verb} on the web — nothing more to do here. Thread closed."
def interpret(text):
"""Classify an in-thread reply: 'approve' | 'reject' | 'revise' (anything else → revise the note)."""
t = (text or "").strip().lower()
if t in _YES:
return "approve"
if t in _NO:
return "reject"
return "revise"
REVISE_SYSTEM = (
"You revise a single CRM note from a short instruction a venture-fund team member typed. "
"You are given the CURRENT note and an INSTRUCTION. Apply the instruction and reply with "
"ONLY a JSON object of the form {\"note\": \"<the full revised note>\"}. Keep it to one or two "
"factual sentences, no preamble. Output JSON only."
)
def revise_note(note, instruction, parse_fn=spark.parse_json):
"""Re-draft the note via local Qwen from a free-form instruction (no Claude, no scrub — same
local-only basis as the intake parse). Returns the new note text, or None if the model gave
nothing usable / unchanged, in which case the caller re-prompts. `parse_fn` is injectable for
tests."""
prompt = "CURRENT:\n" + (note or "") + "\n\nINSTRUCTION:\n" + (instruction or "").strip()
raw = parse_fn(prompt, system=REVISE_SYSTEM, max_tokens=400) or {}
new = raw.get("note") if isinstance(raw, dict) else None
new = (new or "").strip()
if not new or new == (note or "").strip():
return None
return new
+7
View File
@@ -61,3 +61,10 @@ def crm_settings():
# unset/empty just means no roster framing, i.e. the prior behavior.
def team_roster():
return [n.strip() for n in os.environ.get("INTAKE_TEAM_ROSTER", "").split(",") if n.strip()]
# Dedicated room for reviewing CRM-drafted email-activity proposals (the CRM→Matrix push leg).
# Separate from the intake room so high-volume email proposals don't drown the conversational
# intake flow. Unset/empty disables the whole email-review poll loop (the bot just does intake).
def email_review_room():
return os.environ.get("MATRIX_EMAIL_REVIEW_ROOM", "").strip()
@@ -0,0 +1,72 @@
"""Offline tests for the email-proposal review logic (card render, reply grammar, note revision).
The network/Matrix wiring lives in bot.py (live-smoke only); this covers the pure functions."""
import os
import sys
sys.path.insert(0, os.path.dirname(os.path.abspath(__file__)))
import email_proposals # noqa: E402
ITEM = {
"id": "p1", "investor_name": "Acme Capital", "direction": "received",
"from_name": "Jane Doe", "from_email": "jane@acme.com",
"email_subject": "Re: Fund III", "email_date": "2026-06-02",
"snippet": "thanks for the deck — one question on terms", "proposed_note": "✉ Received: asked about terms",
}
def test_interpret_yes_no_else():
assert email_proposals.interpret("yes") == "approve"
assert email_proposals.interpret(" Y ") == "approve"
assert email_proposals.interpret("") == "approve"
assert email_proposals.interpret("no") == "reject"
assert email_proposals.interpret("skip") == "reject"
# anything that isn't a clear yes/no is treated as a revision instruction
assert email_proposals.interpret("say we discussed the Q3 raise") == "revise"
def test_render_card_has_context_note_and_actions():
card = email_proposals.render_card(ITEM)
assert "Acme Capital" in card and "Received" in card
assert "Jane Doe" in card
assert "Re: Fund III" in card and "2026-06-02" in card
assert "thanks for the deck" in card
assert "✉ Received: asked about terms" in card
assert "yes" in card.lower() and "no" in card.lower()
def test_render_card_sent_direction():
assert "(Sent)" in email_proposals.render_card(dict(ITEM, direction="sent"))
def test_render_card_truncates_long_snippet():
card = email_proposals.render_card(dict(ITEM, snippet="x" * 1000))
assert "" in card and len(card) < 1000
def test_revise_note_applies_model_output():
out = email_proposals.revise_note(
"old note", "make it about the Q3 raise",
parse_fn=lambda prompt, system=None, max_tokens=400: {"note": "Discussed the Q3 raise."})
assert out == "Discussed the Q3 raise."
def test_revise_note_noop_or_empty_returns_none():
# model echoes the same note unchanged -> None so the caller re-prompts (not "Updated")
assert email_proposals.revise_note("same", "x", parse_fn=lambda *a, **k: {"note": "same"}) is None
# model returns nothing usable -> None
assert email_proposals.revise_note("n", "y", parse_fn=lambda *a, **k: {}) is None
assert email_proposals.revise_note("n", "y", parse_fn=lambda *a, **k: None) is None
def test_closure_line_reflects_status():
assert "approved" in email_proposals.closure_line("approved").lower()
assert "dismiss" in email_proposals.closure_line("dismissed").lower()
if __name__ == "__main__":
fns = [v for k, v in sorted(globals().items()) if k.startswith("test_") and callable(v)]
for fn in fns:
fn()
print(f"ok {fn.__name__}")
print(f"\n{len(fns)} passed")