#!/usr/bin/env python3 """Matrix intake bot — entrypoint. A top-level message in the dedicated intake room is parsed (local Qwen via Spark Control) into a proposed fundraising-grid add/edit and posted back IN A THREAD. The team member replies in that thread — **yes** / **edit field=value** / **no** — and only on **yes** does the bot write, through the CRM's own API. Nothing is ever written autonomously. Runs as its own process (its matrix-nio dep is isolated here, never in the CRM runtime). Lifts matrix-bridge's prime-then-listen + threaded-reply plumbing. Config: repo .env. """ import asyncio from nio import AsyncClient, MatrixRoom, RoomMessageText import crm_client import matrix_io import parse import proposals import settings UNCLEAR_HELP = ( "šŸ¤” I couldn't tell what to record. Try e.g.\n" "`New investor: Acme Capital — Jane Doe , met at the Austin conf`\n" "or a note like `Note for Acme Capital: wants the Q3 deck, follow up next week`." ) async def main(): mx = settings.matrix_settings() client = AsyncClient(mx["homeserver"], mx["user_id"]) client.restore_login(user_id=mx["user_id"], device_id=mx["device_id"], access_token=mx["token"]) say = matrix_io.make_say(client) nudge = matrix_io.make_reply(client) store = proposals.ProposalStore() intake_room = mx["intake_room"] async def handle_intake(room_id, root, text): # A bare yes/no/approve typed in the MAIN timeline (not inside a proposal's thread) is # an easy slip — point the user back to the thread rather than parse it as a new intake. action, _ = proposals.interpret_reply(text) if action in ("approve", "reject") and store.any_pending(): await nudge(room_id, "šŸ‘‰ To approve, reject, or edit a proposal, open its **thread** " "and reply there — the note is in the thread.", root) return try: proposal = await asyncio.to_thread(parse.parse_message, text) except Exception as exc: # Spark/Qwen unreachable or bad response await say(room_id, f"āš ļø couldn't reach the local parser: {exc}", root) return if proposal["intent"] == "unclear": await say(room_id, UNCLEAR_HELP, root) return # Confirm new-vs-existing against the CRM matcher (read-only). Degrade gracefully if # the CRM is unreachable — still propose, just without the "looks like existing" hint. hint = "" try: match = await asyncio.to_thread(crm_client.match, proposal) if match: proposal["intent"] = "meeting_note" proposal["_match_id"] = match["id"] hint = f"\n\nšŸ”Ž Looks like an existing investor: **{match['name']}** — this will append a note to them." except Exception: pass store.put(root, proposal) await say(room_id, proposals.render(proposal) + hint, root) # Also drop a brief, un-threaded reply in the main timeline so the proposal isn't # easy to miss inside a thread (the full card + yes/edit/no stay in the thread). await nudge(room_id, proposals.summary_line(proposal), root) async def handle_reply(room_id, root, text): action, payload = proposals.interpret_reply(text) # Claim the proposal synchronously — BEFORE any await — so a second reply that # arrives while a commit is in flight can't double-process it. asyncio is # cooperative: nothing else runs between here and the first await below, so the # pop is atomic w.r.t. other Matrix events. proposal = store.pop(root) if proposal is None: return if action == "approve": try: summary = await asyncio.to_thread(crm_client.commit, proposal) except Exception as exc: store.put(root, proposal) # commit failed — restore so the user can retry await say(room_id, f"āš ļø write failed, nothing committed: {exc}", root) return await say(room_id, f"āœ… {summary}", root) elif action == "reject": await say(room_id, "šŸ—‘ļø Discarded — nothing written.", root) elif action == "edit": field, value = payload proposal = proposals.apply_edit(proposal, field, value) store.put(root, proposal) # keep it pending (edited) for the next reply await say(room_id, "āœļø Updated:\n\n" + proposals.render(proposal), root) else: # unrecognized reply — leave the proposal pending store.put(root, proposal) await say(room_id, "Reply **yes** to commit, **edit field=value**, or **no**.", root) async def on_message(room: MatrixRoom, event: RoomMessageText): if event.sender == mx["user_id"]: return # never react to our own messages (we post in-thread — this prevents loops) if room.room_id != intake_room: return text = (event.body or "").strip() if not text: return root = matrix_io.thread_root_of(event) if root and store.has(root): await handle_reply(room.room_id, root, text) elif root: return # threaded message not tied to a live proposal — ignore else: await handle_intake(room.room_id, event.event_id, text) # Prime the sync token past history, THEN register the callback — only react to messages # arriving after startup (no backlog replay). (matrix-bridge pattern.) print("matrix-intake: priming sync (skipping backlog)...", flush=True) await client.sync(timeout=30000, full_state=False) client.add_event_callback(on_message, RoomMessageText) who = await client.whoami() print(f"matrix-intake: listening as {who.user_id} in room {intake_room}", flush=True) try: await client.sync_forever(timeout=30000) finally: await client.close() if __name__ == "__main__": try: asyncio.run(main()) except KeyboardInterrupt: pass