0b893295e1
Close the two locked post-deploy enhancements for the Matrix intake bot.
Fuzzy matching (server-side, ships in the s9pk): new find_intake_candidates in
server.py returns ranked deterministic near-matches (difflib name similarity +
token-set Jaccard, legal-suffix-aware, plus email Levenshtein distance <= 2); GET
/api/intake/match now returns {match, candidates}. The bot surfaces a numbered
shortlist so a near-duplicate (Charlie/Charles, Acme Capital vs Acme Capital LLC,
a one-char email typo) is confirmed by a human instead of silently creating a
second investor. Exact match still auto-attaches; fuzzy candidates are never
auto-attached. The optional LLM-judge re-rank is deferred.
Conversational edits (bot-side, ships on the Spark): any in-thread reply that
isn't yes/no/edit field=value is treated as a natural-language revision and
re-run through local Qwen (parse.revise). Email integrity is preserved -- a
changed address must literally appear in the instruction; the model's email
field is structurally unreachable. No-op revisions re-prompt.
Docs/current-state updated to match; all 27 backend tests pass.
191 lines
9.2 KiB
Python
191 lines
9.2 KiB
Python
#!/usr/bin/env python3
"""Matrix intake bot — entrypoint.

A top-level message in the dedicated intake room is parsed (local Qwen via Spark Control)
into a proposed fundraising-grid add/edit and posted back IN A THREAD. The team member
replies in that thread — **yes** / **edit field=value** / **no**, or a free-form revision
that is re-parsed by local Qwen — and only on **yes** does the bot write, through the
CRM's own API. When the CRM reports near-matches but no exact match, the thread first
shows a numbered shortlist so a human picks an existing investor or confirms "new".
Nothing is ever written autonomously.

Runs as its own process (its matrix-nio dep is isolated here, never in the CRM runtime).
Lifts matrix-bridge's prime-then-listen + threaded-reply plumbing. Config: repo .env.
"""
|
||
import asyncio
import traceback

from nio import AsyncClient, MatrixRoom, RoomMessageText, SyncResponse

