From 7ad0ee762470e8f05911fffb8c7a97adcc874b18 Mon Sep 17 00:00:00 2001 From: Keysat Date: Wed, 17 Jun 2026 07:51:27 -0500 Subject: [PATCH] =?UTF-8?q?Add=20Matrix=20intake=20bot=20(M1+M2):=20typed?= =?UTF-8?q?=20message=20=E2=86=92=20approved=20fundraising-grid=20write?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit New backend/matrix_intake/ runs as its own process (matrix-nio isolated from the stdlib CRM): local-Qwen parse via Spark Control → in-thread human approval (yes/edit/no) → write through the CRM's own log-communication endpoint, tagged source=matrix_intake. Adds read-only GET /api/intake/match (returns grid row id, no-duplicate contract); threads provenance through handle_log_fundraising_communication. Reviewer-passed: pop-before-commit closes a double-approve race; edit-grammar fix. Text-only v1; business-card photo (M3) deferred (no Spark vision model). 26/26 tests green; live Matrix smoke pending deploy. --- .claude/rules/matrix-intake.md | 1 + .env.example | 17 +++ AGENTS.md | 7 +- ROADMAP.md | 15 ++- backend/matrix_intake/README.md | 41 ++++++ backend/matrix_intake/__init__.py | 7 + backend/matrix_intake/bot.py | 121 +++++++++++++++++ backend/matrix_intake/crm_client.py | 127 +++++++++++++++++ backend/matrix_intake/matrix_io.py | 55 ++++++++ backend/matrix_intake/parse.py | 63 +++++++++ backend/matrix_intake/proposals.py | 103 ++++++++++++++ backend/matrix_intake/requirements.txt | 4 + backend/matrix_intake/settings.py | 56 ++++++++ backend/matrix_intake/spark.py | 21 +++ backend/matrix_intake/test_crm_client.py | 54 ++++++++ backend/matrix_intake/test_parse.py | 93 +++++++++++++ backend/matrix_intake/test_proposals.py | 95 +++++++++++++ backend/server.py | 63 ++++++++- backend/test_intake_endpoints.py | 165 +++++++++++++++++++++++ docs/guides/matrix-intake.md | 68 ++++++++++ 20 files changed, 1169 insertions(+), 7 deletions(-) create mode 120000 .claude/rules/matrix-intake.md create mode 100644 
backend/matrix_intake/README.md create mode 100644 backend/matrix_intake/__init__.py create mode 100644 backend/matrix_intake/bot.py create mode 100644 backend/matrix_intake/crm_client.py create mode 100644 backend/matrix_intake/matrix_io.py create mode 100644 backend/matrix_intake/parse.py create mode 100644 backend/matrix_intake/proposals.py create mode 100644 backend/matrix_intake/requirements.txt create mode 100644 backend/matrix_intake/settings.py create mode 100644 backend/matrix_intake/spark.py create mode 100644 backend/matrix_intake/test_crm_client.py create mode 100644 backend/matrix_intake/test_parse.py create mode 100644 backend/matrix_intake/test_proposals.py create mode 100644 backend/test_intake_endpoints.py create mode 100644 docs/guides/matrix-intake.md diff --git a/.claude/rules/matrix-intake.md b/.claude/rules/matrix-intake.md new file mode 120000 index 0000000..634d080 --- /dev/null +++ b/.claude/rules/matrix-intake.md @@ -0,0 +1 @@ +../../docs/guides/matrix-intake.md \ No newline at end of file diff --git a/.env.example b/.env.example index 28b4684..da159a0 100644 --- a/.env.example +++ b/.env.example @@ -47,3 +47,20 @@ SMTP_SECURITY=starttls SMTP_FROM= SMTP_USERNAME= SMTP_PASSWORD= + +# ── Matrix intake bot (backend/matrix_intake/, runs as its own process on the Spark) ── +# Parses a typed message in a dedicated Matrix room into a proposed fundraising-grid +# add/edit (local Qwen via Spark Control above), then writes through the CRM API only +# after in-thread human approval. Reuses SPARK_CONTROL_URL / CRM_CHAT_MODEL above. +MATRIX_HOMESERVER=https://matrix.example.org +MATRIX_USER=@intake-bot:example.org +MATRIX_ACCESS_TOKEN= +MATRIX_DEVICE_ID=ten31-intake-bot +MATRIX_INTAKE_ROOM=!roomid:example.org +# CRM write-back: the bot logs in as a DEDICATED service user (admin-created CRM user; +# the CRM has no service-key path, so it uses normal Bearer-JWT auth). 
+CRM_API_BASE=http://127.0.0.1:8080 +CRM_BOT_USERNAME= +CRM_BOT_PASSWORD= +# Set to false only if CRM_API_BASE is https with a self-signed cert. +CRM_API_VERIFY_TLS=true diff --git a/AGENTS.md b/AGENTS.md index 3847971..1940b68 100644 --- a/AGENTS.md +++ b/AGENTS.md @@ -48,6 +48,7 @@ cd start9/0.4 && make - `backend/redaction/` — `scrub.py` + `client.py`: the scrub→Claude→re-hydrate privacy boundary. - `backend/ingest/` — chunk→embed→Qdrant + retrieval modes. - `backend/entity_*.py` — entity resolution/merge (the two-investor-model reconciliation). +- `backend/matrix_intake/` — Matrix intake bot (separate process; `matrix-nio`, isolated to this component): typed message → local-Qwen parse → in-thread approve → write via the CRM's own `log-communication`. See the matrix-intake guide. - `frontend/index.html` — the entire UI. - `docs/` — architecture, phase plans, contracts, runbooks (see Deeper docs). `docs/guides/` — scoped subsystem rules (see below). - `start9/0.4/` — StartOS package (`startos/utils.ts` holds `PACKAGE_VERSION`). @@ -63,6 +64,7 @@ Subsystem rules live in `docs/guides/` and lazy-load in Claude Code via `.claude - **Ingest / retrieval** (`backend/ingest/`) → `docs/guides/spark-ingest.md` - **Email capture / drafts + digest send** (`backend/email_integration/`, `backend/digest_mailer.py`, `backend/smtp_send.py`) → `docs/guides/email.md` - **Building or deploying the s9pk** (`start9/`) → `docs/guides/packaging.md` +- **Matrix intake bot** (`backend/matrix_intake/`) → `docs/guides/matrix-intake.md` ## Conventions @@ -103,13 +105,14 @@ Subsystem rules live in `docs/guides/` and lazy-load in Claude Code via `.claude _Phase 0 substrate + Phase 1 thesis/outreach are built; **box and repo at v0.1.0:83** (deployed & verified live 2026-06-16). 
v83 (latest): **email search/query + windowed digest preview** — Communications tab gains a fixed/typed investor dropdown, a date-range filter, a full-body view, and a semantic "Search content" mode; the Daily Digest gains an in-app windowed preview before send. Prior v82: front-end libs vendored + SRI-pinned + jsdom render-smoke build gate. **Decision (2026-06-16): the fundraising grid + email capture is the canonical system of record** — vestigial classic-CRM surfaces get pruned or repurposed (see `ROADMAP.md` → "Consolidate on the fundraising grid as canonical"). Longer-term backlog: `ROADMAP.md`._ +- **Built & reviewed, not yet deployed — Matrix intake bot (M1+M2), `backend/matrix_intake/`:** a separate-process bot (its `matrix-nio` dep isolated from the stdlib CRM) that turns a typed message in a dedicated Matrix room into a proposed fundraising-grid add/edit and writes only after **in-thread human approval** (`yes`/`edit field=value`/`no`). Parse = local Qwen via Spark Control (reuses `ingest/llm.py`; no Claude, no scrub needed — local path like the digest). Writes reuse the CRM's own `POST /api/fundraising/log-communication` (create-if-missing + contact upsert + note + relational sync + audit), tagged `source="matrix_intake"`; the one new CRM surface is read-only `GET /api/intake/match` (`find_intake_match`) returning the **grid row id** so an approved note lands on the matched investor (no duplicate). v1 is **text-only** — business-card photo (M3) is deferred (Spark Control has no vision model). Reviewer-passed (double-approve race fixed — `handle_reply` pops before the commit await; edit-grammar fix). **Code-complete, compiles, 26/26 tests green; a live Matrix smoke needs creds + `matrix-nio` on the Spark (can't run in CI).** Guide: `docs/guides/matrix-intake.md` (incl. the `settings.py`-not-`config.py` collision + email-integrity gotchas). 
- **Working (all draft-only):** CRM + ingest (chunk→embed→Qdrant + retrieval) + redaction boundary; Gmail capture (DWD) + email-activity propose→approve; Thesis Workshop + Architect (Claude) with dual-approval gate; Outreach Draft Assistant + follow-up radar + per-user voice + Tier-B in-thread Gmail draft creation. - **Deployed & verified live: v0.1.0:83** (box `$START9_BOX_HOST`/immense-voyage.local; `installed-version`→`0.1.0:83`, migration chain `…82→83` clean, server up on `:8080`, Gmail + ingest + digest schedulers all started; render-smoke gated the build) — **email search/query + windowed digest preview** (code-only, migrations no-op). Communications tab (`CommunicationsPage` + `email_integration/db.query_email_activity`): **fixed the investor dropdown** — the facet now mirrors the list with the digest's precedence (grid → org → contact → address) and **typed keys** (`fund:`/`org:`/`contact:`), so email matched only to a classic contact or org domain (no grid id — the common case, since `fundraising_contacts.email` is sparsely populated) now resolves to a real name and is selectable, instead of the dropdown being empty; added a **date-range filter** (`since`/`until`), and a **click-to-expand full-body view** (`GET /api/email/detail?id=` → `query_email_detail`, admin, soft-delete-gated, renders `body_text` escaped — never raw HTML). New **semantic content search**: a "Search content" toggle → `GET /api/email/search?q=` (`routes._h_search`) wrapping `ingest/search.py:hybrid_search` filtered to `doc_type='email'` (lazy import; **503** if Spark/Qdrant unreachable), **hydrated + soft-delete-filtered against SQLite** (`db.search_hit_emails` — never trust the derived index). 
**Daily Digest:** Settings → Admin now builds a digest over a chosen window (last 24h or since a date) as an **in-app preview** before sending (`POST /api/admin/digest/preview`); manual send uses the same window (`send-now` + `digest_scheduler.send_digest_window`); window resolved by `digest_builder.resolve_digest_window` (cap 92d). Both run the **real local-Spark summarizer** and **never touch the daily cursor**. Verified: 22/22 backend tests, `py_compile` clean, render-smoke pass. **Grant validated both live on the box 2026-06-16** — the digest windowed preview renders real Spark narratives over real activity, and the Communications dropdown / date filter / full-body view / content-search all work. Detail: `docs/guides/email.md`. - **Deployed & verified live: v0.1.0:82** (box `$START9_BOX_HOST`/immense-voyage.local; `installed-version`→`0.1.0:82`, migration chain `…81→82` clean, server up on `:8080`, schedulers + Gmail integration up). **v82 vendored React 18.3.1 / ReactDOM 18.3.1 / @babel/standalone 7.29.7 into `frontend/assets/vendor/`**, served same-origin with `sha384` SRI (no CDN, no outbound-internet dependency to render the UI), and added **`start9/0.4/render-smoke.mjs`** — a jsdom check (shipped-Babel transform asserts classic/non-module + parseable; real mount asserts the login UI renders) wired into the default `make` goal (`verified-build`), so every build is gated on the frontend actually rendering. Closes the v78 (blank screen) + v79 (Babel-8 ESM-import) class structurally. Detail: `docs/guides/packaging.md`. **Prior shipped & live:** v81 Communications-tab matched-only (`query_email_activity` gates on `EXISTS(email_investor_links)`; unmatched email captured but never shown; `docs/guides/email.md`); v80 admin-only email-activity panel (`GET /api/email/activity`); v78 retired `lp_profiles`/LP Tracker + repointed Dashboard "Total Committed" onto the grid (graveyard-excluded). 
**Digest fully live:** capture (DWD) → propose→approve; Gmail-DWD→SMTP transport; daily Phase-B digest (`digest_builder.py` + always-on `digest_scheduler.py` reading a DB policy + `send-now`); **daily auto-send is now ENABLED** (Grant turned it on in Settings → Admin, 2026-06-16). Detail: `docs/guides/email.md`. - **Live since v74 (2026-06-13):** login works; `/assets/` traversal 404s (plain + URL-encoded), root health 200. On boot, `ensure_thesis_v2_promoted` makes the v2.0 reserve-asset spine the working *approved* spine (node-level, reversible). Security/privacy hardening (path-traversal close, outreach NER backstop, get-by-id soft-delete) shipped in v74 — detail in `EVALUATION.md`. -- **Tests (2026-06-16):** **22/22 backend tests green** via `python3 backend/run_tests.py`, `py_compile` clean. `test_email_activity_panel.py` now covers the **typed facet + org/contact resolution** (the dropdown fix), the **date-range filter**, the **detail view** (full body / recipients / attachments / soft-delete), and the **content-search route** (hydrate / drop-tombstoned / 503 / admin) with retrieval stubbed; `test_digest_builder.py` adds the **window resolver** + **`send_digest_window`** (no-cursor-touch) cases. Frontend **render smoke check** (`cd start9/0.4 && make render-smoke`) still gates the default `make` build. The 2 stale thesis tests stay fixed (seed structure in `docs/guides/thesis.md`). +- **Tests (2026-06-16):** **26/26 backend tests green** via `python3 backend/run_tests.py`, `py_compile` clean. (+4 this session for the Matrix intake bot: `matrix_intake/test_parse.py`, `test_proposals.py`, `test_crm_client.py`, and `test_intake_endpoints.py` — the last boots the real server against a temp DB and covers `/api/intake/match`, the create→match no-duplicate contract, and `source="matrix_intake"` provenance.) 
`test_email_activity_panel.py` now covers the **typed facet + org/contact resolution** (the dropdown fix), the **date-range filter**, the **detail view** (full body / recipients / attachments / soft-delete), and the **content-search route** (hydrate / drop-tombstoned / 503 / admin) with retrieval stubbed; `test_digest_builder.py` adds the **window resolver** + **`send_digest_window`** (no-cursor-touch) cases. Frontend **render smoke check** (`cd start9/0.4 && make render-smoke`) still gates the default `make` build. The 2 stale thesis tests stay fixed (seed structure in `docs/guides/thesis.md`). - **Decided, not yet built (detail in `ROADMAP.md`):** Pipeline adoption + a grid flag that auto-loads flagged investors as opportunities; **NL→safe-query** feature (search item 3 — the larger, separate build); CRM as canonical thesis backbone with the signal-engine reading from it (reconciliation unwired); reply-all for Tier-B drafts (currently reply to the LP only). *(Done this session, v83: email search item 1 [activity query/panel gaps — typed facet fix + date range + full-body view] and item 2 [semantic content search] both shipped; daily-digest windowed preview→send.)* - **Known debt (P2, not deploy-blocking):** **reports-subsystem soft-delete sweep** — `handle_pipeline_report` + remaining report/aggregate queries over opportunities/communications still count soft-deleted rows (v78 shrank this surface: the `lp_profiles`/lp-breakdown aggregates are gone and the dashboard "Total Committed" is now grid-sourced); needs a pass + report-endpoint tests. Also `?limit=abc` crashes the request thread (authenticated list path); scrub-gateway TLS verify off; `cryptography==42.0.5`; stale user-visible `start9/0.4/assets/ABOUT.md`; hardcoded Spark/Qdrant IPs in the s9pk; **StartOS package icon oversized/zoomed** (research the Start9 icon spec, source a base ten31 logo, produce a correctly sized icon **before the next s9pk upload**); the 5.4k-line `server.py` monolith. 
P3 batch + full list in `EVALUATION.md`. *(Resolved v82: front-end CDN/SRI risk — libs vendored + SRI-pinned — and the render smoke check is now scripted into the build.)* - **Doc drift to reconcile:** `crm-overview.md` + `EVALUATION.md` still describe `lp_profiles` as a live model in places — a doc-auditor pass should align them to "grid canonical, `lp_profiles` retired." - **Other gaps:** the v2.0 spine is the *working* spine but **not a canonical `thesis_version`** (needs Grant + Jonathan dual sign-off); Appendix-A conviction/exposure (incl. ~40% Strike) stay Grant's working read, not canonical, not fed to the engine. Live infra now exercised on the box (Gmail capture + schedulers up; local-Spark summarization confirmed via the digest preview; Qdrant via Communications content-search); **Claude/Architect path still unverified live on the box.** -- **Next:** 1) add an **auth regression test** asserting the 3 v79-gated GET endpoints (`/api/users`, `/api/email/status`, `/api/email/accounts`) reject members; 2) **reports-subsystem soft-delete sweep** + report-endpoint tests; 3) **Pipeline adoption** — grid flag → auto-load opportunities; 4) `?limit=abc` crash; 5) **NL→safe-query** (search item 3 — separate, larger); 6) Grant + Jonathan freeze v2.0 canonical; 7) build reply-all for Tier-B drafts; 8) **email-capture tab shows an error on email sync status** — chase down the failing call (likely `/api/email/status`) and fix. 
*(Logged to ROADMAP: a build step that pre-compiles JSX to drop runtime Babel entirely — bigger, contradicts the "no build step" convention.)* +- **Next:** 1) **deploy + live-smoke the Matrix intake bot** (`pip install matrix-nio` + `MATRIX_*`/`CRM_BOT_*` in `.env` on the Spark, create the CRM bot user, `python3 backend/matrix_intake/bot.py`, post a test message); 2) **Pipeline adoption** — grid flag → auto-load opportunities (the agreed next major build); 3) add an **auth regression test** asserting the 3 v79-gated GET endpoints (`/api/users`, `/api/email/status`, `/api/email/accounts`) reject members; 4) **reports-subsystem soft-delete sweep** + report-endpoint tests; 5) `?limit=abc` crash; 6) **email-capture tab error on email sync status** (likely `/api/email/status`); 7) **NL→safe-query** (search item 3 — separate, larger); 8) Grant + Jonathan freeze v2.0 canonical; 9) reply-all for Tier-B drafts. *(Logged to ROADMAP: a build step that pre-compiles JSX to drop runtime Babel entirely — bigger, contradicts the "no build step" convention.)* diff --git a/ROADMAP.md b/ROADMAP.md index 43532a4..2ca0ce1 100644 --- a/ROADMAP.md +++ b/ROADMAP.md @@ -86,13 +86,20 @@ ## Backlog (post-Phase-1 agentic) -### Matrix-bridge intake for the fundraising grid — NEXT (high priority, after current work) -*Requested 2026-06-16. Flagged high priority: this is the next build once current work lands.* +### Matrix-bridge intake for the fundraising grid — M1+M2 BUILT (deploy + live smoke pending) +*Requested 2026-06-16. **M1 (scaffold + parse + in-thread propose) and M2 (match + write-on-approve) built, tested (26/26), not yet deployed** — code in `backend/matrix_intake/`, guide at `docs/guides/matrix-intake.md`. Remaining: install `matrix-nio` + creds on the Spark, create the CRM bot user, and a **live Matrix smoke** (can't run in CI). M3 (business-card photo) deferred until Spark Control has a vision model. 
Next major build after this is **Pipeline adoption** (see below).* Use the **matrix-bridge** repo's pattern to listen on a dedicated ten31-database Matrix room. Send a message (with an optional business-card photo) and a local LLM **via Spark Control** parses it into the fundraising-grid schema and **auto-creates the investor entity + contact row**. For an existing investor, send a meeting note and it **appends an interaction-log entry**. Approval gate: the bot replies in Matrix with the proposed add/edit; the user approves / rejects / edits in-thread before the write commits (keeps the draft→human-approve guardrail). -- **First step:** have the **explorer agent review the `matrix-bridge` repo** to surface the reusable listener/bridge pattern before designing the intake. - Fits the "grid is canonical" direction (writes land in `fundraising_*`) and the never-send-autonomously rule (in-thread human approval before any write). +**Locked design (2026-06-16, approved) — build now, M1 then M2:** +- **Separate component, shared scaffold:** new `backend/matrix_intake/` (its own process; lifts matrix-bridge's connect/prime-then-listen/threaded-reply plumbing). `matrix-nio` is isolated to this component's `requirements.txt` — it never enters the stdlib CRM runtime. Keeps the CRM write credential + LP data out of the general-purpose matrix-bridge bot (blast-radius + data-sovereignty), and lets the two iterate independently. Runs on the Spark (placement settled against `standards/guides/placement.md` at deploy). +- **v1 = text-only.** Business-card photo deferred to M3 — Spark Control fronts chat/embeddings/rerank but **no vision model** today, so photo→fields isn't buildable end-to-end yet. +- **Parse:** local Qwen via Spark Control `/v1/chat/completions` (temp 0, JSON-only), reusing the existing Spark client pattern (`backend/redaction`/`backend/ingest`). 
+- **Approval handshake (the one stateful piece):** in-memory pending-proposal store keyed by Matrix thread root; user replies **yes / edit field=value / no** in-thread. Satisfies never-write-autonomously; exempt from "agents draft, humans send" (internal data entry, like the digest). +- **CRM-side:** `POST /api/intake/investor` (service-auth) creates a new investor+contact **through the existing grid-save path** (so relational sync + audit + backup-on-write happen as with a UI edit; bot never does whole-blob RMW) or appends a meeting note to the interaction log for an existing investor; `GET /api/intake/match?q=` fuzzy-matches via the existing entity-resolution/email-matcher. New investor needs no fund at intake. *(As built in M2: no `POST /api/intake/investor` was added — the bot logs in as a dedicated CRM user (Bearer JWT; the CRM has no service-key path) and writes through the existing `POST /api/fundraising/log-communication`, tagged `source="matrix_intake"`; the only new CRM surface is the read-only `GET /api/intake/match`, which returns the grid row id.)* +- **Phases:** M1 = scaffold + parse + in-thread propose, **no writes** (proves Matrix↔Spark). M2 = intake endpoint + match + write-on-approve + tests. M3 (deferred) = business-card photo. + ### Admin-only vs. all-users web-UI surface — audit *Requested 2026-06-16 (idea, P2).* Have the **explorer agent** report which web-UI functionality is visible only to admins vs. to all users (member role) — a map of the role-gated surface across `frontend/index.html` and the backend route auth checks. Useful input for the consolidation/permissions work. @@ -145,7 +152,7 @@ Open design questions (settled at build time): send time = **6 PM box-local** (c - **Dashboard KPIs repointed:** "Total Committed" now sums `fundraising_investors.total_invested` (the canonical grid rollup), **excluding graveyarded investors** so the headline reflects live committed capital — a deliberate divergence from `/api/fundraising/relational-summary`, which sums all rows. "Total Funded" dropped — the grid has no funded-vs-committed concept and the frontend never rendered it. (If a funded/wired status is wanted later, that's a new grid feature, not a revival of lp_profiles.) Regression-guarded by `test_dashboard_report.py`. 
- **Left in place (intentional):** the empty `lp_profiles` table + index (no destructive drop, per never-hard-delete); the contact-delete soft-delete cascade; the `--reset-all-data` clear; and the inert MOCK_MODE `mockDb.lp_profiles` fixtures (dev-only fallback, never hits the backend — its dashboard mock still reads mock lp_profiles, a known dev-only divergence from the real backend). Updated `test_soft_delete_reads.py` to drop the now-removed `lp_profile` assertions (kept its org `total_funded` opportunities-aggregate checks). -**Adopt the Pipeline — wire it to the grid.** +**Adopt the Pipeline — wire it to the grid.** *(Priority: second build, after the Matrix-bridge intake — confirmed 2026-06-16.)* - Pipeline (`opportunities`) is fully built and functional but unused. Keep it: it's the one classic surface that tracks something the grid doesn't — a forward-looking deal funnel (stage, `expected_amount × probability`, owner, close date) vs. the grid's actual committed dollars + flags. - New idea (Grant, 2026-06-16): let users **flag an investor in the grid as a pipeline opportunity** (a grid column/control) so it **auto-creates / syncs an `opportunities` row** that loads into the Pipeline board. Design the grid↔pipeline link (which fund seeds it? what sets stage/expected amount? keep them reconciled). Turns Pipeline from a disconnected second data-entry surface into a view driven by the canonical grid. - Revisit the stray contact-create side-door (the "Create Opportunity" modal `POST /api/contacts`, `frontend/index.html:6030`) once the grid-driven flow exists. diff --git a/backend/matrix_intake/README.md b/backend/matrix_intake/README.md new file mode 100644 index 0000000..dd25d26 --- /dev/null +++ b/backend/matrix_intake/README.md @@ -0,0 +1,41 @@ +# Matrix intake bot + +Turns a typed message in a dedicated Matrix room into a proposed fundraising-grid add/edit, +gated on in-thread human approval before any write. 
Runs as its own process (on the Spark), +separate from the CRM. Full design + rules: `docs/guides/matrix-intake.md`. + +## Run + +```bash +# 1. Install the one third-party dep (isolated to this component — NOT the CRM runtime) +python3 -m pip install -r requirements.txt # matrix-nio + +# 2. Fill the MATRIX_* and CRM_BOT_* vars in the repo .env (see ../../.env.example), +# and create a dedicated CRM user for CRM_BOT_USERNAME/PASSWORD (admin → invite user). + +# 3. Start the listener +python3 bot.py +``` + +It primes the Matrix sync past history (no backlog replay), then listens. Post a message in +the intake room; it replies in a thread with the parsed proposal. Reply **yes** to commit, +**edit field=value** to change a field, or **no** to discard. + +## Layout + +- `bot.py` — entrypoint: connect, prime-then-listen, dispatch (lifts matrix-bridge's plumbing). +- `parse.py` — message → structured proposal via local Qwen (`spark.py` → `backend/ingest/llm.py`). +- `proposals.py` — in-memory pending-proposal store + the yes/edit/no state machine. +- `crm_client.py` — login + `GET /api/intake/match` + write via `POST /api/fundraising/log-communication`. +- `matrix_io.py` — message splitting, thread-root detection, threaded-reply sender. +- `settings.py` — Matrix + CRM-API config (named `settings`, not `config`, to avoid shadowing `ingest/config`). + +## Test (offline) + +```bash +python3 test_parse.py && python3 test_proposals.py && python3 test_crm_client.py +# endpoint + create→match contract (boots the real server against a temp DB): +cd ../ && python3 test_intake_endpoints.py +``` + +Live Matrix behavior needs creds + `matrix-nio` and can only be smoke-tested on the Spark. 
diff --git a/backend/matrix_intake/__init__.py b/backend/matrix_intake/__init__.py new file mode 100644 index 0000000..ce6dacf --- /dev/null +++ b/backend/matrix_intake/__init__.py @@ -0,0 +1,7 @@ +"""Matrix intake bot — a dedicated Matrix room that turns a typed message into a +proposed fundraising-grid add/edit, gated on in-thread human approval before any write. + +Separate process from the CRM (its only third-party dep, matrix-nio, lives here, never +in the stdlib CRM runtime). Parses with local Qwen via Spark Control; on approval, writes +through the CRM's own API. See docs/guides/matrix-intake.md and ROADMAP.md. +""" diff --git a/backend/matrix_intake/bot.py b/backend/matrix_intake/bot.py new file mode 100644 index 0000000..ed81c95 --- /dev/null +++ b/backend/matrix_intake/bot.py @@ -0,0 +1,121 @@ +#!/usr/bin/env python3 +"""Matrix intake bot — entrypoint. + +A top-level message in the dedicated intake room is parsed (local Qwen via Spark Control) +into a proposed fundraising-grid add/edit and posted back IN A THREAD. The team member +replies in that thread — **yes** / **edit field=value** / **no** — and only on **yes** does +the bot write, through the CRM's own API. Nothing is ever written autonomously. + +Runs as its own process (its matrix-nio dep is isolated here, never in the CRM runtime). +Lifts matrix-bridge's prime-then-listen + threaded-reply plumbing. Config: repo .env. +""" +import asyncio + +from nio import AsyncClient, MatrixRoom, RoomMessageText + +import crm_client +import matrix_io +import parse +import proposals +import settings + +UNCLEAR_HELP = ( + "🤔 I couldn't tell what to record. Try e.g.\n" + "`New investor: Acme Capital — Jane Doe , met at the Austin conf`\n" + "or a note like `Note for Acme Capital: wants the Q3 deck, follow up next week`." 
+) + + +async def main(): + mx = settings.matrix_settings() + client = AsyncClient(mx["homeserver"], mx["user_id"]) + client.restore_login(user_id=mx["user_id"], device_id=mx["device_id"], access_token=mx["token"]) + say = matrix_io.make_say(client) + store = proposals.ProposalStore() + intake_room = mx["intake_room"] + + async def handle_intake(room_id, root, text): + try: + proposal = await asyncio.to_thread(parse.parse_message, text) + except Exception as exc: # Spark/Qwen unreachable or bad response + await say(room_id, f"⚠️ couldn't reach the local parser: {exc}", root) + return + if proposal["intent"] == "unclear": + await say(room_id, UNCLEAR_HELP, root) + return + # Confirm new-vs-existing against the CRM matcher (read-only). Degrade gracefully if + # the CRM is unreachable — still propose, just without the "looks like existing" hint. + hint = "" + try: + match = await asyncio.to_thread(crm_client.match, proposal) + if match: + proposal["intent"] = "meeting_note" + proposal["_match_id"] = match["id"] + hint = f"\n\n🔎 Looks like an existing investor: **{match['name']}** — this will append a note to them." + except Exception: + pass + store.put(root, proposal) + await say(room_id, proposals.render(proposal) + hint, root) + + async def handle_reply(room_id, root, text): + action, payload = proposals.interpret_reply(text) + # Claim the proposal synchronously — BEFORE any await — so a second reply that + # arrives while a commit is in flight can't double-process it. asyncio is + # cooperative: nothing else runs between here and the first await below, so the + # pop is atomic w.r.t. other Matrix events. 
+ proposal = store.pop(root) + if proposal is None: + return + if action == "approve": + try: + summary = await asyncio.to_thread(crm_client.commit, proposal) + except Exception as exc: + store.put(root, proposal) # commit failed — restore so the user can retry + await say(room_id, f"⚠️ write failed, nothing committed: {exc}", root) + return + await say(room_id, f"✅ {summary}", root) + elif action == "reject": + await say(room_id, "🗑️ Discarded — nothing written.", root) + elif action == "edit": + field, value = payload + proposal = proposals.apply_edit(proposal, field, value) + store.put(root, proposal) # keep it pending (edited) for the next reply + await say(room_id, "✏️ Updated:\n\n" + proposals.render(proposal), root) + else: # unrecognized reply — leave the proposal pending + store.put(root, proposal) + await say(room_id, "Reply **yes** to commit, **edit field=value**, or **no**.", root) + + async def on_message(room: MatrixRoom, event: RoomMessageText): + if event.sender == mx["user_id"]: + return # never react to our own messages (we post in-thread — this prevents loops) + if room.room_id != intake_room: + return + text = (event.body or "").strip() + if not text: + return + root = matrix_io.thread_root_of(event) + if root and store.has(root): + await handle_reply(room.room_id, root, text) + elif root: + return # threaded message not tied to a live proposal — ignore + else: + await handle_intake(room.room_id, event.event_id, text) + + # Prime the sync token past history, THEN register the callback — only react to messages + # arriving after startup (no backlog replay). (matrix-bridge pattern.) 
+ print("matrix-intake: priming sync (skipping backlog)...", flush=True) + await client.sync(timeout=30000, full_state=False) + client.add_event_callback(on_message, RoomMessageText) + who = await client.whoami() + print(f"matrix-intake: listening as {who.user_id} in room {intake_room}", flush=True) + try: + await client.sync_forever(timeout=30000) + finally: + await client.close() + + +if __name__ == "__main__": + try: + asyncio.run(main()) + except KeyboardInterrupt: + pass diff --git a/backend/matrix_intake/crm_client.py b/backend/matrix_intake/crm_client.py new file mode 100644 index 0000000..58db465 --- /dev/null +++ b/backend/matrix_intake/crm_client.py @@ -0,0 +1,127 @@ +"""CRM API client for the intake bot's write-back leg. + +The bot authenticates as a dedicated service user (Bearer JWT via /api/auth/login — the CRM +has no service-key path) and reuses the CRM's OWN canonical write endpoint +(/api/fundraising/log-communication) for both new-investor and existing-note cases, rather +than mutating the grid itself. That endpoint creates the grid row (create_investor_if_missing), +upserts the contact, logs the communication, appends a visible note, and re-syncs the +relational tables + audit — exactly as a UI grid edit would. We only tag provenance +(source="matrix_intake"). The payload builder is a pure function so it's unit-tested offline. 
+""" +import json +import ssl +import urllib.error +import urllib.request +from urllib.parse import urlencode + +import settings + +_token = None + + +def _http(method, path, body=None, token=None): + s = settings.crm_settings() + url = s["base"] + path + data = json.dumps(body).encode("utf-8") if body is not None else None + headers = {"Content-Type": "application/json"} + if token: + headers["Authorization"] = f"Bearer {token}" + req = urllib.request.Request(url, data=data, method=method, headers=headers) + ctx = None + if url.lower().startswith("https") and not s["verify_tls"]: + ctx = ssl.create_default_context() + ctx.check_hostname = False + ctx.verify_mode = ssl.CERT_NONE + try: + with urllib.request.urlopen(req, timeout=30, context=ctx) as resp: + raw = resp.read() + return resp.status, (json.loads(raw) if raw else {}) + except urllib.error.HTTPError as exc: + raw = exc.read() + try: + payload = json.loads(raw) if raw else {} + except Exception: + payload = {"raw": raw.decode("utf-8", "replace")} + return exc.code, payload + + +def _login(): + global _token + s = settings.crm_settings() + if not s["username"] or not s["password"]: + raise RuntimeError("CRM bot creds not set (CRM_BOT_USERNAME / CRM_BOT_PASSWORD)") + status, data = _http("POST", "/api/auth/login", + {"username": s["username"], "password": s["password"]}) + if status != 200 or not data.get("token"): + raise RuntimeError(f"CRM login failed ({status}): {data.get('error') or data}") + _token = data["token"] + return _token + + +def _authed(method, path, body=None): + """Call the CRM with the cached token; re-login once on a 401 (token expiry).""" + global _token + token = _token or _login() + status, data = _http(method, path, body, token=token) + if status == 401: + token = _login() + status, data = _http(method, path, body, token=token) + return status, data + + +def match(proposal): + """Return {'id', 'name'} for an existing investor matching this proposal, else None.""" + q = 
proposal.get("investor_name") or proposal.get("contact_name") or "" + email = proposal.get("contact_email") or "" + if not q and not email: + return None + qs = urlencode({"q": q, "email": email}) + status, data = _authed("GET", f"/api/intake/match?{qs}") + if status != 200: + raise RuntimeError(f"intake match failed ({status}): {data.get('error') or data}") + m = (data.get("data") or {}).get("match") + if not m: + return None + return {"id": m["id"], "name": m.get("investor_name") or q} + + +def build_commit_payload(proposal): + """Pure: map a proposal to the /api/fundraising/log-communication request body. + + Existing investor (carries _match_id) → target that exact grid row. Otherwise create the + investor if missing. The note becomes the communication body; the email is only sent when + it survived parse's source-text integrity check.""" + contact = { + "name": proposal.get("contact_name") or proposal.get("investor_name") or "", + "email": proposal.get("contact_email") or "", + "title": proposal.get("contact_title") or "", + } + note = proposal.get("note") or "" + payload = { + "contact": contact, + "type": "note", + "body": note, + "subject": "Intake (Matrix)" if proposal.get("intent") != "meeting_note" else "Note (Matrix)", + "append_note": True, + "source": "matrix_intake", + } + match_id = proposal.get("_match_id") + if match_id: + payload["row_id"] = match_id + else: + payload["investor_name"] = proposal.get("investor_name") or proposal.get("contact_name") or "" + payload["create_investor_if_missing"] = True + return payload + + +def commit(proposal): + """Write the approved proposal to the CRM; return a short human summary for the thread.""" + payload = build_commit_payload(proposal) + status, data = _authed("POST", "/api/fundraising/log-communication", payload) + if status not in (200, 201): + raise RuntimeError(f"log-communication failed ({status}): {data.get('error') or data}") + row = (data.get("data") or {}).get("row") or {} + name = 
row.get("investor_name") or payload.get("investor_name") or "investor" + if proposal.get("_match_id"): + return f"Logged note to **{name}**." + return f"Added **{name}** to the grid" + (" with a note." if payload.get("body") else ".") diff --git a/backend/matrix_intake/matrix_io.py b/backend/matrix_intake/matrix_io.py new file mode 100644 index 0000000..3b3dbe9 --- /dev/null +++ b/backend/matrix_intake/matrix_io.py @@ -0,0 +1,55 @@ +"""Matrix plumbing lifted from matrix-bridge (src/bot.py): message splitting, thread-root +detection, and a threaded-reply sender. Kept dependency-light so the rest of the bot is +testable without a live homeserver.""" + +MAX_MSG_CHARS = 30000 # well under Matrix's ~64KB event cap + + +def split_message(text, limit=MAX_MSG_CHARS): + """Split text into <=limit-char chunks on newline boundaries (no truncation).""" + if len(text) <= limit: + return [text] + chunks, buf = [], "" + for line in text.splitlines(keepends=True): + while len(line) > limit: + if buf: + chunks.append(buf) + buf = "" + chunks.append(line[:limit]) + line = line[limit:] + if len(buf) + len(line) > limit: + chunks.append(buf) + buf = "" + buf += line + if buf: + chunks.append(buf) + return chunks + + +def thread_root_of(event): + """Return the thread root event_id if this message is a threaded reply, else None.""" + relates = (getattr(event, "source", None) or {}).get("content", {}).get("m.relates_to") or {} + if relates.get("rel_type") == "m.thread": + return relates.get("event_id") + return None + + +def thread_content(text, thread_root): + """Build an m.room.message content dict, threaded under thread_root when given.""" + content = {"msgtype": "m.text", "body": text} + if thread_root: + content["m.relates_to"] = { + "rel_type": "m.thread", + "event_id": thread_root, + "is_falling_back": True, + "m.in_reply_to": {"event_id": thread_root}, + } + return content + + +def make_say(client): + """Return an async say(room_id, text, thread_root=None) bound to a matrix-nio 
client.""" + async def say(room_id, text, thread_root=None): + for chunk in split_message(text): + await client.room_send(room_id, "m.room.message", thread_content(chunk, thread_root)) + return say diff --git a/backend/matrix_intake/parse.py b/backend/matrix_intake/parse.py new file mode 100644 index 0000000..b9d05dc --- /dev/null +++ b/backend/matrix_intake/parse.py @@ -0,0 +1,63 @@ +"""Turn a free-text intake message into a normalized proposal via local Qwen. + +The model only EXTRACTS structure; it never decides to write anything. New-vs-existing is +finalized in M2 against the CRM matcher — here `intent` is the model's first read. +""" +import re + +import spark + +SYSTEM = ( + "You extract structured investor-intake data from a short message a venture-fund " + "team member typed. Reply with ONLY a JSON object, no prose, with these keys:\n" + ' "intent": "new_investor" if the message introduces a new investor or prospect, ' + '"meeting_note" if it logs a note/update about an investor, else "unclear".\n' + ' "investor_name": the investing firm or entity name (e.g. "Acme Capital"), or null.\n' + ' "contact_name": the individual person mentioned, or null.\n' + ' "contact_email": the person\'s email if explicitly present, else null. Never invent one.\n' + ' "contact_title": the person\'s role/title if stated, else null.\n' + ' "note": any meeting note, context, or next step, else null.\n' + "Use null (not empty string) for anything not present. Output JSON only." 
+) + +_EMAIL_RE = re.compile(r"[^@\s]+@[^@\s]+\.[^@\s]+") +_VALID_INTENTS = {"new_investor", "meeting_note", "unclear"} +_FIELDS = ("intent", "investor_name", "contact_name", "contact_email", "contact_title", "note") + + +def _clean(v): + if v is None: + return None + s = str(v).strip() + if not s or s.lower() in ("null", "none", "n/a", "na", "unknown"): + return None + return s + + +def normalize(raw, source_text=""): + """Coerce the model's dict into a stable proposal shape; salvage an email from the + source text if the model missed one. Returns a dict with all _FIELDS keys.""" + raw = raw or {} + out = {k: _clean(raw.get(k)) for k in _FIELDS} + + intent = (out["intent"] or "").lower().replace("-", "_").replace(" ", "_") + out["intent"] = intent if intent in _VALID_INTENTS else "unclear" + + # Email integrity: only accept an address that literally appears in the source message. + # The model is unreliable for verbatim strings and must never mint an address — anything + # not present in what the human typed is dropped (a wrong email in the CRM is worse than + # none). This both salvages a missed address and rejects a hallucinated one. + m = _EMAIL_RE.search(source_text or "") + out["contact_email"] = m.group(0).rstrip(".,;:!?)]}>\"'") if m else None + + # An intake with no firm AND no person is not actionable. + if not out["investor_name"] and not out["contact_name"]: + out["intent"] = "unclear" + return out + + +def parse_message(text, parse_fn=spark.parse_json): + """Parse one intake message. `parse_fn` is injectable for tests (defaults to Spark/Qwen). + Returns a normalized proposal dict. 
On a model/transport failure, raises (caller decides).""" + raw = parse_fn(text, system=SYSTEM, max_tokens=400) + return normalize(raw, source_text=text) diff --git a/backend/matrix_intake/proposals.py b/backend/matrix_intake/proposals.py new file mode 100644 index 0000000..e5855c9 --- /dev/null +++ b/backend/matrix_intake/proposals.py @@ -0,0 +1,103 @@ +"""Pending-proposal store + the in-thread approval state machine. + +The one piece of state in the bot: a proposal awaiting a human's yes/edit/no, keyed by the +Matrix thread root (the bot's proposal lives in a thread rooted at the user's message, and +the user replies inside that thread). In-memory and ephemeral by design — a restart drops +pending proposals (the user just re-sends), matching matrix-bridge's stateless-by-default +ethos. Nothing here writes to the CRM; the bot calls the CRM client only after `approve`. +""" + +# field aliases accepted in `edit =` +_EDIT_ALIASES = { + "name": "investor_name", "investor": "investor_name", "firm": "investor_name", "org": "investor_name", + "contact": "contact_name", "person": "contact_name", + "email": "contact_email", + "title": "contact_title", "role": "contact_title", + "note": "note", +} + +_YES = {"yes", "y", "approve", "approved", "ok", "confirm", "go", "👍", "✅"} +_NO = {"no", "n", "cancel", "discard", "reject", "stop", "👎", "❌"} + + +class ProposalStore: + def __init__(self): + self._pending = {} # thread_root -> proposal dict + + def put(self, thread_root, proposal): + self._pending[thread_root] = proposal + + def get(self, thread_root): + return self._pending.get(thread_root) + + def pop(self, thread_root): + return self._pending.pop(thread_root, None) + + def has(self, thread_root): + return thread_root in self._pending + + +def _parse_edit(text): + """Parse 'edit field=value' (also 'field: value'); return (canonical_field, value) or None.""" + body = text.strip() + if body.lower().startswith("edit "): + body = body[5:].strip() + for sep in ("=", ":"): + if 
sep in body: + field, value = body.split(sep, 1) + field = field.strip().lower() + canon = _EDIT_ALIASES.get(field) + value = value.strip() + if canon and value: + return canon, value + # Not a known field on this separator — try the next one rather than bail, + # so e.g. "note: see deck=v2" still parses (split on ':' not the inner '='). + continue + return None + + +def interpret_reply(text): + """Classify a threaded reply to a pending proposal. + + Returns one of: + ("approve", None) | ("reject", None) | ("edit", (field, value)) | ("unknown", None) + """ + t = (text or "").strip() + low = t.lower() + if low in _YES: + return ("approve", None) + if low in _NO: + return ("reject", None) + edit = _parse_edit(t) + if edit: + return ("edit", edit) + return ("unknown", None) + + +def apply_edit(proposal, field, value): + """Return a copy of the proposal with one field changed.""" + updated = dict(proposal) + updated[field] = value + return updated + + +def render(proposal): + """Render a proposal as the in-thread message a human approves.""" + if proposal.get("intent") == "meeting_note": + head = f"📝 Proposed **meeting note** for **{proposal.get('investor_name') or proposal.get('contact_name') or '?'}**" + else: + head = f"📇 Proposed **new investor**: **{proposal.get('investor_name') or proposal.get('contact_name') or '?'}**" + lines = [head] + fields = [ + ("Investor", proposal.get("investor_name")), + ("Contact", proposal.get("contact_name")), + ("Email", proposal.get("contact_email")), + ("Title", proposal.get("contact_title")), + ("Note", proposal.get("note")), + ] + for label, val in fields: + if val: + lines.append(f"· {label}: {val}") + lines.append("") + lines.append("Reply **yes** to commit, **edit field=value** to change a field, or **no** to discard.") + return "\n".join(lines) diff --git a/backend/matrix_intake/requirements.txt b/backend/matrix_intake/requirements.txt new file mode 100644 index 0000000..cee1fa6 --- /dev/null +++ 
b/backend/matrix_intake/requirements.txt @@ -0,0 +1,4 @@ +# Matrix intake bot — isolated to this component's own process. matrix-nio is the ONLY +# third-party runtime dep and MUST NOT be added to the stdlib CRM (backend/server.py). +# The Spark/Qwen + CRM-API calls reuse the repo's stdlib HTTP client (backend/ingest/http_util). +matrix-nio>=0.24 diff --git a/backend/matrix_intake/settings.py b/backend/matrix_intake/settings.py new file mode 100644 index 0000000..7856730 --- /dev/null +++ b/backend/matrix_intake/settings.py @@ -0,0 +1,56 @@ +"""Config for the Matrix intake bot — Matrix creds + the dedicated intake room. + +Spark settings (SPARK_CONTROL_URL, CHAT_MODEL, …) are NOT read here; they come from the +reused ingest client (see spark.py), which loads the same repo .env. This module only owns +the Matrix connection and the CRM API target for the write-back leg (M2). +""" +import os + +REPO_ROOT = os.path.dirname(os.path.dirname(os.path.dirname(os.path.abspath(__file__)))) + + +def load_env(path=None): + """Populate os.environ from the repo .env (setdefault — never clobber a real env var).""" + path = path or os.path.join(REPO_ROOT, ".env") + if not os.path.exists(path): + return + with open(path, "r", encoding="utf-8") as fh: + for line in fh: + line = line.strip() + if not line or line.startswith("#") or "=" not in line: + continue + k, v = line.split("=", 1) + os.environ.setdefault(k.strip(), v.strip()) + + +load_env() + + +def _require(name): + val = os.environ.get(name, "").strip() + if not val: + raise RuntimeError(f"matrix_intake: required env var {name} is not set (see .env.example)") + return val + + +# Matrix connection (resolved lazily so importing this module for tests never requires creds). 
+def matrix_settings(): + return { + "homeserver": _require("MATRIX_HOMESERVER"), + "user_id": _require("MATRIX_USER"), + "token": _require("MATRIX_ACCESS_TOKEN"), + "device_id": os.environ.get("MATRIX_DEVICE_ID", "ten31-intake-bot"), + "intake_room": _require("MATRIX_INTAKE_ROOM"), + } + + +# CRM API target for the write-back leg (M2). The CRM has no service-key auth path — auth is +# Bearer-JWT via /api/auth/login — so the bot logs in as a DEDICATED service user (a normal +# CRM user, created by an admin) and reuses the existing auth. Creds live in .env, never code. +def crm_settings(): + return { + "base": os.environ.get("CRM_API_BASE", "http://127.0.0.1:8080").rstrip("/"), + "username": os.environ.get("CRM_BOT_USERNAME", "").strip(), + "password": os.environ.get("CRM_BOT_PASSWORD", ""), + "verify_tls": os.environ.get("CRM_API_VERIFY_TLS", "true").lower() in ("1", "true", "yes", "on"), + } diff --git a/backend/matrix_intake/spark.py b/backend/matrix_intake/spark.py new file mode 100644 index 0000000..3b4eea2 --- /dev/null +++ b/backend/matrix_intake/spark.py @@ -0,0 +1,21 @@ +"""Thin reuse of the in-repo local-Qwen client (backend/ingest/llm.py) via Spark Control. + +We import the ingest client rather than re-implementing the HTTP call so the intake bot +speaks the exact same Spark contract (model, /v1/chat/completions, TLS verify, .env load). +The intake message is real LP substance, but it goes ONLY to the local Qwen on Ten31 infra +— never Claude — so no scrub boundary applies (same basis as the daily digest). Never call a +Spark directly; everything goes through SPARK_CONTROL_URL. 
+""" +import os +import sys + +_INGEST = os.path.join(os.path.dirname(os.path.dirname(os.path.abspath(__file__))), "ingest") +if _INGEST not in sys.path: + sys.path.insert(0, _INGEST) + +import llm # noqa: E402 (backend/ingest/llm.py — chat / chat_json over Spark Control) + + +def parse_json(prompt, system=None, max_tokens=400): + """Send to local Qwen (temp 0, thinking off) and parse the first JSON object, or None.""" + return llm.chat_json(prompt, system=system, max_tokens=max_tokens) diff --git a/backend/matrix_intake/test_crm_client.py b/backend/matrix_intake/test_crm_client.py new file mode 100644 index 0000000..b97f9c6 --- /dev/null +++ b/backend/matrix_intake/test_crm_client.py @@ -0,0 +1,54 @@ +"""Tests for the CRM client's payload builder (pure logic, no network).""" +import os +import sys + +sys.path.insert(0, os.path.dirname(os.path.abspath(__file__))) + +import crm_client # noqa: E402 + + +def test_new_investor_payload(): + p = {"intent": "new_investor", "investor_name": "Acme Capital", + "contact_name": "Jane Doe", "contact_email": "jane@acme.com", + "contact_title": "GP", "note": "met at conf"} + out = crm_client.build_commit_payload(p) + assert out["investor_name"] == "Acme Capital" + assert out["create_investor_if_missing"] is True + assert "row_id" not in out + assert out["contact"] == {"name": "Jane Doe", "email": "jane@acme.com", "title": "GP"} + assert out["body"] == "met at conf" + assert out["source"] == "matrix_intake" + + +def test_existing_investor_uses_row_id_not_create(): + p = {"intent": "meeting_note", "investor_name": "Acme Capital", + "contact_name": "Jane Doe", "contact_email": None, "note": "wants Q3 deck", + "_match_id": "rowAcme"} + out = crm_client.build_commit_payload(p) + assert out["row_id"] == "rowAcme" + assert "create_investor_if_missing" not in out + assert "investor_name" not in out # targeted by row id, never re-matched by name + assert out["body"] == "wants Q3 deck" + + +def 
test_contact_falls_back_to_investor_name_when_no_person(): + p = {"intent": "new_investor", "investor_name": "Delta Fund", + "contact_name": None, "contact_email": None, "note": None} + out = crm_client.build_commit_payload(p) + assert out["contact"]["name"] == "Delta Fund" + assert out["body"] == "" + + +def test_no_email_sends_empty_string_not_none(): + p = {"intent": "new_investor", "investor_name": "Gamma", "contact_name": "Bob", + "contact_email": None, "note": "x"} + out = crm_client.build_commit_payload(p) + assert out["contact"]["email"] == "" + + +if __name__ == "__main__": + fns = [v for k, v in sorted(globals().items()) if k.startswith("test_") and callable(v)] + for fn in fns: + fn() + print(f"ok {fn.__name__}") + print(f"\n{len(fns)} passed") diff --git a/backend/matrix_intake/test_parse.py b/backend/matrix_intake/test_parse.py new file mode 100644 index 0000000..d2c35e0 --- /dev/null +++ b/backend/matrix_intake/test_parse.py @@ -0,0 +1,93 @@ +"""Tests for the intake parse/normalize layer — Spark/Qwen stubbed (no network).""" +import os +import sys + +sys.path.insert(0, os.path.dirname(os.path.abspath(__file__))) + +import parse # noqa: E402 + + +def _stub(reply): + """Return a parse_fn that ignores input and yields `reply` (simulating Qwen's JSON).""" + return lambda text, system=None, max_tokens=400: reply + + +def test_new_investor_basic(): + p = parse.parse_message( + "New investor Acme Capital, contact Jane Doe jane@acme.com, met at the Austin conf", + parse_fn=_stub({"intent": "new_investor", "investor_name": "Acme Capital", + "contact_name": "Jane Doe", "contact_email": "jane@acme.com", + "contact_title": None, "note": "met at the Austin conf"}), + ) + assert p["intent"] == "new_investor" + assert p["investor_name"] == "Acme Capital" + assert p["contact_email"] == "jane@acme.com" + + +def test_email_salvaged_from_source_when_model_misses(): + p = parse.parse_message( + "add bob@example.org from Beta LP", + parse_fn=_stub({"intent": 
"new_investor", "investor_name": "Beta LP", + "contact_name": "Bob", "contact_email": None}), + ) + assert p["contact_email"] == "bob@example.org" + + +def test_fabricated_email_dropped_when_not_in_source(): + p = parse.parse_message( + "new prospect Gamma Partners, talked to their GP", + parse_fn=_stub({"intent": "new_investor", "investor_name": "Gamma Partners", + "contact_name": "their GP", "contact_email": "made-up@nowhere.test"}), + ) + # the model invented an address that isn't in the source → must be dropped + assert p["contact_email"] is None + + +def test_meeting_note_intent_preserved(): + p = parse.parse_message( + "Note for Acme Capital: wants the Q3 deck", + parse_fn=_stub({"intent": "meeting_note", "investor_name": "Acme Capital", + "note": "wants the Q3 deck"}), + ) + assert p["intent"] == "meeting_note" + assert p["note"] == "wants the Q3 deck" + + +def test_unclear_when_no_entity(): + p = parse.parse_message( + "hey what's up", + parse_fn=_stub({"intent": "new_investor", "investor_name": None, "contact_name": None}), + ) + assert p["intent"] == "unclear" + + +def test_null_strings_normalized(): + p = parse.parse_message( + "Delta Fund", + parse_fn=_stub({"intent": "new_investor", "investor_name": "Delta Fund", + "contact_name": "null", "contact_email": "N/A", "note": ""}), + ) + assert p["contact_name"] is None + assert p["contact_email"] is None + assert p["note"] is None + + +def test_bad_intent_falls_back_to_unclear(): + p = parse.parse_message( + "Epsilon Capital", + parse_fn=_stub({"intent": "garbage", "investor_name": "Epsilon Capital"}), + ) + assert p["intent"] == "unclear" + + +def test_none_model_reply_is_unclear(): + p = parse.parse_message("???", parse_fn=_stub(None)) + assert p["intent"] == "unclear" + + +if __name__ == "__main__": + fns = [v for k, v in sorted(globals().items()) if k.startswith("test_") and callable(v)] + for fn in fns: + fn() + print(f"ok {fn.__name__}") + print(f"\n{len(fns)} passed") diff --git 
a/backend/matrix_intake/test_proposals.py b/backend/matrix_intake/test_proposals.py new file mode 100644 index 0000000..0d35743 --- /dev/null +++ b/backend/matrix_intake/test_proposals.py @@ -0,0 +1,95 @@ +"""Tests for the proposal store + approval state machine (pure logic, no network).""" +import os +import sys + +sys.path.insert(0, os.path.dirname(os.path.abspath(__file__))) + +import proposals # noqa: E402 + +SAMPLE = {"intent": "new_investor", "investor_name": "Acme Capital", + "contact_name": "Jane Doe", "contact_email": "jane@acme.com", + "contact_title": None, "note": "met at conf"} + + +def test_store_put_get_pop(): + s = proposals.ProposalStore() + assert not s.has("$root") + s.put("$root", SAMPLE) + assert s.has("$root") + assert s.get("$root")["investor_name"] == "Acme Capital" + assert s.pop("$root")["investor_name"] == "Acme Capital" + assert not s.has("$root") + assert s.pop("$missing") is None + + +def test_interpret_yes_variants(): + for t in ("yes", "Y", "approve", " ok ", "👍"): + assert proposals.interpret_reply(t)[0] == "approve", t + + +def test_interpret_no_variants(): + for t in ("no", "N", "cancel", "discard", "❌"): + assert proposals.interpret_reply(t)[0] == "reject", t + + +def test_interpret_edit_equals(): + action, payload = proposals.interpret_reply("edit email=new@acme.com") + assert action == "edit" + assert payload == ("contact_email", "new@acme.com") + + +def test_interpret_edit_colon_and_alias(): + action, payload = proposals.interpret_reply("firm: Acme Capital LLC") + assert action == "edit" + assert payload == ("investor_name", "Acme Capital LLC") + + +def test_interpret_unknown(): + assert proposals.interpret_reply("maybe later")[0] == "unknown" + + +def test_interpret_edit_colon_value_contains_equals(): + # the '=' inside the value must not break parsing — split on ':' first, keep the rest + action, payload = proposals.interpret_reply("note: see deck=v2") + assert action == "edit" + assert payload == ("note", "see deck=v2") + + 
+def test_claim_once_pop_guards_double_approve(): + # the double-approve guard relies on pop() yielding the proposal exactly once; + # a second claim returns None so a racing second 'yes' is a no-op + s = proposals.ProposalStore() + s.put("$r", SAMPLE) + assert s.pop("$r") is not None + assert s.pop("$r") is None + + +def test_edit_with_unknown_field_is_not_an_edit(): + # an unknown field name must not silently become an edit + assert proposals.interpret_reply("edit zipcode=90210")[0] == "unknown" + + +def test_apply_edit_is_nondestructive(): + updated = proposals.apply_edit(SAMPLE, "contact_email", "x@y.com") + assert updated["contact_email"] == "x@y.com" + assert SAMPLE["contact_email"] == "jane@acme.com" # original untouched + + +def test_render_includes_fields_and_instructions(): + text = proposals.render(SAMPLE) + assert "Acme Capital" in text + assert "jane@acme.com" in text + assert "yes" in text.lower() and "no" in text.lower() + + +def test_render_meeting_note_variant(): + note = dict(SAMPLE, intent="meeting_note") + assert "meeting note" in proposals.render(note).lower() + + +if __name__ == "__main__": + fns = [v for k, v in sorted(globals().items()) if k.startswith("test_") and callable(v)] + for fn in fns: + fn() + print(f"ok {fn.__name__}") + print(f"\n{len(fns)} passed") diff --git a/backend/server.py b/backend/server.py index 92a26ff..dcc3698 100644 --- a/backend/server.py +++ b/backend/server.py @@ -1215,6 +1215,45 @@ def sync_fundraising_relational(conn, grid, views, actor_user_id=None): )) run_fundraising_automations(conn) +def find_intake_match(conn, q, email=None): + """Find an existing fundraising-grid investor for the intake bot's new-vs-existing hint. + + Scans the canonical grid blob (not the derived tables) so the returned `id` is the grid + row id that handle_log_fundraising_communication matches on — keeping the bot's proposal + consistent with where the write actually lands (no duplicate-investor risk). 
Matches by + normalized investor_name first (the write's own key), then falls back to a contact email. + Deleted investors are absent from the blob; graveyarded ones remain (a note on them is + still valid), so no extra filtering is needed.""" + row = conn.execute("SELECT grid_json FROM fundraising_state WHERE id = 'main'").fetchone() + if not row or not row['grid_json']: + return None + try: + grid = json.loads(row['grid_json']) + except Exception: + return None + rows = grid.get('rows', []) if isinstance(grid, dict) else [] + wanted_name = _normalize_text(q) if q else '' + wanted_email = (email or '').strip().lower() + email_hit = None + for r in rows: + if not isinstance(r, dict): + continue + rid = str(r.get('id') or '').strip() + if not rid: + continue + name = str(r.get('investor_name') or '').strip() + if wanted_name and _normalize_text(name) == wanted_name: + return {"id": rid, "investor_name": name, "matched_on": "name"} + if wanted_email and email_hit is None: + contacts = r.get('contacts') + if isinstance(contacts, list): + for c in contacts: + if isinstance(c, dict) and str(c.get('email') or '').strip().lower() == wanted_email: + email_hit = {"id": rid, "investor_name": name, "matched_on": "email"} + break + return email_hit + + def ensure_fundraising_state_row(conn): existing = conn.execute("SELECT * FROM fundraising_state WHERE id = 'main'").fetchone() if not existing: @@ -1829,6 +1868,10 @@ class CRMHandler(BaseHTTPRequestHandler): if path == '/api/outreach/radar': return self.handle_outreach_radar(user) + # Matrix intake bot — new-vs-existing lookup for its in-thread proposal + if path == '/api/intake/match': + return self.handle_intake_match(user, params) + # Users if path == '/api/users': return self.handle_list_users(user) @@ -2747,6 +2790,9 @@ class CRMHandler(BaseHTTPRequestHandler): contact_in = body.get('contact') append_note = bool(body.get('append_note', True)) create_investor_if_missing = bool(body.get('create_investor_if_missing', False)) 
+ # Provenance: where this logged communication originated (grid UI vs the Matrix + # intake bot). Default preserves prior behavior; callers may override. + comm_source = (str(body.get('source') or 'fundraising_grid').strip() or 'fundraising_grid')[:64] if not row_id and not investor_name_in: return self.send_error_json("row_id or investor_name is required") @@ -2863,7 +2909,7 @@ class CRMHandler(BaseHTTPRequestHandler): user['user_id'] )) conn.execute("UPDATE contacts SET updated_at = ? WHERE id = ?", (now(), contact_id)) - log_audit(conn, user['user_id'], 'communication', comm_id, 'create', {"source": "fundraising_grid"}) + log_audit(conn, user['user_id'], 'communication', comm_id, 'create', {"source": comm_source}) iso_day = now()[:10] target_row['last_communication_date'] = iso_day @@ -2900,6 +2946,21 @@ class CRMHandler(BaseHTTPRequestHandler): conn.close() return self.send_json({"data": {"communication": comm, "row": target_row, "version": next_version}}, 201) + def handle_intake_match(self, user, params): + """Read-only: does an investor matching this intake already exist? Used by the + Matrix intake bot to label its in-thread proposal new-vs-existing. 
Returns the + grid row id so an approved note lands on exactly that investor.""" + q = str(params.get('q') or '').strip() + email = str(params.get('email') or '').strip() + if not q and not email: + return self.send_error_json("q or email is required") + conn = get_db() + try: + match = find_intake_match(conn, q, email) + finally: + conn.close() + return self.send_json({"data": {"match": match}}) + def handle_update_communication(self, user, comm_id, body): conn = get_db() existing = conn.execute("SELECT id FROM communications WHERE id = ?", (comm_id,)).fetchone() diff --git a/backend/test_intake_endpoints.py b/backend/test_intake_endpoints.py new file mode 100644 index 0000000..8bacfeb --- /dev/null +++ b/backend/test_intake_endpoints.py @@ -0,0 +1,165 @@ +#!/usr/bin/env python3 +"""Tests for the Matrix-intake CRM surface (v0.1.0 Matrix-intake M2). + +The bot adds no parallel write path — it reuses /api/fundraising/log-communication and adds +one read-only lookup, GET /api/intake/match. This boots the REAL server against a temp DB and +asserts: + - match by normalized name and by contact email, returning the GRID ROW id; + - the new-vs-existing contract: a bot-style create (log-communication + + create_investor_if_missing) then matches by name — so an approved note lands on that same + investor instead of duplicating it; + - provenance: an intake-sourced communication is audited with source="matrix_intake"; + - guards: missing q/email -> 400, unauthenticated -> 401. +Synthetic data only. 
+ +Run: cd backend && python3 test_intake_endpoints.py +""" +import http.client +import json +import os +import sqlite3 +import sys +import tempfile +import threading +from http.server import ThreadingHTTPServer + +_DATA = tempfile.mkdtemp() +os.environ["CRM_DATA_DIR"] = _DATA +os.environ["CRM_DB_PATH"] = os.path.join(_DATA, "crm.db") + +sys.path.insert(0, os.path.dirname(os.path.abspath(__file__))) +import server # noqa: E402 + +FAILS = [] + + +def check(cond, msg): + print((" PASS " if cond else " FAIL ") + msg) + if not cond: + FAILS.append(msg) + + +class _Quiet(server.CRMHandler): + def log_message(self, *a): + pass + + +def _req(port, method, path, token=None, body=None): + conn = http.client.HTTPConnection("127.0.0.1", port, timeout=10) + headers = {} + if token: + headers["Authorization"] = "Bearer " + token + payload = None + if body is not None: + payload = json.dumps(body) + headers["Content-Type"] = "application/json" + conn.request(method, path, body=payload, headers=headers) + resp = conn.getresponse() + raw = resp.read().decode("utf-8", "replace") + conn.close() + data = None + if raw: + try: + data = json.loads(raw) + except ValueError: + pass + return resp.status, data + + +GRID = { + "columns": [], + "rows": [ + {"id": "rowAcme", "investor_name": "Acme Capital", "notes": "", + "contacts": [{"name": "Jane Doe", "email": "jane@acme.com", "title": "GP"}]}, + ], +} + + +def seed(): + c = sqlite3.connect(os.environ["CRM_DB_PATH"]) + c.execute("INSERT INTO users (id,username,email,password_hash,full_name,role,is_active) " + "VALUES ('u1','grant','grant@ten31.example','x','Grant','admin',1)") + # init_db doesn't create the 'main' state row (it's created lazily on first write), so + # upsert rather than UPDATE — a plain UPDATE would silently match zero rows. 
+ c.execute("INSERT INTO fundraising_state (id, grid_json, views_json, version) " + "VALUES ('main', ?, '[]', 1) " + "ON CONFLICT(id) DO UPDATE SET grid_json = excluded.grid_json", (json.dumps(GRID),)) + c.commit() + c.close() + + +def main(): + server.init_db() + seed() + token = server.create_token("u1", "grant", "admin") + + httpd = ThreadingHTTPServer(("127.0.0.1", 0), _Quiet) + port = httpd.server_address[1] + threading.Thread(target=httpd.serve_forever, daemon=True).start() + try: + print("\n[match: existing investor by name returns the grid row id]") + st, d = _req(port, "GET", "/api/intake/match?q=Acme%20Capital", token) + m = (d or {}).get("data", {}).get("match") + check(st == 200 and m and m["id"] == "rowAcme" and m["matched_on"] == "name", + f"name match -> rowAcme (got {st}, {m})") + + print("\n[match: case-insensitive name]") + st, d = _req(port, "GET", "/api/intake/match?q=acme%20capital", token) + m = (d or {}).get("data", {}).get("match") + check(m and m["id"] == "rowAcme", f"normalized name match (got {m})") + + print("\n[match: by contact email]") + st, d = _req(port, "GET", "/api/intake/match?email=jane@acme.com", token) + m = (d or {}).get("data", {}).get("match") + check(m and m["id"] == "rowAcme" and m["matched_on"] == "email", + f"email match -> rowAcme (got {m})") + + print("\n[match: unknown -> null]") + st, d = _req(port, "GET", "/api/intake/match?q=Nobody%20LP", token) + check(st == 200 and (d or {}).get("data", {}).get("match") is None, + f"no match -> null (got {st}, {d})") + + print("\n[match: missing q and email -> 400]") + st, _ = _req(port, "GET", "/api/intake/match", token) + check(st == 400, f"no params -> 400 (got {st})") + + print("\n[match: unauthenticated -> 401]") + st, _ = _req(port, "GET", "/api/intake/match?q=Acme", None) + check(st == 401, f"no token -> 401 (got {st})") + + print("\n[bot create: log-communication + create_investor_if_missing, source tagged]") + st, d = _req(port, "POST", 
"/api/fundraising/log-communication", token, { + "investor_name": "Beacon Ventures", + "contact": {"name": "Sam Lee", "email": "sam@beacon.vc", "title": "Partner"}, + "create_investor_if_missing": True, + "type": "note", "subject": "Intake (Matrix)", "body": "met at the Austin conf", + "source": "matrix_intake", + }) + check(st in (200, 201), f"create new investor -> 201 (got {st})") + + print("\n[new-vs-existing contract: the just-created investor now matches by name]") + st, d = _req(port, "GET", "/api/intake/match?q=Beacon%20Ventures", token) + m = (d or {}).get("data", {}).get("match") + check(m and m.get("investor_name") == "Beacon Ventures", + f"created investor is matchable (no duplicate on next note) (got {m})") + + print("\n[provenance: the intake communication is audited as source=matrix_intake]") + c = sqlite3.connect(os.environ["CRM_DB_PATH"]) + rows = c.execute("SELECT changes FROM audit_log WHERE entity_type='communication' AND action='create'").fetchall() + c.close() + sources = [json.loads(r[0]).get("source") for r in rows if r[0]] + check("matrix_intake" in sources, f"audit carries source=matrix_intake (got {sources})") + finally: + httpd.shutdown() + + print() + if FAILS: + print(f"FAILED ({len(FAILS)}):") + for f in FAILS: + print(f" - {f}") + sys.exit(1) + print("ALL PASS (matrix-intake endpoints)") + + +if __name__ == "__main__": + main() diff --git a/docs/guides/matrix-intake.md b/docs/guides/matrix-intake.md new file mode 100644 index 0000000..dd6a9ab --- /dev/null +++ b/docs/guides/matrix-intake.md @@ -0,0 +1,68 @@ +--- +paths: + - backend/matrix_intake/** +--- + +# Matrix intake bot + +Read this before editing `backend/matrix_intake/`. The bot turns a typed message in a +dedicated Matrix room into a proposed fundraising-grid add/edit, gated on **in-thread human +approval** before any write. Phase status: **M1 + M2 built** (text intake + approval + write); +**M3 (business-card photo) deferred** — Spark Control has no vision model yet. 
+ +## What it is (and isn't) + +- A **separate process**, not part of the CRM. Its only third-party dep, `matrix-nio`, lives + in `backend/matrix_intake/requirements.txt` and **must never** be added to the stdlib CRM + (`backend/server.py`). Runs on the Spark (placement per `standards/guides/placement.md`). +- It **drafts; a human approves.** Nothing is written autonomously — every CRM write follows a + `yes` reply in the proposal thread. This is exempt from "agents draft, humans send" the same + way the digest is: it's internal data entry to our own CRM, not outward LP contact. +- It is **not** a parallel write path. It reuses the CRM's own canonical endpoint + `POST /api/fundraising/log-communication` (create-if-missing + contact upsert + note + + relational sync + audit) for both new-investor and existing-note cases. Don't reimplement + grid mutation in the bot. + +## Flow + +1. Top-level message in the intake room → `parse.parse_message` → local **Qwen via Spark + Control** (`spark.py` reuses `backend/ingest/llm.py`; temp 0, JSON only) extracts + `{intent, investor_name, contact_name, contact_email, contact_title, note}`. +2. `crm_client.match` (`GET /api/intake/match`) checks new-vs-existing and returns the **grid + row id** so an approved note lands on exactly that investor (no duplicate). +3. The proposal is posted **in a thread** rooted at the user's message; the pending proposal is + held in memory keyed by that thread root (`proposals.ProposalStore`). +4. User replies in-thread: `yes` / `edit field=value` / `no`. On `yes`, `crm_client.commit` + POSTs to `log-communication` tagged `source="matrix_intake"` (provenance in the audit log). + +## Rules / gotchas + +- **Module-name collision:** the intake config module is `settings.py`, **not** `config.py`, + because `backend/ingest/config.py` is imported (as bare `config`) through `spark → llm`. A + second `config` module would shadow it in `sys.modules` and break `llm` (`CHAT_MODEL`). 
+ Keep intake module names from colliding with ingest's (`config`, `http_util`, `llm`). +- **Email integrity:** `parse.normalize` only keeps an address that literally appears in the + source message — the model must never mint one (a wrong email is worse than none). It takes + the **first** address in the text, so a two-person message ("Alice a@x.com and Bob b@y.com") + could attach the wrong one; the human sees it in the proposal and can `edit email=…` before + approving. Cross-referencing multiple addresses to the named contact is a deliberate non-goal + for v1. +- **Double-approve guard:** `handle_reply` pops the pending proposal from the store *before* + awaiting the commit, so a second `yes` arriving mid-write is a no-op (asyncio is cooperative; + the pop is atomic w.r.t. other events). On commit failure the proposal is restored for retry. +- **Local-only parse:** intake text is real LP substance but goes ONLY to local Qwen via Spark + Control, never Claude — so no scrub boundary applies (same basis as the digest). Never call a + Spark directly; always go through `SPARK_CONTROL_URL`. +- **Auth:** the CRM has no service-key path; the bot logs in as a dedicated CRM user + (`CRM_BOT_USERNAME`/`CRM_BOT_PASSWORD`) → Bearer JWT, re-login once on 401. +- **Tests** are offline: `test_parse.py` / `test_proposals.py` / `test_crm_client.py` stub the + network; `backend/test_intake_endpoints.py` boots the real server against a temp DB and + covers `/api/intake/match` + the create→match (no-duplicate) contract + provenance. A **live + Matrix smoke test** needs credentials and `matrix-nio` installed on the Spark — it can't run in CI. + +## Config + +All in `.env` (names in `.env.example`): `MATRIX_HOMESERVER`, `MATRIX_USER`, +`MATRIX_ACCESS_TOKEN`, `MATRIX_DEVICE_ID`, `MATRIX_INTAKE_ROOM`; `CRM_API_BASE`, +`CRM_BOT_USERNAME`, `CRM_BOT_PASSWORD`, `CRM_API_VERIFY_TLS`. Spark settings are inherited from +the ingest client (`SPARK_CONTROL_URL`, `CRM_CHAT_MODEL`).