Add business-card photo intake to the Matrix bot (M3)

The intake bot now accepts a photo of a business card in the intake room and
turns it into the same new-investor proposal a typed note would. The only new
step is image -> text; everything downstream (parse, fuzzy match, in-thread
approval, log-communication write) is reused unchanged.

M3 was deferred only because Spark Control had no vision model. That blocker is
gone: the daily-driver Qwen is vision-capable under the same model id, and the
gateway forwards OpenAI multimodal content untouched, so no gateway/server/s9pk
change is needed -- this ships bot-only (git pull + rebuild on the Spark).

Transcribe-then-reuse (not vision-straight-to-JSON) is deliberate: the
transcription becomes the source text the email-integrity rule checks against,
so a mis-read address can't reach the CRM unapproved -- same guarantee as the
text path. Card commits tag source="matrix_card" for the audit log.

- llm.chat_vision: multimodal /v1/chat/completions, same model, same gateway
- spark.transcribe_card: faithful card->text, "" on a non-card (NONE sentinel)
- bot.on_image/handle_card: download image, transcribe, hand to handle_intake
- crm_client: source provenance overridable via the proposal's _source key
- tests: test_spark.py + a provenance case; 41/41 suite green
This commit is contained in:
Keysat
2026-06-20 10:26:27 -05:00
parent be40520c3d
commit 536358093f
7 changed files with 209 additions and 6 deletions
+54 -3
View File
@@ -10,8 +10,9 @@ Runs as its own process (its matrix-nio dep is isolated here, never in the CRM r
Lifts matrix-bridge's prime-then-listen + threaded-reply plumbing. Config: repo .env.
"""
import asyncio
import base64
from nio import AsyncClient, MatrixRoom, MessageDirection, RoomMessageText
from nio import AsyncClient, MatrixRoom, MessageDirection, RoomMessageImage, RoomMessageText
import crm_client
import email_proposals
@@ -20,6 +21,7 @@ import parse
import proposals
import query
import settings
import spark
UNCLEAR_HELP = (
"🤔 I couldn't tell what to record. Try e.g.\n"
@@ -46,7 +48,10 @@ async def main():
query_room = settings.query_room() # dedicated read-only Q&A room (empty → use the intake trigger)
email_threads = {} # Matrix thread-root event_id -> {id, investor_name, note} for an email proposal
async def handle_intake(room_id, root, text):
async def handle_intake(room_id, root, text, source="matrix_intake"):
# `source` tags provenance for the eventual commit: "matrix_intake" for a typed note,
# "matrix_card" when the text came from a scanned business card (on_image). Everything
# else about the flow is identical — that's the whole point of transcribe-then-reuse.
# A bare yes/no/approve typed in the MAIN timeline (not inside a proposal's thread) is
# an easy slip — point the user back to the thread rather than parse it as a new intake.
action, _ = proposals.interpret_reply(text)
@@ -62,6 +67,7 @@ async def main():
if proposal["intent"] == "unclear":
await say(room_id, UNCLEAR_HELP, root)
return
proposal["_source"] = source # rides through to commit (control key, survives dict() copies)
# Resolve new-vs-existing against the CRM matcher (read-only). Degrade gracefully if the
# CRM is unreachable — still propose as new, just without match/candidate hints.
match, candidates = None, []
@@ -99,6 +105,39 @@ async def main():
# easy to miss inside a thread (the full card + yes/edit/no stay in the thread).
await nudge(room_id, proposals.summary_line(proposal), root)
async def handle_card(room_id, event):
    """Turn a business-card photo into a regular intake proposal.

    The image behind the event's mxc:// url is downloaded, base64-encoded and
    transcribed on the local VL model via ``spark.transcribe_card``. The
    transcription is then passed to ``handle_intake`` exactly like a typed
    note, tagged ``source="matrix_card"``. Image → text is the only new step;
    because the transcription is the text handed downstream, it is also what
    the email-integrity check runs against.

    Every status or failure message is posted as a reply to the image event.
    """
    media_url = getattr(event, "url", None)
    reply_to = event.event_id

    if not media_url:
        # Plain (unencrypted) images expose an mxc:// url. Encrypted rooms deliver a
        # different event class that is not registered, so this is a rare fallback.
        await say(room_id, "📇 I can only read unencrypted images right now.", reply_to)
        return

    # Vision is slower than a text parse, so acknowledge before starting the work.
    await say(room_id, "📇 Reading the card…", reply_to)

    try:
        download = await client.download(mxc=media_url)
        payload = getattr(download, "body", None)
        if not isinstance(payload, (bytes, bytearray)):
            # An error response carries no bytes; surface its message if it has one.
            raise RuntimeError(getattr(download, "message", None) or "image download failed")
        content_type = getattr(download, "content_type", None) or "image/jpeg"
        encoded = base64.b64encode(payload).decode("ascii")
        # transcribe_card is handed to a worker thread rather than called inline.
        transcript = await asyncio.to_thread(spark.transcribe_card, encoded, content_type)
    except Exception as err:
        await say(room_id, f"⚠️ couldn't read the card: {str(err)[:200]}", reply_to)
        return

    card_text = transcript.strip()
    if len(card_text) < 5:
        # Fewer than five characters is treated as "nothing legible on the card".
        await say(room_id, "📇 I couldn't read any text on that card — try a clearer, "
                  "well-lit photo taken straight-on.", reply_to)
        return

    # Prefix the raw transcription so the existing extractor reads it as a
    # new-investor intake; the transcription itself follows unchanged.
    intro = "New investor — from a business card:\n"
    await handle_intake(room_id, reply_to, intro + card_text, source="matrix_card")
async def handle_query(room_id, root, question):
"""A read-only NL question ('@bot …' / '?…') — translate + run it on the BOX (local Qwen,
nothing leaves the box) and post the answer in a thread. No write path, no approval gate:
@@ -337,11 +376,23 @@ async def main():
else:
await handle_query(room.room_id, event.event_id, q)
# Prime the sync token past history, THEN register the callback — only react to messages
async def on_image(room: MatrixRoom, event: RoomMessageImage):
    """Image-event callback: route fresh intake-room photos to ``handle_card``.

    Business-card capture is intake-only, so the event is dropped when any of
    these hold (checked in this order, stopping at the first match):
      * it was sent by the bot's own account (our own uploads),
      * it arrived in a room other than the intake room (Q&A / email-review),
      * it sits inside an existing thread (not a fresh card).
    """
    should_ignore = (
        event.sender == mx["user_id"]
        or room.room_id != intake_room
        or matrix_io.thread_root_of(event)
    )
    if not should_ignore:
        await handle_card(room.room_id, event)
# Prime the sync token past history, THEN register the callbacks — only react to messages
# arriving after startup (no backlog replay). (matrix-bridge pattern.)
print("matrix-intake: priming sync (skipping backlog)...", flush=True)
await client.sync(timeout=30000, full_state=False)
client.add_event_callback(on_message, RoomMessageText)
client.add_event_callback(on_image, RoomMessageImage)
who = await client.whoami()
print(f"matrix-intake: listening as {who.user_id} in room {intake_room}", flush=True)
tasks = [asyncio.create_task(client.sync_forever(timeout=30000))]