536358093f
The intake bot now accepts a photo of a business card in the intake room and turns it into the same new-investor proposal a typed note would. The only new step is image -> text; everything downstream (parse, fuzzy match, in-thread approval, log-communication write) is reused unchanged. M3 was deferred only because Spark Control had no vision model. That blocker is gone: the daily-driver Qwen is vision-capable under the same model id, and the gateway forwards OpenAI multimodal content untouched, so no gateway/server/s9pk change is needed -- this ships bot-only (git pull + rebuild on the Spark). Transcribe-then-reuse (not vision-straight-to-JSON) is deliberate: the transcription becomes the source text the email-integrity rule checks against, so a mis-read address can't reach the CRM unapproved -- same guarantee as the text path. Card commits tag source="matrix_card" for the audit log. - llm.chat_vision: multimodal /v1/chat/completions, same model, same gateway - spark.transcribe_card: faithful card->text, "" on a non-card (NONE sentinel) - bot.on_image/handle_card: download image, transcribe, hand to handle_intake - crm_client: source provenance overridable via the proposal's _source key - tests: test_spark.py + a provenance case; 41/41 suite green
203 lines
9.4 KiB
Python
203 lines
9.4 KiB
Python
"""CRM API client for the intake bot's write-back leg.
|
|
|
|
The bot authenticates as a dedicated service user (Bearer JWT via /api/auth/login — the CRM
|
|
has no service-key path) and reuses the CRM's OWN canonical write endpoint
|
|
(/api/fundraising/log-communication) for both new-investor and existing-note cases, rather
|
|
than mutating the grid itself. That endpoint creates the grid row (create_investor_if_missing),
|
|
upserts the contact, logs the communication, appends a visible note, and re-syncs the
|
|
relational tables + audit — exactly as a UI grid edit would. We only tag provenance
|
|
(source="matrix_intake"). The payload builder is a pure function so it's unit-tested offline.
|
|
"""
|
|
import json
|
|
import ssl
|
|
import urllib.error
|
|
import urllib.request
|
|
from urllib.parse import urlencode
|
|
|
|
import settings
|
|
|
|
_token = None
|
|
|
|
|
|
def _http(method, path, body=None, token=None):
|
|
s = settings.crm_settings()
|
|
url = s["base"] + path
|
|
data = json.dumps(body).encode("utf-8") if body is not None else None
|
|
headers = {"Content-Type": "application/json"}
|
|
if token:
|
|
headers["Authorization"] = f"Bearer {token}"
|
|
req = urllib.request.Request(url, data=data, method=method, headers=headers)
|
|
ctx = None
|
|
if url.lower().startswith("https") and not s["verify_tls"]:
|
|
ctx = ssl.create_default_context()
|
|
ctx.check_hostname = False
|
|
ctx.verify_mode = ssl.CERT_NONE
|
|
try:
|
|
with urllib.request.urlopen(req, timeout=30, context=ctx) as resp:
|
|
raw = resp.read()
|
|
return resp.status, (json.loads(raw) if raw else {})
|
|
except urllib.error.HTTPError as exc:
|
|
raw = exc.read()
|
|
try:
|
|
payload = json.loads(raw) if raw else {}
|
|
except Exception:
|
|
payload = {"raw": raw.decode("utf-8", "replace")}
|
|
return exc.code, payload
|
|
|
|
|
|
def _login():
|
|
global _token
|
|
s = settings.crm_settings()
|
|
if not s["username"] or not s["password"]:
|
|
raise RuntimeError("CRM bot creds not set (CRM_BOT_USERNAME / CRM_BOT_PASSWORD)")
|
|
status, data = _http("POST", "/api/auth/login",
|
|
{"username": s["username"], "password": s["password"]})
|
|
if status != 200 or not data.get("token"):
|
|
raise RuntimeError(f"CRM login failed ({status}): {data.get('error') or data}")
|
|
_token = data["token"]
|
|
return _token
|
|
|
|
|
|
def _authed(method, path, body=None):
|
|
"""Call the CRM with the cached token; re-login once on a 401 (token expiry)."""
|
|
global _token
|
|
token = _token or _login()
|
|
status, data = _http(method, path, body, token=token)
|
|
if status == 401:
|
|
token = _login()
|
|
status, data = _http(method, path, body, token=token)
|
|
return status, data
|
|
|
|
|
|
def match(proposal):
|
|
"""Resolve new-vs-existing for this proposal against the CRM matcher.
|
|
|
|
Returns {'match': {...}|None, 'candidates': [...]}:
|
|
- `match` is a confident EXACT existing investor — {'id', 'name'} — that the bot
|
|
auto-attaches a note to (no human disambiguation needed).
|
|
- `candidates` is a ranked list of fuzzy NEAR-matches — each {'id', 'name', 'score',
|
|
'matched_on'} — surfaced in-thread for the human to pick from (or confirm "new")
|
|
when there is no exact match, so a typo'd/near-duplicate name doesn't silently
|
|
create a second investor."""
|
|
q = proposal.get("investor_name") or proposal.get("contact_name") or ""
|
|
email = proposal.get("contact_email") or ""
|
|
if not q and not email:
|
|
return {"match": None, "candidates": []}
|
|
qs = urlencode({"q": q, "email": email})
|
|
status, data = _authed("GET", f"/api/intake/match?{qs}")
|
|
if status != 200:
|
|
raise RuntimeError(f"intake match failed ({status}): {data.get('error') or data}")
|
|
payload = data.get("data") or {}
|
|
m = payload.get("match")
|
|
match_out = {"id": m["id"], "name": m.get("investor_name") or q} if m else None
|
|
candidates = [
|
|
{"id": c["id"], "name": c.get("investor_name") or "?",
|
|
"score": c.get("score"), "matched_on": c.get("matched_on")}
|
|
for c in (payload.get("candidates") or []) if c.get("id")
|
|
]
|
|
return {"match": match_out, "candidates": candidates}
|
|
|
|
|
|
def list_email_proposals():
|
|
"""Pull the email-activity review work-lists for the poll loop: {to_post, open, to_close}.
|
|
to_post = pending, un-posted (post a card); open = posted, awaiting a decision (rebuild the
|
|
reply-routing map after a restart); to_close = decided on the web (announce in-thread + close)."""
|
|
status, data = _authed("GET", "/api/intake/email-proposals")
|
|
if status != 200:
|
|
raise RuntimeError(f"email-proposals list failed ({status}): {data.get('error') or data}")
|
|
payload = data.get("data") or {}
|
|
return {k: (payload.get(k) or []) for k in ("to_post", "open", "to_close")}
|
|
|
|
|
|
def mark_email_proposal_posted(proposal_id, event_id):
|
|
"""Record the Matrix thread-root event id so the proposal's review state survives a restart."""
|
|
status, data = _authed("POST", f"/api/intake/email-proposals/{proposal_id}/matrix",
|
|
{"event_id": event_id})
|
|
if status != 200:
|
|
raise RuntimeError(f"mark posted failed ({status}): {data.get('error') or data}")
|
|
return data.get("data") or {}
|
|
|
|
|
|
def mark_email_proposal_closed(proposal_id):
|
|
"""Mark the review thread resolved after announcing a web-side decision in it."""
|
|
status, data = _authed("POST", f"/api/intake/email-proposals/{proposal_id}/matrix",
|
|
{"closed": True})
|
|
if status != 200:
|
|
raise RuntimeError(f"mark closed failed ({status}): {data.get('error') or data}")
|
|
return data.get("data") or {}
|
|
|
|
|
|
def decide_email_proposal(proposal_id, decision, note=None):
|
|
"""Relay an in-thread approve/dismiss (with the possibly-revised note) to the CRM. The server
|
|
appends the note to the grid on approve, tags source='matrix', and closes the thread."""
|
|
body = {"decision": decision}
|
|
if note is not None:
|
|
body["note"] = note
|
|
status, data = _authed("POST", f"/api/intake/email-proposals/{proposal_id}/decide", body)
|
|
if status not in (200, 201):
|
|
raise RuntimeError(f"email-proposal decide failed ({status}): {data.get('error') or data}")
|
|
return data.get("data") or {}
|
|
|
|
|
|
def nl_query(question):
|
|
"""Ask the read-only NL-query endpoint (POST /api/query/nl). Translation runs on the box's
|
|
LOCAL model — the question never leaves the box and no write is possible. Returns the
|
|
endpoint's structured result dict ({intent, slots, rows, summary, ...} or {error, detail});
|
|
the server returns that same body on a hit AND on the soft 503 (model down) / 500 (query
|
|
fault) status codes, so we hand it straight to the renderer. Any OTHER status — auth (403),
|
|
a malformed request (400), an unexpected shape — raises so the caller posts a brief error."""
|
|
status, data = _authed("POST", "/api/query/nl", {"question": question, "source": "matrix"})
|
|
if status not in (200, 500, 503):
|
|
raise RuntimeError(f"nl-query failed ({status}): {data.get('error') or data}")
|
|
return data.get("data") or {}
|
|
|
|
|
|
def build_commit_payload(proposal):
|
|
"""Pure: map a proposal to the /api/fundraising/log-communication request body.
|
|
|
|
Existing investor (carries _match_id) → target that exact grid row. Otherwise create the
|
|
investor if missing. The note becomes the communication body; the email is only sent when
|
|
it survived parse's source-text integrity check."""
|
|
contact = {
|
|
"name": proposal.get("contact_name") or proposal.get("investor_name") or "",
|
|
"email": proposal.get("contact_email") or "",
|
|
"title": proposal.get("contact_title") or "",
|
|
}
|
|
note = proposal.get("note") or ""
|
|
# The CRM's grid note line uses subject-or-body for its one-line summary, so a non-empty
|
|
# subject hides the actual note text. Send a blank subject when there's a note (let the note
|
|
# itself show in the grid); fall back to a provenance label only when there's nothing to
|
|
# show. Provenance is recorded via source="matrix_intake" either way.
|
|
intent_label = "Note (Matrix)" if proposal.get("intent") == "meeting_note" else "Intake (Matrix)"
|
|
payload = {
|
|
"contact": contact,
|
|
"type": "note",
|
|
"body": note,
|
|
"subject": "" if note.strip() else intent_label,
|
|
"append_note": True,
|
|
# Provenance for the audit log: a typed note is "matrix_intake"; a scanned business card
|
|
# rides in on _source="matrix_card" (set by the bot's image handler). Default preserves
|
|
# the text path.
|
|
"source": proposal.get("_source") or "matrix_intake",
|
|
}
|
|
match_id = proposal.get("_match_id")
|
|
if match_id:
|
|
payload["row_id"] = match_id
|
|
else:
|
|
payload["investor_name"] = proposal.get("investor_name") or proposal.get("contact_name") or ""
|
|
payload["create_investor_if_missing"] = True
|
|
return payload
|
|
|
|
|
|
def commit(proposal):
|
|
"""Write the approved proposal to the CRM; return a short human summary for the thread."""
|
|
payload = build_commit_payload(proposal)
|
|
status, data = _authed("POST", "/api/fundraising/log-communication", payload)
|
|
if status not in (200, 201):
|
|
raise RuntimeError(f"log-communication failed ({status}): {data.get('error') or data}")
|
|
row = (data.get("data") or {}).get("row") or {}
|
|
name = row.get("investor_name") or payload.get("investor_name") or "investor"
|
|
if proposal.get("_match_id"):
|
|
return f"Logged a note on **{name}** (existing grid entry)."
|
|
return f"Created a new grid entry for **{name}**" + (" and logged a note." if payload.get("body") else ".")
|