Files
ten31-database/backend/matrix_intake/bot.py
T
Keysat a917280bbb Device-test round 2: 4 in-app fixes + Matrix intake cleanup (v0.1.0:99)
Grant's real-phone testing surfaced seven items; this lands six (the seventh,
in-app camera card intake, is planned in docs/handoffs/in-app-card-intake-plan.md).

CRM half — ships in the s9pk (v0.1.0:99):
- Intake fuzzy match no longer over-indexes on generic firm words. _name_similarity
  now compares DISTINCTIVE tokens only (generic descriptors — "Investment Group",
  "Capital", "Family Office" — stripped via _GENERIC_ORG_WORDS) for both the difflib
  ratio and the Jaccard, so "Fortitude Investment Group" stops surfacing Aether/Russell
  while "Aether Capital" still surfaces "Aether Investment Group". +2 regression cases.
- Mobile grid "Last contact"/staleness sort is reversible. SortSheet gains opt-in
  dir/onToggleDir; other surfaces (Contacts/Pipeline) are untouched.
- Mobile "Edit investor" prefills a contact's saved email. GET /api/fundraising/state
  heals a blank grid pill email from the linked classic contact
  (fundraising_contacts.contact_id -> contacts.email), fill-only, by pill order then
  name; the next one-row save persists it. +test_grid_email_heal.py.
- Mobile quick-log pencil icon renders. iOS collapses a sole, centered, attribute-only
  -sized flex-child <svg>; .quicklog-btn svg now gets explicit CSS width/height + flex:none
  (the pattern the working bottom-tab/sort-pill icons use). The v97 fix only changed color.

