Add Matrix intake bot (M1+M2): typed message → approved fundraising-grid write

New backend/matrix_intake/ runs as its own process (matrix-nio isolated from the
stdlib CRM): local-Qwen parse via Spark Control → in-thread human approval
(yes/edit/no) → write through the CRM's own log-communication endpoint, tagged
source=matrix_intake. Adds read-only GET /api/intake/match (returns grid row id,
no-duplicate contract); threads provenance through handle_log_fundraising_communication.
Reviewer-passed: pop-before-commit closes a double-approve race; edit-grammar fix.
Text-only v1; business-card photo (M3) deferred (no Spark vision model).
26/26 tests green; live Matrix smoke pending deploy.
This commit is contained in:
Keysat
2026-06-17 07:51:27 -05:00
parent 172c76553b
commit 7ad0ee7624
20 changed files with 1169 additions and 7 deletions
+41
View File
@@ -0,0 +1,41 @@
# Matrix intake bot
Turns a typed message in a dedicated Matrix room into a proposed fundraising-grid add/edit,
gated on in-thread human approval before any write. Runs as its own process (on the Spark),
separate from the CRM. Full design + rules: `docs/guides/matrix-intake.md`.
## Run
```bash
# 1. Install the one third-party dep (isolated to this component — NOT the CRM runtime)
python3 -m pip install -r requirements.txt # matrix-nio
# 2. Fill the MATRIX_* and CRM_BOT_* vars in the repo .env (see ../../.env.example),
# and create a dedicated CRM user for CRM_BOT_USERNAME/PASSWORD (admin → invite user).
# 3. Start the listener
python3 bot.py
```
It primes the Matrix sync past history (no backlog replay), then listens. Post a message in
the intake room; it replies in a thread with the parsed proposal. Reply **yes** to commit,
**edit field=value** to change a field, or **no** to discard.
## Layout
- `bot.py` — entrypoint: connect, prime-then-listen, dispatch (lifts matrix-bridge's plumbing).
- `parse.py` — message → structured proposal via local Qwen (`spark.py``backend/ingest/llm.py`).
- `proposals.py` — in-memory pending-proposal store + the yes/edit/no state machine.
- `crm_client.py` — login + `GET /api/intake/match` + write via `POST /api/fundraising/log-communication`.
- `matrix_io.py` — message splitting, thread-root detection, threaded-reply sender.
- `settings.py` — Matrix + CRM-API config (named `settings`, not `config`, to avoid shadowing `ingest/config`).
## Test (offline)
```bash
python3 test_parse.py && python3 test_proposals.py && python3 test_crm_client.py
# endpoint + create→match contract (boots the real server against a temp DB):
cd ../ && python3 test_intake_endpoints.py
```
Live Matrix behavior needs creds + `matrix-nio` and can only be smoke-tested on the Spark.
+7
View File
@@ -0,0 +1,7 @@
"""Matrix intake bot — a dedicated Matrix room that turns a typed message into a
proposed fundraising-grid add/edit, gated on in-thread human approval before any write.
Separate process from the CRM (its only third-party dep, matrix-nio, lives here, never
in the stdlib CRM runtime). Parses with local Qwen via Spark Control; on approval, writes
through the CRM's own API. See docs/guides/matrix-intake.md and ROADMAP.md.
"""
+121
View File
@@ -0,0 +1,121 @@
#!/usr/bin/env python3
"""Matrix intake bot — entrypoint.
A top-level message in the dedicated intake room is parsed (local Qwen via Spark Control)
into a proposed fundraising-grid add/edit and posted back IN A THREAD. The team member
replies in that thread — **yes** / **edit field=value** / **no** — and only on **yes** does
the bot write, through the CRM's own API. Nothing is ever written autonomously.
Runs as its own process (its matrix-nio dep is isolated here, never in the CRM runtime).
Lifts matrix-bridge's prime-then-listen + threaded-reply plumbing. Config: repo .env.
"""
import asyncio
from nio import AsyncClient, MatrixRoom, RoomMessageText
import crm_client
import matrix_io
import parse
import proposals
import settings
UNCLEAR_HELP = (
"🤔 I couldn't tell what to record. Try e.g.\n"
"`New investor: Acme Capital — Jane Doe <jane@acme.com>, met at the Austin conf`\n"
"or a note like `Note for Acme Capital: wants the Q3 deck, follow up next week`."
)
async def main():
mx = settings.matrix_settings()
client = AsyncClient(mx["homeserver"], mx["user_id"])
client.restore_login(user_id=mx["user_id"], device_id=mx["device_id"], access_token=mx["token"])
say = matrix_io.make_say(client)
store = proposals.ProposalStore()
intake_room = mx["intake_room"]
async def handle_intake(room_id, root, text):
try:
proposal = await asyncio.to_thread(parse.parse_message, text)
except Exception as exc: # Spark/Qwen unreachable or bad response
await say(room_id, f"⚠️ couldn't reach the local parser: {exc}", root)
return
if proposal["intent"] == "unclear":
await say(room_id, UNCLEAR_HELP, root)
return
# Confirm new-vs-existing against the CRM matcher (read-only). Degrade gracefully if
# the CRM is unreachable — still propose, just without the "looks like existing" hint.
hint = ""
try:
match = await asyncio.to_thread(crm_client.match, proposal)
if match:
proposal["intent"] = "meeting_note"
proposal["_match_id"] = match["id"]
hint = f"\n\n🔎 Looks like an existing investor: **{match['name']}** — this will append a note to them."
except Exception:
pass
store.put(root, proposal)
await say(room_id, proposals.render(proposal) + hint, root)
async def handle_reply(room_id, root, text):
action, payload = proposals.interpret_reply(text)
# Claim the proposal synchronously — BEFORE any await — so a second reply that
# arrives while a commit is in flight can't double-process it. asyncio is
# cooperative: nothing else runs between here and the first await below, so the
# pop is atomic w.r.t. other Matrix events.
proposal = store.pop(root)
if proposal is None:
return
if action == "approve":
try:
summary = await asyncio.to_thread(crm_client.commit, proposal)
except Exception as exc:
store.put(root, proposal) # commit failed — restore so the user can retry
await say(room_id, f"⚠️ write failed, nothing committed: {exc}", root)
return
await say(room_id, f"{summary}", root)
elif action == "reject":
await say(room_id, "🗑️ Discarded — nothing written.", root)
elif action == "edit":
field, value = payload
proposal = proposals.apply_edit(proposal, field, value)
store.put(root, proposal) # keep it pending (edited) for the next reply
await say(room_id, "✏️ Updated:\n\n" + proposals.render(proposal), root)
else: # unrecognized reply — leave the proposal pending
store.put(root, proposal)
await say(room_id, "Reply **yes** to commit, **edit field=value**, or **no**.", root)
async def on_message(room: MatrixRoom, event: RoomMessageText):
if event.sender == mx["user_id"]:
return # never react to our own messages (we post in-thread — this prevents loops)
if room.room_id != intake_room:
return
text = (event.body or "").strip()
if not text:
return
root = matrix_io.thread_root_of(event)
if root and store.has(root):
await handle_reply(room.room_id, root, text)
elif root:
return # threaded message not tied to a live proposal — ignore
else:
await handle_intake(room.room_id, event.event_id, text)
# Prime the sync token past history, THEN register the callback — only react to messages
# arriving after startup (no backlog replay). (matrix-bridge pattern.)
print("matrix-intake: priming sync (skipping backlog)...", flush=True)
await client.sync(timeout=30000, full_state=False)
client.add_event_callback(on_message, RoomMessageText)
who = await client.whoami()
print(f"matrix-intake: listening as {who.user_id} in room {intake_room}", flush=True)
try:
await client.sync_forever(timeout=30000)
finally:
await client.close()
if __name__ == "__main__":
try:
asyncio.run(main())
except KeyboardInterrupt:
pass
+127
View File
@@ -0,0 +1,127 @@
"""CRM API client for the intake bot's write-back leg.
The bot authenticates as a dedicated service user (Bearer JWT via /api/auth/login — the CRM
has no service-key path) and reuses the CRM's OWN canonical write endpoint
(/api/fundraising/log-communication) for both new-investor and existing-note cases, rather
than mutating the grid itself. That endpoint creates the grid row (create_investor_if_missing),
upserts the contact, logs the communication, appends a visible note, and re-syncs the
relational tables + audit — exactly as a UI grid edit would. We only tag provenance
(source="matrix_intake"). The payload builder is a pure function so it's unit-tested offline.
"""
import json
import ssl
import urllib.error
import urllib.request
from urllib.parse import urlencode
import settings
_token = None
def _http(method, path, body=None, token=None):
s = settings.crm_settings()
url = s["base"] + path
data = json.dumps(body).encode("utf-8") if body is not None else None
headers = {"Content-Type": "application/json"}
if token:
headers["Authorization"] = f"Bearer {token}"
req = urllib.request.Request(url, data=data, method=method, headers=headers)
ctx = None
if url.lower().startswith("https") and not s["verify_tls"]:
ctx = ssl.create_default_context()
ctx.check_hostname = False
ctx.verify_mode = ssl.CERT_NONE
try:
with urllib.request.urlopen(req, timeout=30, context=ctx) as resp:
raw = resp.read()
return resp.status, (json.loads(raw) if raw else {})
except urllib.error.HTTPError as exc:
raw = exc.read()
try:
payload = json.loads(raw) if raw else {}
except Exception:
payload = {"raw": raw.decode("utf-8", "replace")}
return exc.code, payload
def _login():
global _token
s = settings.crm_settings()
if not s["username"] or not s["password"]:
raise RuntimeError("CRM bot creds not set (CRM_BOT_USERNAME / CRM_BOT_PASSWORD)")
status, data = _http("POST", "/api/auth/login",
{"username": s["username"], "password": s["password"]})
if status != 200 or not data.get("token"):
raise RuntimeError(f"CRM login failed ({status}): {data.get('error') or data}")
_token = data["token"]
return _token
def _authed(method, path, body=None):
"""Call the CRM with the cached token; re-login once on a 401 (token expiry)."""
global _token
token = _token or _login()
status, data = _http(method, path, body, token=token)
if status == 401:
token = _login()
status, data = _http(method, path, body, token=token)
return status, data
def match(proposal):
"""Return {'id', 'name'} for an existing investor matching this proposal, else None."""
q = proposal.get("investor_name") or proposal.get("contact_name") or ""
email = proposal.get("contact_email") or ""
if not q and not email:
return None
qs = urlencode({"q": q, "email": email})
status, data = _authed("GET", f"/api/intake/match?{qs}")
if status != 200:
raise RuntimeError(f"intake match failed ({status}): {data.get('error') or data}")
m = (data.get("data") or {}).get("match")
if not m:
return None
return {"id": m["id"], "name": m.get("investor_name") or q}
def build_commit_payload(proposal):
"""Pure: map a proposal to the /api/fundraising/log-communication request body.
Existing investor (carries _match_id) → target that exact grid row. Otherwise create the
investor if missing. The note becomes the communication body; the email is only sent when
it survived parse's source-text integrity check."""
contact = {
"name": proposal.get("contact_name") or proposal.get("investor_name") or "",
"email": proposal.get("contact_email") or "",
"title": proposal.get("contact_title") or "",
}
note = proposal.get("note") or ""
payload = {
"contact": contact,
"type": "note",
"body": note,
"subject": "Intake (Matrix)" if proposal.get("intent") != "meeting_note" else "Note (Matrix)",
"append_note": True,
"source": "matrix_intake",
}
match_id = proposal.get("_match_id")
if match_id:
payload["row_id"] = match_id
else:
payload["investor_name"] = proposal.get("investor_name") or proposal.get("contact_name") or ""
payload["create_investor_if_missing"] = True
return payload
def commit(proposal):
"""Write the approved proposal to the CRM; return a short human summary for the thread."""
payload = build_commit_payload(proposal)
status, data = _authed("POST", "/api/fundraising/log-communication", payload)
if status not in (200, 201):
raise RuntimeError(f"log-communication failed ({status}): {data.get('error') or data}")
row = (data.get("data") or {}).get("row") or {}
name = row.get("investor_name") or payload.get("investor_name") or "investor"
if proposal.get("_match_id"):
return f"Logged note to **{name}**."
return f"Added **{name}** to the grid" + (" with a note." if payload.get("body") else ".")
+55
View File
@@ -0,0 +1,55 @@
"""Matrix plumbing lifted from matrix-bridge (src/bot.py): message splitting, thread-root
detection, and a threaded-reply sender. Kept dependency-light so the rest of the bot is
testable without a live homeserver."""
MAX_MSG_CHARS = 30000 # well under Matrix's ~64KB event cap
def split_message(text, limit=MAX_MSG_CHARS):
"""Split text into <=limit-char chunks on newline boundaries (no truncation)."""
if len(text) <= limit:
return [text]
chunks, buf = [], ""
for line in text.splitlines(keepends=True):
while len(line) > limit:
if buf:
chunks.append(buf)
buf = ""
chunks.append(line[:limit])
line = line[limit:]
if len(buf) + len(line) > limit:
chunks.append(buf)
buf = ""
buf += line
if buf:
chunks.append(buf)
return chunks
def thread_root_of(event):
"""Return the thread root event_id if this message is a threaded reply, else None."""
relates = (getattr(event, "source", None) or {}).get("content", {}).get("m.relates_to") or {}
if relates.get("rel_type") == "m.thread":
return relates.get("event_id")
return None
def thread_content(text, thread_root):
"""Build an m.room.message content dict, threaded under thread_root when given."""
content = {"msgtype": "m.text", "body": text}
if thread_root:
content["m.relates_to"] = {
"rel_type": "m.thread",
"event_id": thread_root,
"is_falling_back": True,
"m.in_reply_to": {"event_id": thread_root},
}
return content
def make_say(client):
"""Return an async say(room_id, text, thread_root=None) bound to a matrix-nio client."""
async def say(room_id, text, thread_root=None):
for chunk in split_message(text):
await client.room_send(room_id, "m.room.message", thread_content(chunk, thread_root))
return say
+63
View File
@@ -0,0 +1,63 @@
"""Turn a free-text intake message into a normalized proposal via local Qwen.
The model only EXTRACTS structure; it never decides to write anything. New-vs-existing is
finalized in M2 against the CRM matcher — here `intent` is the model's first read.
"""
import re
import spark
SYSTEM = (
"You extract structured investor-intake data from a short message a venture-fund "
"team member typed. Reply with ONLY a JSON object, no prose, with these keys:\n"
' "intent": "new_investor" if the message introduces a new investor or prospect, '
'"meeting_note" if it logs a note/update about an investor, else "unclear".\n'
' "investor_name": the investing firm or entity name (e.g. "Acme Capital"), or null.\n'
' "contact_name": the individual person mentioned, or null.\n'
' "contact_email": the person\'s email if explicitly present, else null. Never invent one.\n'
' "contact_title": the person\'s role/title if stated, else null.\n'
' "note": any meeting note, context, or next step, else null.\n'
"Use null (not empty string) for anything not present. Output JSON only."
)
_EMAIL_RE = re.compile(r"[^@\s]+@[^@\s]+\.[^@\s]+")
_VALID_INTENTS = {"new_investor", "meeting_note", "unclear"}
_FIELDS = ("intent", "investor_name", "contact_name", "contact_email", "contact_title", "note")
def _clean(v):
if v is None:
return None
s = str(v).strip()
if not s or s.lower() in ("null", "none", "n/a", "na", "unknown"):
return None
return s
def normalize(raw, source_text=""):
"""Coerce the model's dict into a stable proposal shape; salvage an email from the
source text if the model missed one. Returns a dict with all _FIELDS keys."""
raw = raw or {}
out = {k: _clean(raw.get(k)) for k in _FIELDS}
intent = (out["intent"] or "").lower().replace("-", "_").replace(" ", "_")
out["intent"] = intent if intent in _VALID_INTENTS else "unclear"
# Email integrity: only accept an address that literally appears in the source message.
# The model is unreliable for verbatim strings and must never mint an address — anything
# not present in what the human typed is dropped (a wrong email in the CRM is worse than
# none). This both salvages a missed address and rejects a hallucinated one.
m = _EMAIL_RE.search(source_text or "")
out["contact_email"] = m.group(0).rstrip(".,;:!?)]}>\"'") if m else None
# An intake with no firm AND no person is not actionable.
if not out["investor_name"] and not out["contact_name"]:
out["intent"] = "unclear"
return out
def parse_message(text, parse_fn=spark.parse_json):
"""Parse one intake message. `parse_fn` is injectable for tests (defaults to Spark/Qwen).
Returns a normalized proposal dict. On a model/transport failure, raises (caller decides)."""
raw = parse_fn(text, system=SYSTEM, max_tokens=400)
return normalize(raw, source_text=text)
+103
View File
@@ -0,0 +1,103 @@
"""Pending-proposal store + the in-thread approval state machine.
The one piece of state in the bot: a proposal awaiting a human's yes/edit/no, keyed by the
Matrix thread root (the bot's proposal lives in a thread rooted at the user's message, and
the user replies inside that thread). In-memory and ephemeral by design — a restart drops
pending proposals (the user just re-sends), matching matrix-bridge's stateless-by-default
ethos. Nothing here writes to the CRM; the bot calls the CRM client only after `approve`.
"""
# field aliases accepted in `edit <field>=<value>`
_EDIT_ALIASES = {
"name": "investor_name", "investor": "investor_name", "firm": "investor_name", "org": "investor_name",
"contact": "contact_name", "person": "contact_name",
"email": "contact_email",
"title": "contact_title", "role": "contact_title",
"note": "note",
}
_YES = {"yes", "y", "approve", "approved", "ok", "confirm", "go", "👍", ""}
_NO = {"no", "n", "cancel", "discard", "reject", "stop", "👎", ""}
class ProposalStore:
def __init__(self):
self._pending = {} # thread_root -> proposal dict
def put(self, thread_root, proposal):
self._pending[thread_root] = proposal
def get(self, thread_root):
return self._pending.get(thread_root)
def pop(self, thread_root):
return self._pending.pop(thread_root, None)
def has(self, thread_root):
return thread_root in self._pending
def _parse_edit(text):
"""Parse 'edit field=value' (also 'field: value'); return (canonical_field, value) or None."""
body = text.strip()
if body.lower().startswith("edit "):
body = body[5:].strip()
for sep in ("=", ":"):
if sep in body:
field, value = body.split(sep, 1)
field = field.strip().lower()
canon = _EDIT_ALIASES.get(field)
value = value.strip()
if canon and value:
return canon, value
# Not a known field on this separator — try the next one rather than bail,
# so e.g. "note: see deck=v2" still parses (split on ':' not the inner '=').
continue
return None
def interpret_reply(text):
"""Classify a threaded reply to a pending proposal.
Returns one of:
("approve", None) | ("reject", None) | ("edit", (field, value)) | ("unknown", None)
"""
t = (text or "").strip()
low = t.lower()
if low in _YES:
return ("approve", None)
if low in _NO:
return ("reject", None)
edit = _parse_edit(t)
if edit:
return ("edit", edit)
return ("unknown", None)
def apply_edit(proposal, field, value):
"""Return a copy of the proposal with one field changed."""
updated = dict(proposal)
updated[field] = value
return updated
def render(proposal):
"""Render a proposal as the in-thread message a human approves."""
if proposal.get("intent") == "meeting_note":
head = f"📝 Proposed **meeting note** for **{proposal.get('investor_name') or proposal.get('contact_name') or '?'}**"
else:
head = f"📇 Proposed **new investor**: **{proposal.get('investor_name') or proposal.get('contact_name') or '?'}**"
lines = [head]
fields = [
("Investor", proposal.get("investor_name")),
("Contact", proposal.get("contact_name")),
("Email", proposal.get("contact_email")),
("Title", proposal.get("contact_title")),
("Note", proposal.get("note")),
]
for label, val in fields:
if val:
lines.append(f"· {label}: {val}")
lines.append("")
lines.append("Reply **yes** to commit, **edit field=value** to change a field, or **no** to discard.")
return "\n".join(lines)
+4
View File
@@ -0,0 +1,4 @@
# Matrix intake bot — isolated to this component's own process. matrix-nio is the ONLY
# third-party runtime dep and MUST NOT be added to the stdlib CRM (backend/server.py).
# The Spark/Qwen + CRM-API calls reuse the repo's stdlib HTTP client (backend/ingest/http_util).
matrix-nio>=0.24
+56
View File
@@ -0,0 +1,56 @@
"""Config for the Matrix intake bot — Matrix creds + the dedicated intake room.
Spark settings (SPARK_CONTROL_URL, CHAT_MODEL, …) are NOT read here; they come from the
reused ingest client (see spark.py), which loads the same repo .env. This module only owns
the Matrix connection and the CRM API target for the write-back leg (M2).
"""
import os
REPO_ROOT = os.path.dirname(os.path.dirname(os.path.dirname(os.path.abspath(__file__))))
def load_env(path=None):
"""Populate os.environ from the repo .env (setdefault — never clobber a real env var)."""
path = path or os.path.join(REPO_ROOT, ".env")
if not os.path.exists(path):
return
with open(path, "r", encoding="utf-8") as fh:
for line in fh:
line = line.strip()
if not line or line.startswith("#") or "=" not in line:
continue
k, v = line.split("=", 1)
os.environ.setdefault(k.strip(), v.strip())
load_env()
def _require(name):
val = os.environ.get(name, "").strip()
if not val:
raise RuntimeError(f"matrix_intake: required env var {name} is not set (see .env.example)")
return val
# Matrix connection (resolved lazily so importing this module for tests never requires creds).
def matrix_settings():
return {
"homeserver": _require("MATRIX_HOMESERVER"),
"user_id": _require("MATRIX_USER"),
"token": _require("MATRIX_ACCESS_TOKEN"),
"device_id": os.environ.get("MATRIX_DEVICE_ID", "ten31-intake-bot"),
"intake_room": _require("MATRIX_INTAKE_ROOM"),
}
# CRM API target for the write-back leg (M2). The CRM has no service-key auth path — auth is
# Bearer-JWT via /api/auth/login — so the bot logs in as a DEDICATED service user (a normal
# CRM user, created by an admin) and reuses the existing auth. Creds live in .env, never code.
def crm_settings():
return {
"base": os.environ.get("CRM_API_BASE", "http://127.0.0.1:8080").rstrip("/"),
"username": os.environ.get("CRM_BOT_USERNAME", "").strip(),
"password": os.environ.get("CRM_BOT_PASSWORD", ""),
"verify_tls": os.environ.get("CRM_API_VERIFY_TLS", "true").lower() in ("1", "true", "yes", "on"),
}
+21
View File
@@ -0,0 +1,21 @@
"""Thin reuse of the in-repo local-Qwen client (backend/ingest/llm.py) via Spark Control.
We import the ingest client rather than re-implementing the HTTP call so the intake bot
speaks the exact same Spark contract (model, /v1/chat/completions, TLS verify, .env load).
The intake message is real LP substance, but it goes ONLY to the local Qwen on Ten31 infra
— never Claude — so no scrub boundary applies (same basis as the daily digest). Never call a
Spark directly; everything goes through SPARK_CONTROL_URL.
"""
import os
import sys
_INGEST = os.path.join(os.path.dirname(os.path.dirname(os.path.abspath(__file__))), "ingest")
if _INGEST not in sys.path:
sys.path.insert(0, _INGEST)
import llm # noqa: E402 (backend/ingest/llm.py — chat / chat_json over Spark Control)
def parse_json(prompt, system=None, max_tokens=400):
"""Send to local Qwen (temp 0, thinking off) and parse the first JSON object, or None."""
return llm.chat_json(prompt, system=system, max_tokens=max_tokens)
+54
View File
@@ -0,0 +1,54 @@
"""Tests for the CRM client's payload builder (pure logic, no network)."""
import os
import sys
sys.path.insert(0, os.path.dirname(os.path.abspath(__file__)))
import crm_client # noqa: E402
def test_new_investor_payload():
p = {"intent": "new_investor", "investor_name": "Acme Capital",
"contact_name": "Jane Doe", "contact_email": "jane@acme.com",
"contact_title": "GP", "note": "met at conf"}
out = crm_client.build_commit_payload(p)
assert out["investor_name"] == "Acme Capital"
assert out["create_investor_if_missing"] is True
assert "row_id" not in out
assert out["contact"] == {"name": "Jane Doe", "email": "jane@acme.com", "title": "GP"}
assert out["body"] == "met at conf"
assert out["source"] == "matrix_intake"
def test_existing_investor_uses_row_id_not_create():
p = {"intent": "meeting_note", "investor_name": "Acme Capital",
"contact_name": "Jane Doe", "contact_email": None, "note": "wants Q3 deck",
"_match_id": "rowAcme"}
out = crm_client.build_commit_payload(p)
assert out["row_id"] == "rowAcme"
assert "create_investor_if_missing" not in out
assert "investor_name" not in out # targeted by row id, never re-matched by name
assert out["body"] == "wants Q3 deck"
def test_contact_falls_back_to_investor_name_when_no_person():
p = {"intent": "new_investor", "investor_name": "Delta Fund",
"contact_name": None, "contact_email": None, "note": None}
out = crm_client.build_commit_payload(p)
assert out["contact"]["name"] == "Delta Fund"
assert out["body"] == ""
def test_no_email_sends_empty_string_not_none():
p = {"intent": "new_investor", "investor_name": "Gamma", "contact_name": "Bob",
"contact_email": None, "note": "x"}
out = crm_client.build_commit_payload(p)
assert out["contact"]["email"] == ""
if __name__ == "__main__":
fns = [v for k, v in sorted(globals().items()) if k.startswith("test_") and callable(v)]
for fn in fns:
fn()
print(f"ok {fn.__name__}")
print(f"\n{len(fns)} passed")
+93
View File
@@ -0,0 +1,93 @@
"""Tests for the intake parse/normalize layer — Spark/Qwen stubbed (no network)."""
import os
import sys
sys.path.insert(0, os.path.dirname(os.path.abspath(__file__)))
import parse # noqa: E402
def _stub(reply):
"""Return a parse_fn that ignores input and yields `reply` (simulating Qwen's JSON)."""
return lambda text, system=None, max_tokens=400: reply
def test_new_investor_basic():
p = parse.parse_message(
"New investor Acme Capital, contact Jane Doe jane@acme.com, met at the Austin conf",
parse_fn=_stub({"intent": "new_investor", "investor_name": "Acme Capital",
"contact_name": "Jane Doe", "contact_email": "jane@acme.com",
"contact_title": None, "note": "met at the Austin conf"}),
)
assert p["intent"] == "new_investor"
assert p["investor_name"] == "Acme Capital"
assert p["contact_email"] == "jane@acme.com"
def test_email_salvaged_from_source_when_model_misses():
p = parse.parse_message(
"add bob@example.org from Beta LP",
parse_fn=_stub({"intent": "new_investor", "investor_name": "Beta LP",
"contact_name": "Bob", "contact_email": None}),
)
assert p["contact_email"] == "bob@example.org"
def test_fabricated_email_dropped_when_not_in_source():
p = parse.parse_message(
"new prospect Gamma Partners, talked to their GP",
parse_fn=_stub({"intent": "new_investor", "investor_name": "Gamma Partners",
"contact_name": "their GP", "contact_email": "made-up@nowhere.test"}),
)
# the model invented an address that isn't in the source → must be dropped
assert p["contact_email"] is None
def test_meeting_note_intent_preserved():
p = parse.parse_message(
"Note for Acme Capital: wants the Q3 deck",
parse_fn=_stub({"intent": "meeting_note", "investor_name": "Acme Capital",
"note": "wants the Q3 deck"}),
)
assert p["intent"] == "meeting_note"
assert p["note"] == "wants the Q3 deck"
def test_unclear_when_no_entity():
p = parse.parse_message(
"hey what's up",
parse_fn=_stub({"intent": "new_investor", "investor_name": None, "contact_name": None}),
)
assert p["intent"] == "unclear"
def test_null_strings_normalized():
p = parse.parse_message(
"Delta Fund",
parse_fn=_stub({"intent": "new_investor", "investor_name": "Delta Fund",
"contact_name": "null", "contact_email": "N/A", "note": ""}),
)
assert p["contact_name"] is None
assert p["contact_email"] is None
assert p["note"] is None
def test_bad_intent_falls_back_to_unclear():
p = parse.parse_message(
"Epsilon Capital",
parse_fn=_stub({"intent": "garbage", "investor_name": "Epsilon Capital"}),
)
assert p["intent"] == "unclear"
def test_none_model_reply_is_unclear():
p = parse.parse_message("???", parse_fn=_stub(None))
assert p["intent"] == "unclear"
if __name__ == "__main__":
fns = [v for k, v in sorted(globals().items()) if k.startswith("test_") and callable(v)]
for fn in fns:
fn()
print(f"ok {fn.__name__}")
print(f"\n{len(fns)} passed")
+95
View File
@@ -0,0 +1,95 @@
"""Tests for the proposal store + approval state machine (pure logic, no network)."""
import os
import sys
sys.path.insert(0, os.path.dirname(os.path.abspath(__file__)))
import proposals # noqa: E402
SAMPLE = {"intent": "new_investor", "investor_name": "Acme Capital",
"contact_name": "Jane Doe", "contact_email": "jane@acme.com",
"contact_title": None, "note": "met at conf"}
def test_store_put_get_pop():
s = proposals.ProposalStore()
assert not s.has("$root")
s.put("$root", SAMPLE)
assert s.has("$root")
assert s.get("$root")["investor_name"] == "Acme Capital"
assert s.pop("$root")["investor_name"] == "Acme Capital"
assert not s.has("$root")
assert s.pop("$missing") is None
def test_interpret_yes_variants():
for t in ("yes", "Y", "approve", " ok ", "👍"):
assert proposals.interpret_reply(t)[0] == "approve", t
def test_interpret_no_variants():
for t in ("no", "N", "cancel", "discard", ""):
assert proposals.interpret_reply(t)[0] == "reject", t
def test_interpret_edit_equals():
action, payload = proposals.interpret_reply("edit email=new@acme.com")
assert action == "edit"
assert payload == ("contact_email", "new@acme.com")
def test_interpret_edit_colon_and_alias():
action, payload = proposals.interpret_reply("firm: Acme Capital LLC")
assert action == "edit"
assert payload == ("investor_name", "Acme Capital LLC")
def test_interpret_unknown():
assert proposals.interpret_reply("maybe later")[0] == "unknown"
def test_interpret_edit_colon_value_contains_equals():
# the '=' inside the value must not break parsing — split on ':' first, keep the rest
action, payload = proposals.interpret_reply("note: see deck=v2")
assert action == "edit"
assert payload == ("note", "see deck=v2")
def test_claim_once_pop_guards_double_approve():
# the double-approve guard relies on pop() yielding the proposal exactly once;
# a second claim returns None so a racing second 'yes' is a no-op
s = proposals.ProposalStore()
s.put("$r", SAMPLE)
assert s.pop("$r") is not None
assert s.pop("$r") is None
def test_edit_with_unknown_field_is_not_an_edit():
# an unknown field name must not silently become an edit
assert proposals.interpret_reply("edit zipcode=90210")[0] == "unknown"
def test_apply_edit_is_nondestructive():
updated = proposals.apply_edit(SAMPLE, "contact_email", "x@y.com")
assert updated["contact_email"] == "x@y.com"
assert SAMPLE["contact_email"] == "jane@acme.com" # original untouched
def test_render_includes_fields_and_instructions():
text = proposals.render(SAMPLE)
assert "Acme Capital" in text
assert "jane@acme.com" in text
assert "yes" in text.lower() and "no" in text.lower()
def test_render_meeting_note_variant():
note = dict(SAMPLE, intent="meeting_note")
assert "meeting note" in proposals.render(note).lower()
if __name__ == "__main__":
fns = [v for k, v in sorted(globals().items()) if k.startswith("test_") and callable(v)]
for fn in fns:
fn()
print(f"ok {fn.__name__}")
print(f"\n{len(fns)} passed")
+62 -1
View File
@@ -1215,6 +1215,45 @@ def sync_fundraising_relational(conn, grid, views, actor_user_id=None):
))
run_fundraising_automations(conn)
def find_intake_match(conn, q, email=None):
"""Find an existing fundraising-grid investor for the intake bot's new-vs-existing hint.
Scans the canonical grid blob (not the derived tables) so the returned `id` is the grid
row id that handle_log_fundraising_communication matches on — keeping the bot's proposal
consistent with where the write actually lands (no duplicate-investor risk). Matches by
normalized investor_name first (the write's own key), then falls back to a contact email.
Deleted investors are absent from the blob; graveyarded ones remain (a note on them is
still valid), so no extra filtering is needed."""
row = conn.execute("SELECT grid_json FROM fundraising_state WHERE id = 'main'").fetchone()
if not row or not row['grid_json']:
return None
try:
grid = json.loads(row['grid_json'])
except Exception:
return None
rows = grid.get('rows', []) if isinstance(grid, dict) else []
wanted_name = _normalize_text(q) if q else ''
wanted_email = (email or '').strip().lower()
email_hit = None
for r in rows:
if not isinstance(r, dict):
continue
rid = str(r.get('id') or '').strip()
if not rid:
continue
name = str(r.get('investor_name') or '').strip()
if wanted_name and _normalize_text(name) == wanted_name:
return {"id": rid, "investor_name": name, "matched_on": "name"}
if wanted_email and email_hit is None:
contacts = r.get('contacts')
if isinstance(contacts, list):
for c in contacts:
if isinstance(c, dict) and str(c.get('email') or '').strip().lower() == wanted_email:
email_hit = {"id": rid, "investor_name": name, "matched_on": "email"}
break
return email_hit
def ensure_fundraising_state_row(conn):
existing = conn.execute("SELECT * FROM fundraising_state WHERE id = 'main'").fetchone()
if not existing:
@@ -1829,6 +1868,10 @@ class CRMHandler(BaseHTTPRequestHandler):
if path == '/api/outreach/radar':
return self.handle_outreach_radar(user)
# Matrix intake bot — new-vs-existing lookup for its in-thread proposal
if path == '/api/intake/match':
return self.handle_intake_match(user, params)
# Users
if path == '/api/users':
return self.handle_list_users(user)
@@ -2747,6 +2790,9 @@ class CRMHandler(BaseHTTPRequestHandler):
contact_in = body.get('contact')
append_note = bool(body.get('append_note', True))
create_investor_if_missing = bool(body.get('create_investor_if_missing', False))
# Provenance: where this logged communication originated (grid UI vs the Matrix
# intake bot). Default preserves prior behavior; callers may override.
comm_source = (str(body.get('source') or 'fundraising_grid').strip() or 'fundraising_grid')[:64]
if not row_id and not investor_name_in:
return self.send_error_json("row_id or investor_name is required")
@@ -2863,7 +2909,7 @@ class CRMHandler(BaseHTTPRequestHandler):
user['user_id']
))
conn.execute("UPDATE contacts SET updated_at = ? WHERE id = ?", (now(), contact_id))
log_audit(conn, user['user_id'], 'communication', comm_id, 'create', {"source": "fundraising_grid"})
log_audit(conn, user['user_id'], 'communication', comm_id, 'create', {"source": comm_source})
iso_day = now()[:10]
target_row['last_communication_date'] = iso_day
@@ -2900,6 +2946,21 @@ class CRMHandler(BaseHTTPRequestHandler):
conn.close()
return self.send_json({"data": {"communication": comm, "row": target_row, "version": next_version}}, 201)
def handle_intake_match(self, user, params):
"""Read-only: does an investor matching this intake already exist? Used by the
Matrix intake bot to label its in-thread proposal new-vs-existing. Returns the
grid row id so an approved note lands on exactly that investor."""
q = str(params.get('q') or '').strip()
email = str(params.get('email') or '').strip()
if not q and not email:
return self.send_error_json("q or email is required")
conn = get_db()
try:
match = find_intake_match(conn, q, email)
finally:
conn.close()
return self.send_json({"data": {"match": match}})
def handle_update_communication(self, user, comm_id, body):
conn = get_db()
existing = conn.execute("SELECT id FROM communications WHERE id = ?", (comm_id,)).fetchone()
+165
View File
@@ -0,0 +1,165 @@
#!/usr/bin/env python3
"""Tests for the Matrix-intake CRM surface (v0.1.0 Matrix-intake M2).
The bot adds no parallel write path — it reuses /api/fundraising/log-communication and adds
one read-only lookup, GET /api/intake/match. This boots the REAL server against a temp DB and
asserts:
- match by normalized name and by contact email, returning the GRID ROW id;
- the new-vs-existing contract: a bot-style create (log-communication +
create_investor_if_missing) then matches by name — so an approved note lands on that same
investor instead of duplicating it;
- provenance: an intake-sourced communication is audited with source="matrix_intake";
- guards: missing q/email -> 400, unauthenticated -> 401.
Synthetic data only.
Run: cd backend && python3 test_intake_endpoints.py
"""
import http.client
import json
import os
import sqlite3
import sys
import tempfile
import threading
from http.server import ThreadingHTTPServer
_DATA = tempfile.mkdtemp()
os.environ["CRM_DATA_DIR"] = _DATA
os.environ["CRM_DB_PATH"] = os.path.join(_DATA, "crm.db")
sys.path.insert(0, os.path.dirname(os.path.abspath(__file__)))
import server # noqa: E402
FAILS = []
def check(cond, msg):
print((" PASS " if cond else " FAIL ") + msg)
if not cond:
FAILS.append(msg)
class _Quiet(server.CRMHandler):
def log_message(self, *a):
pass
def _req(port, method, path, token=None, body=None):
conn = http.client.HTTPConnection("127.0.0.1", port, timeout=10)
headers = {}
if token:
headers["Authorization"] = "Bearer " + token
payload = None
if body is not None:
payload = json.dumps(body)
headers["Content-Type"] = "application/json"
conn.request(method, path, body=payload, headers=headers)
resp = conn.getresponse()
raw = resp.read().decode("utf-8", "replace")
conn.close()
data = None
if raw:
try:
data = json.loads(raw)
except ValueError:
pass
return resp.status, data
GRID = {
"columns": [],
"rows": [
{"id": "rowAcme", "investor_name": "Acme Capital", "notes": "",
"contacts": [{"name": "Jane Doe", "email": "jane@acme.com", "title": "GP"}]},
],
}
def seed():
c = sqlite3.connect(os.environ["CRM_DB_PATH"])
c.execute("INSERT INTO users (id,username,email,password_hash,full_name,role,is_active) "
"VALUES ('u1','grant','grant@ten31.example','x','Grant','admin',1)")
# init_db doesn't create the 'main' state row (it's created lazily on first write), so
# upsert rather than UPDATE — a plain UPDATE would silently match zero rows.
c.execute("INSERT INTO fundraising_state (id, grid_json, views_json, version) "
"VALUES ('main', ?, '[]', 1) "
"ON CONFLICT(id) DO UPDATE SET grid_json = excluded.grid_json", (json.dumps(GRID),))
c.commit()
c.close()
def main():
server.init_db()
seed()
token = server.create_token("u1", "grant", "admin")
httpd = ThreadingHTTPServer(("127.0.0.1", 0), _Quiet)
port = httpd.server_address[1]
threading.Thread(target=httpd.serve_forever, daemon=True).start()
try:
print("\n[match: existing investor by name returns the grid row id]")
st, d = _req(port, "GET", "/api/intake/match?q=Acme%20Capital", token)
m = (d or {}).get("data", {}).get("match")
check(st == 200 and m and m["id"] == "rowAcme" and m["matched_on"] == "name",
f"name match -> rowAcme (got {st}, {m})")
print("\n[match: case-insensitive name]")
st, d = _req(port, "GET", "/api/intake/match?q=acme%20capital", token)
m = (d or {}).get("data", {}).get("match")
check(m and m["id"] == "rowAcme", f"normalized name match (got {m})")
print("\n[match: by contact email]")
st, d = _req(port, "GET", "/api/intake/match?email=jane@acme.com", token)
m = (d or {}).get("data", {}).get("match")
check(m and m["id"] == "rowAcme" and m["matched_on"] == "email",
f"email match -> rowAcme (got {m})")
print("\n[match: unknown -> null]")
st, d = _req(port, "GET", "/api/intake/match?q=Nobody%20LP", token)
check(st == 200 and (d or {}).get("data", {}).get("match") is None,
f"no match -> null (got {st}, {d})")
print("\n[match: missing q and email -> 400]")
st, _ = _req(port, "GET", "/api/intake/match", token)
check(st == 400, f"no params -> 400 (got {st})")
print("\n[match: unauthenticated -> 401]")
st, _ = _req(port, "GET", "/api/intake/match?q=Acme", None)
check(st == 401, f"no token -> 401 (got {st})")
print("\n[bot create: log-communication + create_investor_if_missing, source tagged]")
st, d = _req(port, "POST", "/api/fundraising/log-communication", token, {
"investor_name": "Beacon Ventures",
"contact": {"name": "Sam Lee", "email": "sam@beacon.vc", "title": "Partner"},
"create_investor_if_missing": True,
"type": "note", "subject": "Intake (Matrix)", "body": "met at the Austin conf",
"source": "matrix_intake",
})
check(st in (200, 201), f"create new investor -> 201 (got {st})")
print("\n[new-vs-existing contract: the just-created investor now matches by name]")
st, d = _req(port, "GET", "/api/intake/match?q=Beacon%20Ventures", token)
m = (d or {}).get("data", {}).get("match")
check(m and m.get("investor_name") == "Beacon Ventures",
f"created investor is matchable (no duplicate on next note) (got {m})")
print("\n[provenance: the intake communication is audited as source=matrix_intake]")
c = sqlite3.connect(os.environ["CRM_DB_PATH"])
rows = c.execute("SELECT changes FROM audit_log WHERE entity_type='communication' AND action='create'").fetchall()
c.close()
sources = [json.loads(r[0]).get("source") for r in rows if r[0]]
check("matrix_intake" in sources, f"audit carries source=matrix_intake (got {sources})")
finally:
httpd.shutdown()
print()
if FAILS:
print(f"FAILED ({len(FAILS)}):")
for f in FAILS:
print(f" - {f}")
sys.exit(1)
print("ALL PASS (matrix-intake endpoints)")
if __name__ == "__main__":
main()