import crm_client
import matrix_io
import parse
import proposals
import settings
|
||
|
||
# Reply posted when the parser reports intent "unclear": one example of a new-investor
# message and one of a note for an existing investor, one per line.
UNCLEAR_HELP = "\n".join([
    "🤔 I couldn't tell what to record. Try e.g.",
    "`New investor: Acme Capital — Jane Doe <jane@acme.com>, met at the Austin conf`",
    "or a note like `Note for Acme Capital: wants the Q3 deck, follow up next week`.",
])
|
||
|
||
|
||
async def main():
    """Run the intake bot: log in, skip the room backlog, then listen until stopped.

    Matrix credentials and the intake room id come from ``settings.matrix_settings()``.
    Only text messages in the intake room are handled; CRM writes happen only after an
    explicit **yes** in a proposal's thread.

    Raises:
        RuntimeError: if the priming sync fails — starting anyway would let the first
            sync of ``sync_forever`` deliver the room backlog as if it were new traffic.
    """
    mx = settings.matrix_settings()
    client = AsyncClient(mx["homeserver"], mx["user_id"])
    client.restore_login(user_id=mx["user_id"], device_id=mx["device_id"], access_token=mx["token"])
    say = matrix_io.make_say(client)      # reply inside the proposal's thread
    nudge = matrix_io.make_reply(client)  # brief reply in the main timeline
    store = proposals.ProposalStore()     # pending proposals, keyed by thread-root event id
    intake_room = mx["intake_room"]

    async def handle_intake(room_id, root, text):
        """Parse a top-level message into a proposal and post it in the thread under ``root``."""
        # A bare yes/no/approve typed in the MAIN timeline (not inside a proposal's thread) is
        # an easy slip — point the user back to the thread rather than parse it as a new intake.
        action, _ = proposals.interpret_reply(text)
        if action in ("approve", "reject") and store.any_pending():
            await nudge(room_id, "👉 To approve, reject, or edit a proposal, open its **thread** "
                                 "and reply there — the note is in the thread.", root)
            return
        try:
            proposal = await asyncio.to_thread(parse.parse_message, text)
        except Exception as exc:  # Spark/Qwen unreachable or bad response
            await say(room_id, f"⚠️ couldn't reach the local parser: {str(exc)[:200]}", root)
            return
        if proposal["intent"] == "unclear":
            await say(room_id, UNCLEAR_HELP, root)
            return
        # Resolve new-vs-existing against the CRM matcher (read-only). Degrade gracefully if the
        # CRM is unreachable — still propose as new, just without match/candidate hints.
        match, candidates = None, []
        try:
            res = await asyncio.to_thread(crm_client.match, proposal)
            match = res.get("match")
            candidates = res.get("candidates") or []
        except Exception as exc:
            # Still best-effort, but leave a trace: a silently failing matcher would let
            # near-duplicates through as "new" with no record of why.
            print(f"matrix-intake: CRM match lookup failed, proposing as new: {exc!r}", flush=True)
        if match:
            # Confident exact match → auto-attach the note to that investor (no disambiguation).
            proposal["intent"] = "meeting_note"
            proposal["_match_id"] = match["id"]
            proposal["_stage"] = "approval"
            store.put(root, proposal)
            hint = (f"\n\n🔎 Looks like an existing investor: **{match['name']}** — "
                    "this will append a note to them.")
            await say(room_id, proposals.render(proposal) + hint, root)
            await nudge(room_id, proposals.summary_line(proposal), root)
            return
        if candidates:
            # No exact match but near-misses exist → make the human pick one or confirm "new",
            # so a typo'd/near-duplicate name can't silently create a second investor.
            proposal["_stage"] = "disambiguate"
            proposal["_candidates"] = candidates
            store.put(root, proposal)
            await say(room_id, proposals.render_disambiguation(proposal), root)
            await nudge(room_id, proposals.disambiguation_nudge(proposal), root)
            return
        # Genuinely new — straight to the new-investor approval card.
        proposal["_stage"] = "approval"
        store.put(root, proposal)
        await say(room_id, proposals.render(proposal), root)
        # Also drop a brief, un-threaded reply in the main timeline so the proposal isn't
        # easy to miss inside a thread (the full card + yes/edit/no stay in the thread).
        await nudge(room_id, proposals.summary_line(proposal), root)

    async def handle_reply(room_id, root, text):
        """Act on a reply in a live proposal's thread: commit, discard, edit, or revise."""
        # Claim the proposal synchronously — BEFORE any await — so a second reply that
        # arrives while a commit is in flight can't double-process it. asyncio is
        # cooperative: nothing else runs between here and the first await below, so the
        # pop is atomic w.r.t. other Matrix events.
        proposal = store.pop(root)
        if proposal is None:
            return
        if proposal.get("_stage") == "disambiguate":
            await handle_disambiguation(room_id, root, text, proposal)
            return

        action, payload = proposals.interpret_reply(text)
        if action == "approve":
            try:
                summary = await asyncio.to_thread(crm_client.commit, proposal)
            except Exception as exc:
                store.put(root, proposal)  # commit failed — restore so the user can retry
                await say(room_id, f"⚠️ write failed, nothing committed: {str(exc)[:200]}", root)
                return
            await say(room_id, f"✅ {summary}", root)
        elif action == "reject":
            await say(room_id, "🗑️ Discarded — nothing written.", root)
        elif action == "edit":
            field, value = payload
            try:
                edited = proposals.apply_edit(proposal, field, value)
            except Exception as exc:
                # The proposal was popped above; put it back so a rejected edit (whatever
                # apply_edit raises for it) doesn't silently discard the whole draft.
                store.put(root, proposal)
                await say(room_id, f"⚠️ couldn't apply that edit ({str(exc)[:200]}).\n\nReply **yes** "
                                   "to commit, **no** to discard, **edit field=value**, or rephrase.", root)
                return
            store.put(root, edited)  # keep it pending (edited) for the next reply
            await say(room_id, "✏️ Updated:\n\n" + proposals.render(edited), root)
        else:
            # Not yes/no/edit-grammar → treat it as a natural-language revision instruction and
            # re-run it through local Qwen (no Claude, no scrub). The human still approves the
            # revised card, so the draft→approve gate holds.
            try:
                revised = await asyncio.to_thread(parse.revise, proposal, text)
            except Exception as exc:
                store.put(root, proposal)
                await say(room_id, f"⚠️ couldn't apply that change ({str(exc)[:200]}).\n\nReply **yes** "
                                   "to commit, **no** to discard, **edit field=value**, or rephrase.", root)
                return
            if proposals.same_fields(proposal, revised):
                store.put(root, proposal)
                await say(room_id, "I didn't catch a change there. Reply **yes** to commit, **no** "
                                   "to discard, **edit field=value**, or tell me what to change.", root)
                return
            store.put(root, revised)
            await say(room_id, "✏️ Updated:\n\n" + proposals.render(revised), root)

    async def handle_disambiguation(room_id, root, text, proposal):
        """Resolve a near-match shortlist: attach to a picked candidate, go new, or discard.

        ``proposal`` has already been popped from the store by the caller; every branch
        except "reject" puts a (possibly updated) proposal back.
        """
        cands = proposal.get("_candidates") or []
        action, payload = proposals.interpret_disambiguation(text, len(cands))
        if action == "pick":
            updated = proposals.attach_to_candidate(proposal, cands[payload])
            store.put(root, updated)
            await say(room_id, "✏️ Will log against the existing investor:\n\n"
                      + proposals.render(updated), root)
        elif action == "new":
            updated = proposals.promote_to_new(proposal)
            store.put(root, updated)
            await say(room_id, "➕ OK — adding as a new investor:\n\n"
                      + proposals.render(updated), root)
        elif action == "reject":
            await say(room_id, "🗑️ Discarded — nothing written.", root)
        else:  # unrecognized — re-show the shortlist
            store.put(root, proposal)
            await say(room_id, "I didn't catch that.\n\n" + proposals.render_disambiguation(proposal), root)

    async def on_message(room: MatrixRoom, event: RoomMessageText):
        """nio event callback: route intake-room text to handle_reply or handle_intake."""
        if event.sender == mx["user_id"]:
            return  # never react to our own messages (we post in-thread — this prevents loops)
        if room.room_id != intake_room:
            return
        text = (event.body or "").strip()
        if not text:
            return
        root = None
        try:
            root = matrix_io.thread_root_of(event)
            if root and store.has(root):
                await handle_reply(room.room_id, root, text)
            elif root:
                return  # threaded message not tied to a live proposal — ignore
            else:
                await handle_intake(room.room_id, event.event_id, text)
        except Exception:
            # matrix-nio awaits event callbacks inside its sync handling, so an exception
            # escaping here can propagate out of sync_forever and stop the bot. Log it, tell
            # the user (best-effort), and keep listening.
            traceback.print_exc()
            try:
                await say(room.room_id, "⚠️ something went wrong handling that message — "
                                        "see the bot log.", root or event.event_id)
            except Exception:
                traceback.print_exc()

    try:
        # Prime the sync token past history, THEN register the callback — only react to messages
        # arriving after startup (no backlog replay). (matrix-bridge pattern.)
        print("matrix-intake: priming sync (skipping backlog)...", flush=True)
        primed = await client.sync(timeout=30000, full_state=False)
        if not isinstance(primed, SyncResponse):
            # nio reports a failed request as an error-response object rather than raising.
            # With no sync token, the first sync of sync_forever would hand the backlog to
            # on_message as new traffic, so refuse to start instead.
            raise RuntimeError(f"matrix-intake: priming sync failed: {primed}")
        client.add_event_callback(on_message, RoomMessageText)
        who = await client.whoami()
        # A whoami error response carries no user_id; fall back to the configured id.
        me = getattr(who, "user_id", None) or mx["user_id"]
        print(f"matrix-intake: listening as {me} in room {intake_room}", flush=True)
        await client.sync_forever(timeout=30000)
    finally:
        # Covers startup failures too, so the client is always closed.
        await client.close()
|
||
|
||
|
||
if __name__ == "__main__":
    # Ctrl-C is the normal way to stop the bot; exit quietly instead of dumping a traceback.
    try:
        asyncio.run(main())
    except KeyboardInterrupt:
        ...
|