Matrix intake bot — ships on the Spark (bot-only, NOT the s9pk):
- Approve/reject now redacts the whole intake thread (card + ack + main-timeline nudge +
  the user's own photo/note), mirroring the email-review room; redact_thread takes the
  room as an arg and matches replies by m.thread OR m.in_reply_to (so the nudge clears).
  No more in-Matrix confirmation after a commit (the thread vanishing is the ack).
  Needs the bot to hold a redact/moderator power level in the intake room.
- New one-time backend/matrix_intake/redact_intake.py clears the room's pre-existing
  backlog (dry-run default; --apply).

Tests 42/42 green; frontend render-smoke green. Frontend fixes are inspection + render
-smoke verified (on-device confirm pending); the bot redaction is live-smoke only.
2026-06-20 12:32:56 -05:00

438 lines
24 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
#!/usr/bin/env python3
"""Matrix intake bot — entrypoint.
A top-level message in the dedicated intake room is parsed (local Qwen via Spark Control)
into a proposed fundraising-grid add/edit and posted back IN A THREAD. The team member
replies in that thread — **yes** / **edit field=value** / **no** — and only on **yes** does
the bot write, through the CRM's own API. Nothing is ever written autonomously.
Runs as its own process (its matrix-nio dep is isolated here, never in the CRM runtime).
Lifts matrix-bridge's prime-then-listen + threaded-reply plumbing. Config: repo .env.
"""
import asyncio
import base64
from nio import AsyncClient, MatrixRoom, MessageDirection, RoomMessageImage, RoomMessageText
import crm_client
import email_proposals
import matrix_io
import parse
import proposals
import query
import settings
import spark
UNCLEAR_HELP = (
"🤔 I couldn't tell what to record. Try e.g.\n"
"`New investor: Acme Capital — Jane Doe <jane@acme.com>, met at the Austin conf`\n"
"or a note like `Note for Acme Capital: wants the Q3 deck, follow up next week`."
)
EMAIL_POLL_SEC = 20 # how often the bot polls the CRM for new/decided email-activity proposals
MAX_THREAD_SCAN_PAGES = 8 # how far back to scan for a resolved thread's replies before redacting
async def main():
mx = settings.matrix_settings()
client = AsyncClient(mx["homeserver"], mx["user_id"])
client.restore_login(user_id=mx["user_id"], device_id=mx["device_id"], access_token=mx["token"])
say = matrix_io.make_say(client)
nudge = matrix_io.make_reply(client)
store = proposals.ProposalStore()
intake_room = mx["intake_room"]
roster = settings.team_roster() # frames the parse: teammates do outreach, aren't prospects
if roster:
print(f"matrix-intake: team roster loaded ({len(roster)} names)", flush=True)
review_room = settings.email_review_room() # CRM-drafted email proposals (empty → feature off)
query_room = settings.query_room() # dedicated read-only Q&A room (empty → use the intake trigger)
email_threads = {} # Matrix thread-root event_id -> {id, investor_name, note} for an email proposal
async def handle_intake(room_id, root, text, source="matrix_intake"):
# `source` tags provenance for the eventual commit: "matrix_intake" for a typed note,
# "matrix_card" when the text came from a scanned business card (on_image). Everything
# else about the flow is identical — that's the whole point of transcribe-then-reuse.
# A bare yes/no/approve typed in the MAIN timeline (not inside a proposal's thread) is
# an easy slip — point the user back to the thread rather than parse it as a new intake.
action, _ = proposals.interpret_reply(text)
if action in ("approve", "reject") and store.any_pending():
await nudge(room_id, "👉 To approve, reject, or edit a proposal, open its **thread** "
"and reply there — the note is in the thread.", root)
return
try:
proposal = await asyncio.to_thread(parse.parse_message, text, roster=roster)
except Exception as exc: # Spark/Qwen unreachable or bad response
await say(room_id, f"⚠️ couldn't reach the local parser: {str(exc)[:200]}", root)
return
if proposal["intent"] == "unclear":
await say(room_id, UNCLEAR_HELP, root)
return
proposal["_source"] = source # rides through to commit (control key, survives dict() copies)
# Resolve new-vs-existing against the CRM matcher (read-only). Degrade gracefully if the
# CRM is unreachable — still propose as new, just without match/candidate hints.
match, candidates = None, []
try:
res = await asyncio.to_thread(crm_client.match, proposal)
match = res.get("match")
candidates = res.get("candidates") or []
except Exception:
pass
if match:
# Confident exact match → auto-attach the note to that investor (no disambiguation).
proposal["intent"] = "meeting_note"
proposal["_match_id"] = match["id"]
proposal["_stage"] = "approval"
store.put(root, proposal)
hint = (f"\n\n🔎 Looks like an existing investor: **{match['name']}** — "
"this will append a note to them.")
await say(room_id, proposals.render(proposal) + hint, root)
await nudge(room_id, proposals.summary_line(proposal), root)
return
if candidates:
# No exact match but near-misses exist → make the human pick one or confirm "new",
# so a typo'd/near-duplicate name can't silently create a second investor.
proposal["_stage"] = "disambiguate"
proposal["_candidates"] = candidates
store.put(root, proposal)
await say(room_id, proposals.render_disambiguation(proposal), root)
await nudge(room_id, proposals.disambiguation_nudge(proposal), root)
return
# Genuinely new — straight to the new-investor approval card.
proposal["_stage"] = "approval"
store.put(root, proposal)
await say(room_id, proposals.render(proposal), root)
# Also drop a brief, un-threaded reply in the main timeline so the proposal isn't
# easy to miss inside a thread (the full card + yes/edit/no stay in the thread).
await nudge(room_id, proposals.summary_line(proposal), root)
async def handle_card(room_id, event):
"""A photo in the intake room → transcribe the business card on the local VL model, then
hand the transcription to the SAME intake flow as a typed note (parse → match → approve).
The only new step is image → text; everything downstream is reused. The transcription is
also the source text the email-integrity check runs against, so a mis-read address can't
slip in unapproved."""
mxc = getattr(event, "url", None)
if not mxc:
# Unencrypted images carry a plain mxc:// url; an encrypted room delivers a different
# event class entirely (we don't register for it), so this only guards the odd case.
await say(room_id, "📇 I can only read unencrypted images right now.", event.event_id)
return
await say(room_id, "📇 Reading the card…", event.event_id) # vision is slower — ack first
try:
resp = await client.download(mxc=mxc)
data = getattr(resp, "body", None)
if not isinstance(data, (bytes, bytearray)): # a DownloadError carries no bytes
raise RuntimeError(getattr(resp, "message", None) or "image download failed")
mime = getattr(resp, "content_type", None) or "image/jpeg"
b64 = base64.b64encode(data).decode("ascii")
text = await asyncio.to_thread(spark.transcribe_card, b64, mime)
except Exception as exc:
await say(room_id, f"⚠️ couldn't read the card: {str(exc)[:200]}", event.event_id)
return
if len(text.strip()) < 5:
await say(room_id, "📇 I couldn't read any text on that card — try a clearer, "
"well-lit photo taken straight-on.", event.event_id)
return
# Frame the raw transcription so the existing extractor reads it as a new-investor intake;
# the transcription itself is what email-integrity is checked against.
framed = "New investor — from a business card:\n" + text.strip()
await handle_intake(room_id, event.event_id, framed, source="matrix_card")
async def handle_query(room_id, root, question):
"""A read-only NL question ('@bot …' / '?…') — translate + run it on the BOX (local Qwen,
nothing leaves the box) and post the answer in a thread. No write path, no approval gate:
it only reads curated, parameterized queries. The endpoint returns its structured result
even on a soft no-match / model-down, so we render that; a transport/auth failure raises
and we show a brief error."""
try:
result = await asyncio.to_thread(crm_client.nl_query, question)
except Exception as exc:
await say(room_id, f"⚠️ couldn't run that query: {str(exc)[:200]}", root)
return
await say(room_id, query.render_answer(result), root)
async def handle_reply(room_id, root, text):
# Claim the proposal synchronously — BEFORE any await — so a second reply that
# arrives while a commit is in flight can't double-process it. asyncio is
# cooperative: nothing else runs between here and the first await below, so the
# pop is atomic w.r.t. other Matrix events.
proposal = store.pop(root)
if proposal is None:
return
if proposal.get("_stage") == "disambiguate":
await handle_disambiguation(room_id, root, text, proposal)
return
action, payload = proposals.interpret_reply(text)
if action == "approve":
try:
summary = await asyncio.to_thread(crm_client.commit, proposal)
except Exception as exc:
store.put(root, proposal) # commit failed — restore so the user can retry
await say(room_id, f"⚠️ write failed, nothing committed: {exc}", root)
return
# Committed → clear the whole thread (card + ack + nudge + the user's note/photo),
# like the email-review room. The thread vanishing is the acknowledgment; a confirmation
# reply would just keep it alive (and need redacting too). Needs the bot's redact/mod
# power in the intake room to clear the user's own messages — else those linger.
await redact_thread(room_id, root)
elif action == "reject":
await redact_thread(room_id, root)
elif action == "edit":
field, value = payload
proposal = proposals.apply_edit(proposal, field, value)
store.put(root, proposal) # keep it pending (edited) for the next reply
await say(room_id, "✏️ Updated:\n\n" + proposals.render(proposal), root)
else:
# Not yes/no/edit-grammar → treat it as a natural-language revision instruction and
# re-run it through local Qwen (no Claude, no scrub). The human still approves the
# revised card, so the draft→approve gate holds.
try:
revised = await asyncio.to_thread(parse.revise, proposal, text, roster=roster)
except Exception as exc:
store.put(root, proposal)
await say(room_id, f"⚠️ couldn't apply that change ({str(exc)[:200]}).\n\nReply **yes** "
"to commit, **no** to discard, **edit field=value**, or rephrase.", root)
return
if proposals.same_fields(proposal, revised):
store.put(root, proposal)
await say(room_id, "I didn't catch a change there. Reply **yes** to commit, **no** "
"to discard, **edit field=value**, or tell me what to change.", root)
return
store.put(root, revised)
await say(room_id, "✏️ Updated:\n\n" + proposals.render(revised), root)
async def handle_disambiguation(room_id, root, text, proposal):
cands = proposal.get("_candidates") or []
action, payload = proposals.interpret_disambiguation(text, len(cands))
if action == "pick":
updated = proposals.attach_to_candidate(proposal, cands[payload])
store.put(root, updated)
await say(room_id, "✏️ Will log against the existing investor:\n\n"
+ proposals.render(updated), root)
elif action == "new":
updated = proposals.promote_to_new(proposal)
store.put(root, updated)
await say(room_id, " OK — adding as a new investor:\n\n"
+ proposals.render(updated), root)
elif action == "reject":
await redact_thread(room_id, root) # discard → clear the thread, like an approve
else: # unrecognized — re-show the shortlist
store.put(root, proposal)
await say(room_id, "I didn't catch that.\n\n" + proposals.render_disambiguation(proposal), root)
async def redact_card(room_id, event_id):
"""Redact one event in `room_id` (best-effort). Redacting our OWN message needs no special
power; redacting someone else's message (a human reply, or the user's original card photo /
intake note) needs the bot to hold a redact/mod power level in that room."""
try:
await client.room_redact(room_id, event_id, reason="proposal resolved")
except Exception as exc:
print(f"matrix-intake: could not redact {event_id}: {exc}", flush=True)
async def redact_thread(room_id, root):
"""Clear a resolved thread in `room_id`: redact the root AND every message that hangs off it
— the m.thread children (cards/acks/human replies) AND the main-timeline **nudge** (a plain
m.in_reply_to reply, not a thread child), so the thread drops out of both the threads view
and the timeline. For email-review the root is the bot's card; for intake it's the USER'S
own note/photo, so clearing it (and the human reply) needs the bot's redact/mod power in that
room — without it those just no-op and linger. Replies are found by scanning recent history
from the current sync token (the triggering reply is already synced, so a backward scan
includes it)."""
await redact_card(room_id, root)
token = getattr(client, "next_batch", None)
if not token:
return
try:
scanned = 0
for _ in range(MAX_THREAD_SCAN_PAGES):
resp = await client.room_messages(room_id, start=token,
direction=MessageDirection.back, limit=100)
chunk = getattr(resp, "chunk", None)
if not chunk:
break
for ev in chunk:
rel = ((getattr(ev, "source", None) or {}).get("content", {}) or {}).get("m.relates_to") or {}
in_reply = (rel.get("m.in_reply_to") or {}).get("event_id")
# A thread child carries event_id==root; the un-threaded nudge carries only
# m.in_reply_to.event_id==root. Catch both so the thread AND its main-timeline
# pointer clear together.
if rel.get("event_id") == root or in_reply == root:
await redact_card(room_id, ev.event_id)
token = getattr(resp, "end", None)
scanned += len(chunk)
if not token or scanned > 1000:
break
except Exception as exc:
print(f"matrix-intake: thread reply cleanup failed for {root}: {exc}", flush=True)
async def handle_email_reply(room_id, root, text):
"""An in-thread reply to a CRM-drafted email-proposal card: yes commits, no dismisses, and
anything else is a natural-language revision of the note (re-drafted by local Qwen; the
human still approves the revised note, so the draft→approve gate holds). On a conclusive
decision the card is redacted so the room clears down to only what still needs handling."""
item = email_threads.get(root)
if item is None:
return # a threaded reply we don't own (or already resolved)
decision = email_proposals.interpret(text)
if decision == "approve":
# Claim before the await (double-approve guard, like the intake commit path).
email_threads.pop(root, None)
try:
await asyncio.to_thread(crm_client.decide_email_proposal, item["id"], "approve", item.get("note"))
except Exception as exc:
email_threads[root] = item # restore for retry
await say(room_id, email_proposals.frame(f"⚠️ couldn't add it ({str(exc)[:200]}). Reply **yes** to retry, **no** to dismiss."), root)
return
# Success → clear the whole thread (card + replies). No confirmation: the thread
# vanishing is the acknowledgment, and a confirmation reply would keep it alive.
await redact_thread(review_room, root)
elif decision == "reject":
email_threads.pop(root, None)
try:
await asyncio.to_thread(crm_client.decide_email_proposal, item["id"], "dismiss")
except Exception as exc:
email_threads[root] = item
await say(room_id, email_proposals.frame(f"⚠️ couldn't dismiss it ({str(exc)[:200]}). Try again."), root)
return
await redact_thread(review_room, root)
else:
try:
new_note = await asyncio.to_thread(email_proposals.revise_note, item.get("note") or "", text)
except Exception as exc:
await say(room_id, email_proposals.frame(f"⚠️ couldn't revise that ({str(exc)[:200]}). Reply **yes** to add as-is, "
"**no** to dismiss, or rephrase."), root)
return
if not new_note:
await say(room_id, email_proposals.frame("I didn't catch a change. Reply **yes** to add the note as-is, **no** to "
"dismiss, or tell me how to change it."), root)
return
item["note"] = new_note
email_threads[root] = item
await say(room_id, email_proposals.frame(f"✏️ Updated draft note:\n\n{new_note}\n\nReply **yes** to add it, **no** to "
"dismiss, or refine again."), root)
async def poll_email_proposals():
"""Poll the CRM for email-activity proposals: post a review card for each new one, rebuild
the reply-routing map from already-posted threads (so replies still route after a restart),
and announce+close any decided on the web. One failing cycle logs and retries next tick."""
while True:
try:
lists = await asyncio.to_thread(crm_client.list_email_proposals)
for it in lists["open"]: # rebuild routing for threads posted before (e.g. a restart)
ev = it.get("event_id")
if ev and ev not in email_threads:
email_threads[ev] = {"id": it["id"], "investor_name": it.get("investor_name"),
"note": it.get("proposed_note") or ""}
for it in lists["to_post"]:
try:
resp = await client.room_send(
review_room, "m.room.message",
matrix_io.thread_content(email_proposals.render_card(it), None))
ev = getattr(resp, "event_id", None)
if not ev:
print(f"matrix-intake: card send returned no event_id for {it['id']}", flush=True)
continue
await asyncio.to_thread(crm_client.mark_email_proposal_posted, it["id"], ev)
email_threads[ev] = {"id": it["id"], "investor_name": it.get("investor_name"),
"note": it.get("proposed_note") or ""}
except Exception as exc:
print(f"matrix-intake: failed to post email proposal {it.get('id')}: {exc}", flush=True)
for it in lists["to_close"]: # decided on the web → clear the thread, then close
ev = it.get("event_id")
if not ev:
continue
try:
await redact_thread(review_room, ev)
await asyncio.to_thread(crm_client.mark_email_proposal_closed, it["id"])
email_threads.pop(ev, None)
except Exception as exc:
print(f"matrix-intake: failed to close email proposal {it.get('id')}: {exc}", flush=True)
except Exception as exc:
print(f"matrix-intake: email-proposal poll error: {exc}", flush=True)
await asyncio.sleep(EMAIL_POLL_SEC)
async def on_message(room: MatrixRoom, event: RoomMessageText):
if event.sender == mx["user_id"]:
return # never react to our own messages (we post in-thread — this prevents loops)
text = (event.body or "").strip()
if not text:
return
root = matrix_io.thread_root_of(event)
# Email-proposal review room: only a threaded reply to a card we posted is actionable.
if review_room and room.room_id == review_room:
if root and root in email_threads:
await handle_email_reply(room.room_id, root, text)
return
# Dedicated Q&A room: every top-level message IS a question — no trigger needed. Threaded
# messages (the answers we post, or follow-ups) aren't acted on in v1.
if query_room and room.room_id == query_room:
if not root:
await handle_query(room.room_id, event.event_id, text)
return
if room.room_id != intake_room:
return
if root and store.has(root):
await handle_reply(room.room_id, root, text)
elif root:
return # threaded message not tied to a live proposal — ignore
else:
# A top-level message is either an NL question (explicitly addressed with '?'/'@bot')
# or an intake note. The trigger is required, so plain notes still flow to intake.
q = query.parse_trigger(text)
if q is None:
await handle_intake(room.room_id, event.event_id, text)
elif not q:
await say(room.room_id, query.HELP, event.event_id)
else:
await handle_query(room.room_id, event.event_id, q)
async def on_image(room: MatrixRoom, event: RoomMessageImage):
# Business-card capture is intake-only: ignore our own uploads, images in the Q&A /
# email-review rooms, and an image dropped inside an existing thread (not a fresh card).
if event.sender == mx["user_id"]:
return
if room.room_id != intake_room:
return
if matrix_io.thread_root_of(event):
return
await handle_card(room.room_id, event)
# Prime the sync token past history, THEN register the callbacks — only react to messages
# arriving after startup (no backlog replay). (matrix-bridge pattern.)
print("matrix-intake: priming sync (skipping backlog)...", flush=True)
await client.sync(timeout=30000, full_state=False)
client.add_event_callback(on_message, RoomMessageText)
client.add_event_callback(on_image, RoomMessageImage)
who = await client.whoami()
print(f"matrix-intake: listening as {who.user_id} in room {intake_room}", flush=True)
tasks = [asyncio.create_task(client.sync_forever(timeout=30000))]
if review_room:
# "Invited" isn't "joined" — the bot must join before it can post cards (room_send to a
# room we're only invited to fails M_FORBIDDEN). Idempotent if already a member.
try:
await client.join(review_room)
except Exception as exc:
print(f"matrix-intake: could not join review room {review_room}: {exc}", flush=True)
tasks.append(asyncio.create_task(poll_email_proposals()))
print(f"matrix-intake: reviewing email proposals in room {review_room} (every {EMAIL_POLL_SEC}s)", flush=True)
if query_room:
# Read-only Q&A room — just join and listen (no poll task; questions are interactive).
# "Invited" isn't "joined": the bot must join before it can post answers (idempotent).
try:
await client.join(query_room)
except Exception as exc:
print(f"matrix-intake: could not join Q&A room {query_room}: {exc}", flush=True)
print(f"matrix-intake: answering questions in room {query_room}", flush=True)
try:
await asyncio.gather(*tasks)
finally:
await client.close()
if __name__ == "__main__":
try:
asyncio.run(main())
except KeyboardInterrupt:
pass