aefb2aa119
Four bot-side UX fixes surfaced by the live smoke: - Post a brief pointer in the main timeline (a reply to the user's message) alongside the in-thread proposal card, so proposals aren't missed inside a thread. Pointer only — approvals still happen in the thread, where the note is visible (you can't make an informed yes/no without seeing it). - A bare yes/no typed in the main timeline while a proposal is pending now gets a "reply in the thread" redirect instead of "couldn't tell what to record." - Clearer commit confirmations: "Created a new grid entry for X" vs "Logged a note on X (existing grid entry)." - Send a blank communication subject when a note is present so the grid's one-line note summary shows the note text, not the "(Matrix)" label (provenance stays in source="matrix_intake").
133 lines
6.0 KiB
Python
133 lines
6.0 KiB
Python
#!/usr/bin/env python3
|
|
"""Matrix intake bot — entrypoint.
|
|
|
|
A top-level message in the dedicated intake room is parsed (local Qwen via Spark Control)
|
|
into a proposed fundraising-grid add/edit and posted back IN A THREAD. The team member
|
|
replies in that thread — **yes** / **edit field=value** / **no** — and only on **yes** does
|
|
the bot write, through the CRM's own API. Nothing is ever written autonomously.
|
|
|
|
Runs as its own process (its matrix-nio dep is isolated here, never in the CRM runtime).
|
|
Lifts matrix-bridge's prime-then-listen + threaded-reply plumbing. Config: repo .env.
|
|
"""
|
|
import asyncio
|
|
|
|
from nio import AsyncClient, MatrixRoom, RoomMessageText
|
|
|
|
import crm_client
|
|
import matrix_io
|
|
import parse
|
|
import proposals
|
|
import settings
|
|
|
|
UNCLEAR_HELP = (
|
|
"🤔 I couldn't tell what to record. Try e.g.\n"
|
|
"`New investor: Acme Capital — Jane Doe <jane@acme.com>, met at the Austin conf`\n"
|
|
"or a note like `Note for Acme Capital: wants the Q3 deck, follow up next week`."
|
|
)
|
|
|
|
|
|
async def main():
|
|
mx = settings.matrix_settings()
|
|
client = AsyncClient(mx["homeserver"], mx["user_id"])
|
|
client.restore_login(user_id=mx["user_id"], device_id=mx["device_id"], access_token=mx["token"])
|
|
say = matrix_io.make_say(client)
|
|
nudge = matrix_io.make_reply(client)
|
|
store = proposals.ProposalStore()
|
|
intake_room = mx["intake_room"]
|
|
|
|
async def handle_intake(room_id, root, text):
|
|
# A bare yes/no/approve typed in the MAIN timeline (not inside a proposal's thread) is
|
|
# an easy slip — point the user back to the thread rather than parse it as a new intake.
|
|
action, _ = proposals.interpret_reply(text)
|
|
if action in ("approve", "reject") and store.any_pending():
|
|
await nudge(room_id, "👉 To approve, reject, or edit a proposal, open its **thread** "
|
|
"and reply there — the note is in the thread.", root)
|
|
return
|
|
try:
|
|
proposal = await asyncio.to_thread(parse.parse_message, text)
|
|
except Exception as exc: # Spark/Qwen unreachable or bad response
|
|
await say(room_id, f"⚠️ couldn't reach the local parser: {exc}", root)
|
|
return
|
|
if proposal["intent"] == "unclear":
|
|
await say(room_id, UNCLEAR_HELP, root)
|
|
return
|
|
# Confirm new-vs-existing against the CRM matcher (read-only). Degrade gracefully if
|
|
# the CRM is unreachable — still propose, just without the "looks like existing" hint.
|
|
hint = ""
|
|
try:
|
|
match = await asyncio.to_thread(crm_client.match, proposal)
|
|
if match:
|
|
proposal["intent"] = "meeting_note"
|
|
proposal["_match_id"] = match["id"]
|
|
hint = f"\n\n🔎 Looks like an existing investor: **{match['name']}** — this will append a note to them."
|
|
except Exception:
|
|
pass
|
|
store.put(root, proposal)
|
|
await say(room_id, proposals.render(proposal) + hint, root)
|
|
# Also drop a brief, un-threaded reply in the main timeline so the proposal isn't
|
|
# easy to miss inside a thread (the full card + yes/edit/no stay in the thread).
|
|
await nudge(room_id, proposals.summary_line(proposal), root)
|
|
|
|
async def handle_reply(room_id, root, text):
|
|
action, payload = proposals.interpret_reply(text)
|
|
# Claim the proposal synchronously — BEFORE any await — so a second reply that
|
|
# arrives while a commit is in flight can't double-process it. asyncio is
|
|
# cooperative: nothing else runs between here and the first await below, so the
|
|
# pop is atomic w.r.t. other Matrix events.
|
|
proposal = store.pop(root)
|
|
if proposal is None:
|
|
return
|
|
if action == "approve":
|
|
try:
|
|
summary = await asyncio.to_thread(crm_client.commit, proposal)
|
|
except Exception as exc:
|
|
store.put(root, proposal) # commit failed — restore so the user can retry
|
|
await say(room_id, f"⚠️ write failed, nothing committed: {exc}", root)
|
|
return
|
|
await say(room_id, f"✅ {summary}", root)
|
|
elif action == "reject":
|
|
await say(room_id, "🗑️ Discarded — nothing written.", root)
|
|
elif action == "edit":
|
|
field, value = payload
|
|
proposal = proposals.apply_edit(proposal, field, value)
|
|
store.put(root, proposal) # keep it pending (edited) for the next reply
|
|
await say(room_id, "✏️ Updated:\n\n" + proposals.render(proposal), root)
|
|
else: # unrecognized reply — leave the proposal pending
|
|
store.put(root, proposal)
|
|
await say(room_id, "Reply **yes** to commit, **edit field=value**, or **no**.", root)
|
|
|
|
async def on_message(room: MatrixRoom, event: RoomMessageText):
|
|
if event.sender == mx["user_id"]:
|
|
return # never react to our own messages (we post in-thread — this prevents loops)
|
|
if room.room_id != intake_room:
|
|
return
|
|
text = (event.body or "").strip()
|
|
if not text:
|
|
return
|
|
root = matrix_io.thread_root_of(event)
|
|
if root and store.has(root):
|
|
await handle_reply(room.room_id, root, text)
|
|
elif root:
|
|
return # threaded message not tied to a live proposal — ignore
|
|
else:
|
|
await handle_intake(room.room_id, event.event_id, text)
|
|
|
|
# Prime the sync token past history, THEN register the callback — only react to messages
|
|
# arriving after startup (no backlog replay). (matrix-bridge pattern.)
|
|
print("matrix-intake: priming sync (skipping backlog)...", flush=True)
|
|
await client.sync(timeout=30000, full_state=False)
|
|
client.add_event_callback(on_message, RoomMessageText)
|
|
who = await client.whoami()
|
|
print(f"matrix-intake: listening as {who.user_id} in room {intake_room}", flush=True)
|
|
try:
|
|
await client.sync_forever(timeout=30000)
|
|
finally:
|
|
await client.close()
|
|
|
|
|
|
if __name__ == "__main__":
|
|
try:
|
|
asyncio.run(main())
|
|
except KeyboardInterrupt:
|
|
pass
|