Files
ten31-database/backend/matrix_intake/test_crm_client.py
T
Keysat 7ad0ee7624 Add Matrix intake bot (M1+M2): typed message → approved fundraising-grid write
New backend/matrix_intake/ runs as its own process (matrix-nio isolated from the
stdlib CRM): local-Qwen parse via Spark Control → in-thread human approval
(yes/edit/no) → write through the CRM's own log-communication endpoint, tagged
source=matrix_intake. Adds read-only GET /api/intake/match (returns grid row id,
no-duplicate contract); threads provenance through handle_log_fundraising_communication.
Reviewer-passed: pop-before-commit closes a double-approve race; edit-grammar fix.
Text-only v1; business-card photo (M3) deferred (no Spark vision model).
26/26 tests green; live Matrix smoke pending deploy.
2026-06-17 07:51:27 -05:00

55 lines
2.0 KiB
Python

"""Tests for the CRM client's payload builder (pure logic, no network)."""
import os
import sys
sys.path.insert(0, os.path.dirname(os.path.abspath(__file__)))
import crm_client # noqa: E402
def test_new_investor_payload():
p = {"intent": "new_investor", "investor_name": "Acme Capital",
"contact_name": "Jane Doe", "contact_email": "jane@acme.com",
"contact_title": "GP", "note": "met at conf"}
out = crm_client.build_commit_payload(p)
assert out["investor_name"] == "Acme Capital"
assert out["create_investor_if_missing"] is True
assert "row_id" not in out
assert out["contact"] == {"name": "Jane Doe", "email": "jane@acme.com", "title": "GP"}
assert out["body"] == "met at conf"
assert out["source"] == "matrix_intake"
def test_existing_investor_uses_row_id_not_create():
p = {"intent": "meeting_note", "investor_name": "Acme Capital",
"contact_name": "Jane Doe", "contact_email": None, "note": "wants Q3 deck",
"_match_id": "rowAcme"}
out = crm_client.build_commit_payload(p)
assert out["row_id"] == "rowAcme"
assert "create_investor_if_missing" not in out
assert "investor_name" not in out # targeted by row id, never re-matched by name
assert out["body"] == "wants Q3 deck"
def test_contact_falls_back_to_investor_name_when_no_person():
p = {"intent": "new_investor", "investor_name": "Delta Fund",
"contact_name": None, "contact_email": None, "note": None}
out = crm_client.build_commit_payload(p)
assert out["contact"]["name"] == "Delta Fund"
assert out["body"] == ""
def test_no_email_sends_empty_string_not_none():
p = {"intent": "new_investor", "investor_name": "Gamma", "contact_name": "Bob",
"contact_email": None, "note": "x"}
out = crm_client.build_commit_payload(p)
assert out["contact"]["email"] == ""
if __name__ == "__main__":
fns = [v for k, v in sorted(globals().items()) if k.startswith("test_") and callable(v)]
for fn in fns:
fn()
print(f"ok {fn.__name__}")
print(f"\n{len(fns)} passed")