68106d7a5a
Read-only natural-language query over the curated nl_query endpoint, answered in-thread. Two entry points (room-per-purpose model): a dedicated Q&A room (MATRIX_QUERY_ROOM) where every top-level message is a question, plus the ?/@bot trigger in the intake room as a cross-room convenience. Both routes hit the same handle_query -> crm_client.nl_query -> POST /api/query/nl; translation runs on the box's local model, nothing leaves the box, and there is no write path so no approval gate applies. Pure logic (trigger parsing, answer rendering) in query.py with offline tests; async room wiring in bot.py (live-smoke only, per the bot's convention). Bot-side only, ships on the Spark via git pull + restart. Depends on the box-side /api/query/nl endpoint, which lands with the v93 s9pk (reminders + W2): until v93 is installed the Q&A surface 404s, so the bot deploy is staged to follow that install.
376 lines
20 KiB
Python
376 lines
20 KiB
Python
#!/usr/bin/env python3
|
||
"""Matrix intake bot — entrypoint.
|
||
|
||
A top-level message in the dedicated intake room is parsed (local Qwen via Spark Control)
|
||
into a proposed fundraising-grid add/edit and posted back IN A THREAD. The team member
|
||
replies in that thread — **yes** / **edit field=value** / **no** — and only on **yes** does
|
||
the bot write, through the CRM's own API. Nothing is ever written autonomously.
|
||
|
||
Runs as its own process (its matrix-nio dep is isolated here, never in the CRM runtime).
|
||
Lifts matrix-bridge's prime-then-listen + threaded-reply plumbing. Config: repo .env.
|
||
"""
|
||
import asyncio
|
||
|
||
from nio import AsyncClient, MatrixRoom, MessageDirection, RoomMessageText
|
||
|
||
import crm_client
|
||
import email_proposals
|
||
import matrix_io
|
||
import parse
|
||
import proposals
|
||
import query
|
||
import settings
|
||
|
||
UNCLEAR_HELP = (
|
||
"🤔 I couldn't tell what to record. Try e.g.\n"
|
||
"`New investor: Acme Capital — Jane Doe <jane@acme.com>, met at the Austin conf`\n"
|
||
"or a note like `Note for Acme Capital: wants the Q3 deck, follow up next week`."
|
||
)
|
||
|
||
EMAIL_POLL_SEC = 20 # how often the bot polls the CRM for new/decided email-activity proposals
|
||
MAX_THREAD_SCAN_PAGES = 8 # how far back to scan for a resolved thread's replies before redacting
|
||
|
||
|
||
async def main():
|
||
mx = settings.matrix_settings()
|
||
client = AsyncClient(mx["homeserver"], mx["user_id"])
|
||
client.restore_login(user_id=mx["user_id"], device_id=mx["device_id"], access_token=mx["token"])
|
||
say = matrix_io.make_say(client)
|
||
nudge = matrix_io.make_reply(client)
|
||
store = proposals.ProposalStore()
|
||
intake_room = mx["intake_room"]
|
||
roster = settings.team_roster() # frames the parse: teammates do outreach, aren't prospects
|
||
if roster:
|
||
print(f"matrix-intake: team roster loaded ({len(roster)} names)", flush=True)
|
||
review_room = settings.email_review_room() # CRM-drafted email proposals (empty → feature off)
|
||
query_room = settings.query_room() # dedicated read-only Q&A room (empty → use the intake trigger)
|
||
email_threads = {} # Matrix thread-root event_id -> {id, investor_name, note} for an email proposal
|
||
|
||
async def handle_intake(room_id, root, text):
|
||
# A bare yes/no/approve typed in the MAIN timeline (not inside a proposal's thread) is
|
||
# an easy slip — point the user back to the thread rather than parse it as a new intake.
|
||
action, _ = proposals.interpret_reply(text)
|
||
if action in ("approve", "reject") and store.any_pending():
|
||
await nudge(room_id, "👉 To approve, reject, or edit a proposal, open its **thread** "
|
||
"and reply there — the note is in the thread.", root)
|
||
return
|
||
try:
|
||
proposal = await asyncio.to_thread(parse.parse_message, text, roster=roster)
|
||
except Exception as exc: # Spark/Qwen unreachable or bad response
|
||
await say(room_id, f"⚠️ couldn't reach the local parser: {str(exc)[:200]}", root)
|
||
return
|
||
if proposal["intent"] == "unclear":
|
||
await say(room_id, UNCLEAR_HELP, root)
|
||
return
|
||
# Resolve new-vs-existing against the CRM matcher (read-only). Degrade gracefully if the
|
||
# CRM is unreachable — still propose as new, just without match/candidate hints.
|
||
match, candidates = None, []
|
||
try:
|
||
res = await asyncio.to_thread(crm_client.match, proposal)
|
||
match = res.get("match")
|
||
candidates = res.get("candidates") or []
|
||
except Exception:
|
||
pass
|
||
if match:
|
||
# Confident exact match → auto-attach the note to that investor (no disambiguation).
|
||
proposal["intent"] = "meeting_note"
|
||
proposal["_match_id"] = match["id"]
|
||
proposal["_stage"] = "approval"
|
||
store.put(root, proposal)
|
||
hint = (f"\n\n🔎 Looks like an existing investor: **{match['name']}** — "
|
||
"this will append a note to them.")
|
||
await say(room_id, proposals.render(proposal) + hint, root)
|
||
await nudge(room_id, proposals.summary_line(proposal), root)
|
||
return
|
||
if candidates:
|
||
# No exact match but near-misses exist → make the human pick one or confirm "new",
|
||
# so a typo'd/near-duplicate name can't silently create a second investor.
|
||
proposal["_stage"] = "disambiguate"
|
||
proposal["_candidates"] = candidates
|
||
store.put(root, proposal)
|
||
await say(room_id, proposals.render_disambiguation(proposal), root)
|
||
await nudge(room_id, proposals.disambiguation_nudge(proposal), root)
|
||
return
|
||
# Genuinely new — straight to the new-investor approval card.
|
||
proposal["_stage"] = "approval"
|
||
store.put(root, proposal)
|
||
await say(room_id, proposals.render(proposal), root)
|
||
# Also drop a brief, un-threaded reply in the main timeline so the proposal isn't
|
||
# easy to miss inside a thread (the full card + yes/edit/no stay in the thread).
|
||
await nudge(room_id, proposals.summary_line(proposal), root)
|
||
|
||
async def handle_query(room_id, root, question):
|
||
"""A read-only NL question ('@bot …' / '?…') — translate + run it on the BOX (local Qwen,
|
||
nothing leaves the box) and post the answer in a thread. No write path, no approval gate:
|
||
it only reads curated, parameterized queries. The endpoint returns its structured result
|
||
even on a soft no-match / model-down, so we render that; a transport/auth failure raises
|
||
and we show a brief error."""
|
||
try:
|
||
result = await asyncio.to_thread(crm_client.nl_query, question)
|
||
except Exception as exc:
|
||
await say(room_id, f"⚠️ couldn't run that query: {str(exc)[:200]}", root)
|
||
return
|
||
await say(room_id, query.render_answer(result), root)
|
||
|
||
async def handle_reply(room_id, root, text):
|
||
# Claim the proposal synchronously — BEFORE any await — so a second reply that
|
||
# arrives while a commit is in flight can't double-process it. asyncio is
|
||
# cooperative: nothing else runs between here and the first await below, so the
|
||
# pop is atomic w.r.t. other Matrix events.
|
||
proposal = store.pop(root)
|
||
if proposal is None:
|
||
return
|
||
if proposal.get("_stage") == "disambiguate":
|
||
await handle_disambiguation(room_id, root, text, proposal)
|
||
return
|
||
|
||
action, payload = proposals.interpret_reply(text)
|
||
if action == "approve":
|
||
try:
|
||
summary = await asyncio.to_thread(crm_client.commit, proposal)
|
||
except Exception as exc:
|
||
store.put(root, proposal) # commit failed — restore so the user can retry
|
||
await say(room_id, f"⚠️ write failed, nothing committed: {exc}", root)
|
||
return
|
||
await say(room_id, f"✅ {summary}", root)
|
||
elif action == "reject":
|
||
await say(room_id, "🗑️ Discarded — nothing written.", root)
|
||
elif action == "edit":
|
||
field, value = payload
|
||
proposal = proposals.apply_edit(proposal, field, value)
|
||
store.put(root, proposal) # keep it pending (edited) for the next reply
|
||
await say(room_id, "✏️ Updated:\n\n" + proposals.render(proposal), root)
|
||
else:
|
||
# Not yes/no/edit-grammar → treat it as a natural-language revision instruction and
|
||
# re-run it through local Qwen (no Claude, no scrub). The human still approves the
|
||
# revised card, so the draft→approve gate holds.
|
||
try:
|
||
revised = await asyncio.to_thread(parse.revise, proposal, text, roster=roster)
|
||
except Exception as exc:
|
||
store.put(root, proposal)
|
||
await say(room_id, f"⚠️ couldn't apply that change ({str(exc)[:200]}).\n\nReply **yes** "
|
||
"to commit, **no** to discard, **edit field=value**, or rephrase.", root)
|
||
return
|
||
if proposals.same_fields(proposal, revised):
|
||
store.put(root, proposal)
|
||
await say(room_id, "I didn't catch a change there. Reply **yes** to commit, **no** "
|
||
"to discard, **edit field=value**, or tell me what to change.", root)
|
||
return
|
||
store.put(root, revised)
|
||
await say(room_id, "✏️ Updated:\n\n" + proposals.render(revised), root)
|
||
|
||
async def handle_disambiguation(room_id, root, text, proposal):
|
||
cands = proposal.get("_candidates") or []
|
||
action, payload = proposals.interpret_disambiguation(text, len(cands))
|
||
if action == "pick":
|
||
updated = proposals.attach_to_candidate(proposal, cands[payload])
|
||
store.put(root, updated)
|
||
await say(room_id, "✏️ Will log against the existing investor:\n\n"
|
||
+ proposals.render(updated), root)
|
||
elif action == "new":
|
||
updated = proposals.promote_to_new(proposal)
|
||
store.put(root, updated)
|
||
await say(room_id, "➕ OK — adding as a new investor:\n\n"
|
||
+ proposals.render(updated), root)
|
||
elif action == "reject":
|
||
await say(room_id, "🗑️ Discarded — nothing written.", root)
|
||
else: # unrecognized — re-show the shortlist
|
||
store.put(root, proposal)
|
||
await say(room_id, "I didn't catch that.\n\n" + proposals.render_disambiguation(proposal), root)
|
||
|
||
async def redact_card(event_id):
|
||
"""Redact one event (best-effort). Redacting our OWN message needs no special power;
|
||
redacting someone else's reply needs the bot to hold a redact/mod power level."""
|
||
try:
|
||
await client.room_redact(review_room, event_id, reason="proposal resolved")
|
||
except Exception as exc:
|
||
print(f"matrix-intake: could not redact {event_id}: {exc}", flush=True)
|
||
|
||
async def redact_thread(root):
|
||
"""Clear a resolved thread: redact the card AND every reply under it, so the thread drops
|
||
out of the threads view (not just the main timeline). The card is ours (always redactable);
|
||
the human's yes/no reply needs the bot's redact/mod power — if it lacks power that redact
|
||
just no-ops and the reply lingers. Finds replies by scanning recent room history for
|
||
m.thread events pointing at this root (the triggering reply is already synced, so a
|
||
backward scan from the current token includes it)."""
|
||
await redact_card(root)
|
||
token = getattr(client, "next_batch", None)
|
||
if not token:
|
||
return
|
||
try:
|
||
scanned = 0
|
||
for _ in range(MAX_THREAD_SCAN_PAGES):
|
||
resp = await client.room_messages(review_room, start=token,
|
||
direction=MessageDirection.back, limit=100)
|
||
chunk = getattr(resp, "chunk", None)
|
||
if not chunk:
|
||
break
|
||
for ev in chunk:
|
||
rel = ((getattr(ev, "source", None) or {}).get("content", {}) or {}).get("m.relates_to") or {}
|
||
if rel.get("rel_type") == "m.thread" and rel.get("event_id") == root:
|
||
await redact_card(ev.event_id)
|
||
token = getattr(resp, "end", None)
|
||
scanned += len(chunk)
|
||
if not token or scanned > 1000:
|
||
break
|
||
except Exception as exc:
|
||
print(f"matrix-intake: thread reply cleanup failed for {root}: {exc}", flush=True)
|
||
|
||
async def handle_email_reply(room_id, root, text):
|
||
"""An in-thread reply to a CRM-drafted email-proposal card: yes commits, no dismisses, and
|
||
anything else is a natural-language revision of the note (re-drafted by local Qwen; the
|
||
human still approves the revised note, so the draft→approve gate holds). On a conclusive
|
||
decision the card is redacted so the room clears down to only what still needs handling."""
|
||
item = email_threads.get(root)
|
||
if item is None:
|
||
return # a threaded reply we don't own (or already resolved)
|
||
decision = email_proposals.interpret(text)
|
||
if decision == "approve":
|
||
# Claim before the await (double-approve guard, like the intake commit path).
|
||
email_threads.pop(root, None)
|
||
try:
|
||
await asyncio.to_thread(crm_client.decide_email_proposal, item["id"], "approve", item.get("note"))
|
||
except Exception as exc:
|
||
email_threads[root] = item # restore for retry
|
||
await say(room_id, email_proposals.frame(f"⚠️ couldn't add it ({str(exc)[:200]}). Reply **yes** to retry, **no** to dismiss."), root)
|
||
return
|
||
# Success → clear the whole thread (card + replies). No confirmation: the thread
|
||
# vanishing is the acknowledgment, and a confirmation reply would keep it alive.
|
||
await redact_thread(root)
|
||
elif decision == "reject":
|
||
email_threads.pop(root, None)
|
||
try:
|
||
await asyncio.to_thread(crm_client.decide_email_proposal, item["id"], "dismiss")
|
||
except Exception as exc:
|
||
email_threads[root] = item
|
||
await say(room_id, email_proposals.frame(f"⚠️ couldn't dismiss it ({str(exc)[:200]}). Try again."), root)
|
||
return
|
||
await redact_thread(root)
|
||
else:
|
||
try:
|
||
new_note = await asyncio.to_thread(email_proposals.revise_note, item.get("note") or "", text)
|
||
except Exception as exc:
|
||
await say(room_id, email_proposals.frame(f"⚠️ couldn't revise that ({str(exc)[:200]}). Reply **yes** to add as-is, "
|
||
"**no** to dismiss, or rephrase."), root)
|
||
return
|
||
if not new_note:
|
||
await say(room_id, email_proposals.frame("I didn't catch a change. Reply **yes** to add the note as-is, **no** to "
|
||
"dismiss, or tell me how to change it."), root)
|
||
return
|
||
item["note"] = new_note
|
||
email_threads[root] = item
|
||
await say(room_id, email_proposals.frame(f"✏️ Updated draft note:\n\n{new_note}\n\nReply **yes** to add it, **no** to "
|
||
"dismiss, or refine again."), root)
|
||
|
||
async def poll_email_proposals():
|
||
"""Poll the CRM for email-activity proposals: post a review card for each new one, rebuild
|
||
the reply-routing map from already-posted threads (so replies still route after a restart),
|
||
and announce+close any decided on the web. One failing cycle logs and retries next tick."""
|
||
while True:
|
||
try:
|
||
lists = await asyncio.to_thread(crm_client.list_email_proposals)
|
||
for it in lists["open"]: # rebuild routing for threads posted before (e.g. a restart)
|
||
ev = it.get("event_id")
|
||
if ev and ev not in email_threads:
|
||
email_threads[ev] = {"id": it["id"], "investor_name": it.get("investor_name"),
|
||
"note": it.get("proposed_note") or ""}
|
||
for it in lists["to_post"]:
|
||
try:
|
||
resp = await client.room_send(
|
||
review_room, "m.room.message",
|
||
matrix_io.thread_content(email_proposals.render_card(it), None))
|
||
ev = getattr(resp, "event_id", None)
|
||
if not ev:
|
||
print(f"matrix-intake: card send returned no event_id for {it['id']}", flush=True)
|
||
continue
|
||
await asyncio.to_thread(crm_client.mark_email_proposal_posted, it["id"], ev)
|
||
email_threads[ev] = {"id": it["id"], "investor_name": it.get("investor_name"),
|
||
"note": it.get("proposed_note") or ""}
|
||
except Exception as exc:
|
||
print(f"matrix-intake: failed to post email proposal {it.get('id')}: {exc}", flush=True)
|
||
for it in lists["to_close"]: # decided on the web → clear the thread, then close
|
||
ev = it.get("event_id")
|
||
if not ev:
|
||
continue
|
||
try:
|
||
await redact_thread(ev)
|
||
await asyncio.to_thread(crm_client.mark_email_proposal_closed, it["id"])
|
||
email_threads.pop(ev, None)
|
||
except Exception as exc:
|
||
print(f"matrix-intake: failed to close email proposal {it.get('id')}: {exc}", flush=True)
|
||
except Exception as exc:
|
||
print(f"matrix-intake: email-proposal poll error: {exc}", flush=True)
|
||
await asyncio.sleep(EMAIL_POLL_SEC)
|
||
|
||
async def on_message(room: MatrixRoom, event: RoomMessageText):
|
||
if event.sender == mx["user_id"]:
|
||
return # never react to our own messages (we post in-thread — this prevents loops)
|
||
text = (event.body or "").strip()
|
||
if not text:
|
||
return
|
||
root = matrix_io.thread_root_of(event)
|
||
# Email-proposal review room: only a threaded reply to a card we posted is actionable.
|
||
if review_room and room.room_id == review_room:
|
||
if root and root in email_threads:
|
||
await handle_email_reply(room.room_id, root, text)
|
||
return
|
||
# Dedicated Q&A room: every top-level message IS a question — no trigger needed. Threaded
|
||
# messages (the answers we post, or follow-ups) aren't acted on in v1.
|
||
if query_room and room.room_id == query_room:
|
||
if not root:
|
||
await handle_query(room.room_id, event.event_id, text)
|
||
return
|
||
if room.room_id != intake_room:
|
||
return
|
||
if root and store.has(root):
|
||
await handle_reply(room.room_id, root, text)
|
||
elif root:
|
||
return # threaded message not tied to a live proposal — ignore
|
||
else:
|
||
# A top-level message is either an NL question (explicitly addressed with '?'/'@bot')
|
||
# or an intake note. The trigger is required, so plain notes still flow to intake.
|
||
q = query.parse_trigger(text)
|
||
if q is None:
|
||
await handle_intake(room.room_id, event.event_id, text)
|
||
elif not q:
|
||
await say(room.room_id, query.HELP, event.event_id)
|
||
else:
|
||
await handle_query(room.room_id, event.event_id, q)
|
||
|
||
# Prime the sync token past history, THEN register the callback — only react to messages
|
||
# arriving after startup (no backlog replay). (matrix-bridge pattern.)
|
||
print("matrix-intake: priming sync (skipping backlog)...", flush=True)
|
||
await client.sync(timeout=30000, full_state=False)
|
||
client.add_event_callback(on_message, RoomMessageText)
|
||
who = await client.whoami()
|
||
print(f"matrix-intake: listening as {who.user_id} in room {intake_room}", flush=True)
|
||
tasks = [asyncio.create_task(client.sync_forever(timeout=30000))]
|
||
if review_room:
|
||
# "Invited" isn't "joined" — the bot must join before it can post cards (room_send to a
|
||
# room we're only invited to fails M_FORBIDDEN). Idempotent if already a member.
|
||
try:
|
||
await client.join(review_room)
|
||
except Exception as exc:
|
||
print(f"matrix-intake: could not join review room {review_room}: {exc}", flush=True)
|
||
tasks.append(asyncio.create_task(poll_email_proposals()))
|
||
print(f"matrix-intake: reviewing email proposals in room {review_room} (every {EMAIL_POLL_SEC}s)", flush=True)
|
||
if query_room:
|
||
# Read-only Q&A room — just join and listen (no poll task; questions are interactive).
|
||
# "Invited" isn't "joined": the bot must join before it can post answers (idempotent).
|
||
try:
|
||
await client.join(query_room)
|
||
except Exception as exc:
|
||
print(f"matrix-intake: could not join Q&A room {query_room}: {exc}", flush=True)
|
||
print(f"matrix-intake: answering questions in room {query_room}", flush=True)
|
||
try:
|
||
await asyncio.gather(*tasks)
|
||
finally:
|
||
await client.close()
|
||
|
||
|
||
if __name__ == "__main__":
|
||
try:
|
||
asyncio.run(main())
|
||
except KeyboardInterrupt:
|
||
pass
|