Add NL-query backend (W2): local translator + safe named-query runner

Read-only "ask the database in plain English" backend. Translation runs on
the local Qwen via Spark Control (question -> {intent, slots}); nothing leaves
the box, no Claude and no redaction boundary (the simplification chosen after
pressure-testing). The safe surface is a curated catalog of ~12 hand-written
parameterized queries; a slot validator is the trust boundary (no generic SQL,
no dynamic identifiers). POST /api/query/nl + GET /api/query/catalog, gated
require_bot_or_admin, read-only, audited. Soft-delete-correct per table.
Local Qwen translated 12/12 real example questions correctly against the live
Spark. Web "Ask" box and Matrix bot still to come (steps 4-5).
This commit is contained in:
Keysat
2026-06-18 18:35:41 -05:00
parent a166b49397
commit 6c29c22601
13 changed files with 1348 additions and 13 deletions
+1
View File
@@ -0,0 +1 @@
../../docs/guides/nl-query.md
+9 -11
View File
@@ -48,6 +48,7 @@ cd start9/0.4 && make
- `backend/redaction/``scrub.py` + `client.py`: the scrub→Claude→re-hydrate privacy boundary. - `backend/redaction/``scrub.py` + `client.py`: the scrub→Claude→re-hydrate privacy boundary.
- `backend/ingest/` — chunk→embed→Qdrant + retrieval modes. - `backend/ingest/` — chunk→embed→Qdrant + retrieval modes.
- `backend/entity_*.py` — entity resolution/merge (the two-investor-model reconciliation). - `backend/entity_*.py` — entity resolution/merge (the two-investor-model reconciliation).
- `backend/nl_query/` — read-only natural-language query (W2): `intents.py` (curated parameterized query catalog), `runner.py` (slot validator = trust boundary), `translate.py` (local-Qwen question→{intent,slots}). See the nl-query guide.
- `backend/matrix_intake/` — Matrix intake bot (separate process; `matrix-nio`, isolated to this component): typed message → local-Qwen parse → in-thread approve → write via the CRM's own `log-communication`. See the matrix-intake guide. - `backend/matrix_intake/` — Matrix intake bot (separate process; `matrix-nio`, isolated to this component): typed message → local-Qwen parse → in-thread approve → write via the CRM's own `log-communication`. See the matrix-intake guide.
- `frontend/index.html` — the entire UI. - `frontend/index.html` — the entire UI.
- `docs/` — architecture, phase plans, contracts, runbooks (see Deeper docs). `docs/guides/` — scoped subsystem rules (see below). - `docs/` — architecture, phase plans, contracts, runbooks (see Deeper docs). `docs/guides/` — scoped subsystem rules (see below).
@@ -65,6 +66,7 @@ Subsystem rules live in `docs/guides/` and lazy-load in Claude Code via `.claude
- **Email capture / drafts + digest send** (`backend/email_integration/`, `backend/digest_mailer.py`, `backend/smtp_send.py`) → `docs/guides/email.md` - **Email capture / drafts + digest send** (`backend/email_integration/`, `backend/digest_mailer.py`, `backend/smtp_send.py`) → `docs/guides/email.md`
- **Building or deploying the s9pk** (`start9/`) → `docs/guides/packaging.md` - **Building or deploying the s9pk** (`start9/`) → `docs/guides/packaging.md`
- **Matrix intake bot** (`backend/matrix_intake/`) → `docs/guides/matrix-intake.md` - **Matrix intake bot** (`backend/matrix_intake/`) → `docs/guides/matrix-intake.md`
- **Natural-language query** (`backend/nl_query/`) → `docs/guides/nl-query.md`
## Conventions ## Conventions
@@ -104,17 +106,13 @@ Subsystem rules live in `docs/guides/` and lazy-load in Claude Code via `.claude
## Current state ## Current state
_Phase 0 + Phase 1 built; **box live at v0.1.0:91; repo at v0.1.0:92** (v92 = reminders/follow-ups — built + tested locally 2026-06-18, **deploy pending**. Box deployed & verified live 2026-06-18 — `installed-version`=0.1.0:91, server up on :8080, clean; the StartOS version-graph traversal logs an inert down-to-39-then-up because the per-version `up`/`down` hooks are no-ops — real SQLite migrations run in-app at startup). **The fundraising grid + email capture is the canonical system of record.** Deploy/feature history: git log + `start9/0.4/startos/versions/`; longer-term backlog/debt: `ROADMAP.md` / `EVALUATION.md`._ _Phase 0 + Phase 1 built; **box live at v0.1.0:91; repo at v0.1.0:92** (reminders, deploy pending). **The fundraising grid + email capture is the canonical system of record.** Active thread: **W2 natural-language query** (backend done + validated locally; web/Matrix UI next). Deploy/feature history: git log + `start9/0.4/startos/versions/`; longer-term backlog/debt: `ROADMAP.md` / `EVALUATION.md`._
- **Reminders & follow-ups (W1) — BUILT + tested locally 2026-06-18 (repo v0.1.0:92, deploy pending).** First step of the agreed reminders → NL-search → bot-mutations plan (`ROADMAP.md` "Follow-ups/reminders + NL search + bot grid-mutations"; **overarching constraint: keep LP data off third-party LLMs — the dominant risk, above write-safety**). First-class tickler tied to the grid: `reminders` table (in-app migration `0006`; logical FK to `fundraising_investors.id` + denormalized name, like `0005`), full CRUD (`GET/POST/PATCH/DELETE /api/reminders`; soft-delete; open/done/snoozed/cancelled; assignee; `source` human/bot/automation; accepts `source_row_id` so the grid stays decoupled), a read-only **derived `reminder_status` grid column** (overdue/due_soon/open — injected + stripped like `pipeline_stage`; **filterable so a saved view can later drive the follow-up view off real reminders, not the binary `follow_up` checkbox**), an orphan reconciler (`reconcile_grid_reminders`), a **Reminders** page + Dashboard **"Reminders Due"** card + **"Reminders due"** daily-digest section, and a per-investor **`last_activity_at`** recency rollup (shared building block for the W2 NL "not nurtured" query). **Pure local CRM — no LLM path, no leak surface.** Snooze keeps a reminder `open` with a pushed-out date (reliably reappears); the `snoozed` status is an explicit "mute" (Edit only). Tests: `test_reminders.py` + digest reminders test (**31/31 green, render-smoke green**). **Not yet deployed** — needs an s9pk build + install (authorize first; verify `0006` against a DB copy). Deferred fast-follow **W1b** = nurture-gap auto-suggested reminders. - **W2 — natural-language query (read-only): BACKEND built + tested + validated locally 2026-06-18; web/Matrix UI next.** `backend/nl_query/` — 12 curated parameterized queries + a slot validator (the trust boundary; no generic SQL) + a **local-Qwen** translator (question→{intent,slots} via Spark Control; nothing leaves the box, **no Claude, no redaction** — the simplification Grant chose). `POST /api/query/nl` (also accepts direct `{intent,slots}`) + `GET /api/query/catalog`, `require_bot_or_admin`, audited (`entity_type='nl_query'`). **Local Qwen translated 12/12 of Grant's real example questions correctly against the live Spark** — settles local-only (Claude not needed). Soft-delete-correct per table (gotcha: `fundraising_*` has **no `deleted_at`**`graveyard` is the axis; emails via a live `eam` sighting). Guide: `docs/guides/nl-query.md`. **Next: step 4 web "Ask" box (Communications tab), step 5 Matrix `@bot <q>`** — both thin clients of the endpoint. Not committed at deploy/version level yet; folds into the next s9pk.
- **Email-proposal review over Matrix + a `bot` role — DEPLOYED, LIVE & smoke-tested 2026-06-18 (box v0.1.0:91, Spark bot `b2690c4`).** The CRM-drafted "proposed grid notes" gain: (1) a click-to-view **inline source-email popup** on the Email Capture page (`GET /api/email/detail` — from/to/cc/date/subject + scrollable body); (2) a **CRM→Matrix review bridge** — the bot pulls pending proposals (`GET /api/intake/email-proposals`), posts dash-framed review cards (note names who emailed whom, not "Sent/Received") to a dedicated room (`MATRIX_EMAIL_REVIEW_ROOM`), and relays in-thread yes/no/NL-edit (`POST .../decide`), kept in sync with the web panel (decide on either → the other reflects it). **Decided threads are redacted whole** (card + replies; the bot holds a redact/mod power level) so — with Element's "show deleted messages" OFF — the main chat *and* threads view clear completely (confirmed the intended UX). New **`bot` role** (authenticated, never admin; `require_bot_or_admin`) gates the agent endpoints; state in `email_proposal_matrix` (email-migration `0003`). Full mechanics, deploy gotchas, and the `redact_resolved.py` backfill tool: `docs/guides/matrix-intake.md`. - **W1 — reminders & follow-ups: BUILT + tested locally (v0.1.0:92), DEPLOY PENDING.** First-class tickler tied to the grid (migration `0006`; CRUD `GET/POST/PATCH/DELETE /api/reminders`; derived `reminder_status` grid column; Reminders page + dashboard card + digest section; the `last_activity_at` recency rollup that W2 reuses). Needs s9pk build + install (authorize first; verify `0006` against a DB copy). Deferred **W1b** = nurture-gap auto-suggested reminders.
- **Adopt the Pipeline — LIVE & smoked (v0.1.0:88):** the grid drives the deal board. "Add to Pipeline" row action → durably-linked `opportunities` row via `opportunities.fundraising_investor_id` (migration 0005); grid owns link+seed, board owns stage/probability/owner; "Remove" soft-deletes the opp, grid row intact; deleting a grid investor archives its orphaned opp. Detail + locked decisions: `ROADMAP.md` "Adopt the Pipeline". - **Done & live (detail in git log / ROADMAP):** email-proposal Matrix review + `bot` role (box v91); grid-driven Pipeline (v88); Matrix intake bot (Spark `matrix-intake` container); Gmail capture (DWD) + propose→approve + daily digest; Thesis Workshop + Architect (Claude, dual-approval); outreach drafts + radar. All draft-only.
- **Tests:** **34/34 backend green** (`python3 backend/run_tests.py`; +`nl_query/` suite), `py_compile` clean; render-smoke gates `make`.
- **Matrix intake bot — LIVE (Spark, container `matrix-intake`, bot `b2690c4`):** typed Matrix message → local-Qwen parse → in-thread human approval → write via `POST /api/fundraising/log-communication` (`source="matrix_intake"`); new-vs-existing via `GET /api/intake/match`. Fuzzy match + numbered disambiguation shortlist, conversational NL revise (email-integrity preserved), and the `INTAKE_TEAM_ROSTER` outreach frame all shipped & mostly smoked. **One path still un-smoked in-room: the fuzzy disambiguation numbered-pick grammar.** Runs as a docker-compose service (`restart: unless-stopped`). Guide: `docs/guides/matrix-intake.md`. - **Next (priority order):** 1) **W2 step 4** web Ask box, then **step 5** Matrix `@bot`; 2) **deploy reminders (v92) + W2 together** — bump to **v0.1.0:93**, build s9pk, install, browser-verify (authorize first; verify `0006` against a DB copy); 3) **W3** bot grid-mutations behind the Matrix approval gate (local-Qwen parse); 4) **W1b** nurture-gap reminders; 5) Grant + Jonathan freeze v2.0 canonical; 6) in-room smoke of the intake disambiguation numbered-pick grammar; then P2 debt (reports comms-aggregate soft-delete sweep, `?limit=abc` crash, auth regression test, oversized StartOS icon).
- **Working (all draft-only):** CRM + ingest (chunk→embed→Qdrant + retrieval) + redaction boundary; Gmail capture (DWD) + email-activity propose→approve; Thesis Workshop + Architect (Claude) with dual-approval gate; Outreach Draft Assistant + follow-up radar + per-user voice + Tier-B in-thread Gmail draft creation. - **Open / risks:** W2 translation only **happy-path-validated** (typos/ambiguous/no-match phrasings shake out in live use); **Claude/Architect path still unverified live on the box**; v2.0 reserve-asset spine is the *working approved* spine but **not canonical** (needs dual sign-off); doc drift — `crm-overview.md` + `EVALUATION.md` still call `lp_profiles` live.
- **Tests:** **30/30 backend green** (`python3 backend/run_tests.py`), `py_compile` clean; frontend render-smoke gates the default `make` build.
- **Debt (P2, not deploy-blocking; full list `EVALUATION.md`):** reports-subsystem soft-delete sweep — **pipeline/opportunities aggregates fixed v87**; remaining: the dashboard **communications** aggregates (`recent_comms`/`comms_this_month`/`meetings_this_month`) + activity report + report-endpoint tests; `?limit=abc` crashes the request thread; auth regression test for the 3 v79-gated GETs (`/api/users`, `/api/email/status`, `/api/email/accounts`); scrub-gateway TLS verify off; hardcoded Spark/Qdrant IPs + **oversized StartOS package icon** (fix before the next s9pk upload); the 5.4k-line `server.py` monolith.
- **Open / risks:** the v2.0 reserve-asset spine is the *working* approved spine but **not a canonical `thesis_version`** (needs Grant + Jonathan dual sign-off; Appendix-A conviction incl. ~40% Strike stays Grant's working read, not fed to the engine); **Claude/Architect path still unverified live on the box**; the intake matcher reads only the grid blob (not classic `contacts`); doc drift — `crm-overview.md` + `EVALUATION.md` still call `lp_profiles` live (doc-auditor pass).
- **Next:** 1) **deploy v0.1.0:92 (reminders)** to the box — needs authorization; verify migration `0006` against a copy of `data/crm.db`, then `make` + install + browser-verify the Reminders page/grid chip/dashboard card (only render-smoke ran locally, not a live authenticated click-through); 2) **W2 — NL→safe-query** (the agreed plan's next build; validated filter-AST, Claude behind redaction, only the question text leaves the box; web + Matrix; = old "search item 3"), then **W3 — bot grid-mutations** behind the Matrix approval gate; 3) **W1b** nurture-gap auto-suggested reminders (fast-follow once recency proven); 4) spark-control intake dashboard card + extract intake bot to its own repo (ROADMAP); 5) in-room smoke of the intake **disambiguation** numbered-pick grammar; 6) Grant + Jonathan freeze v2.0 canonical; 7) reply-all for Tier-B drafts; then clear the P2 debt (reports comms-aggregate soft-delete sweep, `?limit=abc` crash, auth regression test, oversized StartOS icon). **Possible follow-ups:** email-review `since`-floor on `to_post`; Pipeline drag-and-drop stage moves.
+7 -2
View File
@@ -93,7 +93,12 @@
- **W1b (deferred fast-follow):** nurture-gap automation — a daily job flags "committed / in-pipeline + no activity in N days + no open reminder" → auto-suggests a reminder (`source='automation'`, human confirms). Build once the recency rollup is proven in practice. - **W1b (deferred fast-follow):** nurture-gap automation — a daily job flags "committed / in-pipeline + no activity in N days + no open reminder" → auto-suggests a reminder (`source='automation'`, human confirms). Build once the recency rollup is proven in practice.
- **Left untouched (deliberate):** the grid `follow_up` checkbox + automation list-memberships, and `communications.next_action_date` + `/api/outreach/radar` — reminders are the new richer layer; folding those into it is a later cleanup, not now. - **Left untouched (deliberate):** the grid `follow_up` checkbox + automation list-memberships, and `communications.next_action_date` + `/api/outreach/radar` — reminders are the new richer layer; folding those into it is a later cleanup, not now.
**W2 — Natural-language query (read-only).** = the **"Email/communication search + NL query → item 3 (NL→safe structured query)"** below, now sequenced second. Locked stance: the LLM emits a **validated filter/query AST** over a curated field set (committed $, fund, stage, lead, `follow_up`, `last_activity_at`, `reminder_status`, …); the backend owns the SQL against soft-delete-filtered views with row/time caps — **never raw SQL**. Claude behind the redaction boundary; only the question text + schema vocabulary leave the box, never investor rows. Deliver in **both** web (search box) and Matrix (`@bot who needs follow-up?`). Reads need no approval gate. Builds on W1's `last_activity_at`. **W2 — Natural-language query (read-only). BACKEND BUILT + tested + validated locally 2026-06-18; web/Matrix UI pending.** = the **"Email/communication search + NL query → item 3 (NL→safe structured query)"** below, now sequenced second and **redesigned** (see below). Subsystem detail: `docs/guides/nl-query.md`.
- **Approach changed from the original "Claude behind redaction + a validated filter-AST" to LOCAL-ONLY + a named-intent catalog (decided with Grant 2026-06-18).** Rationale: (a) the dominant risk is LP data reaching a vendor — running translation on the **local Qwen via Spark Control** keeps the question on the box entirely (same basis as intake/digest), so there is **no Claude path and no redaction boundary** to manage, which is both simpler and safer; (b) a generic SQL/AST compiler was over-built for the real need — instead there are **~12 curated, hand-written, parameterized "named queries"** (`backend/nl_query/intents.py`) each with typed slots, and the **slot validator** (`runner.validate`) is the whole trust boundary (no dynamic identifiers, no raw SQL). The LLM only maps a question → `{intent, slots}`; its output is still validated, so a hallucinated intent is rejected. **Results never go to any model** (deterministic local render). Both design choices were pressure-tested by independent review agents before building.
- **As built:** `backend/nl_query/` (`intents.py` catalog, `runner.py` validator/executor + audit, `translate.py` local-Qwen translator, `try_questions.py` dev harness). `POST /api/query/nl` (`{question}` or direct `{intent,slots}`) + `GET /api/query/catalog`, `require_bot_or_admin`, read-only, audited (`audit_log` `entity_type='nl_query'`). Soft-delete-correct per table (`fundraising_*` has no `deleted_at``graveyard` is the axis; emails via a live `email_account_messages` sighting; reminders/opps/comms via `deleted_at`). Builds on W1's `last_activity_at`. Tests: `nl_query/test_nl_query.py` + `test_translate.py` + `test_nl_query_endpoint.py` (34/34 suite green).
- **Validation:** the local Qwen translated **12/12 of Grant's real example questions** correctly (right intent + slots, incl. "3 months"→90, sent/received→direction) against the live Spark — settles local-only; Claude not needed. Translation quality on messy/typo/no-match inputs shakes out in live use.
- **Remaining:** **step 4** = web "Ask" box in the Communications tab (calls the endpoint, renders rows + the interpreted query); **step 5** = Matrix `@bot <question>` (thin client of the endpoint; the 2-admin review room means a full-book dump is acceptable, so no bulk-result cap — only a light anti-flood truncation). Reads need no approval gate. Then deploy with reminders (v92) as **v0.1.0:93**.
**W3 — Bot grid-mutations behind a Matrix approval gate.** Generalize the email-proposal scaffold (`email_proposal_matrix` + propose→post→decide→apply) into one `agent_proposals` table (kind discriminator + JSON payload + target). Bot proposes set-commitment / assign-fund / change-stage / set-reminder; a human approves/edits/rejects in Matrix (**any member**); then apply. **Surgical, version-checked mutations — never blob RMW:** stage rides the existing `opportunities` link + validated stage endpoint; reminders write the W1 table; set-commitment/assign-fund need a version-checked single-cell upsert into the grid blob. Triggers the deferred **scoped service-token** item below (per-mutation-kind allowlist on the bot credential; money/merge/delete always require human approval regardless of scope — the autonomy axis). Parse on local Qwen, not Claude. **W3 — Bot grid-mutations behind a Matrix approval gate.** Generalize the email-proposal scaffold (`email_proposal_matrix` + propose→post→decide→apply) into one `agent_proposals` table (kind discriminator + JSON payload + target). Bot proposes set-commitment / assign-fund / change-stage / set-reminder; a human approves/edits/rejects in Matrix (**any member**); then apply. **Surgical, version-checked mutations — never blob RMW:** stage rides the existing `opportunities` link + validated stage endpoint; reminders write the W1 table; set-commitment/assign-fund need a version-checked single-cell upsert into the grid blob. Triggers the deferred **scoped service-token** item below (per-mutation-kind allowlist on the bot credential; money/merge/delete always require human approval regardless of scope — the autonomy axis). Parse on local Qwen, not Claude.
@@ -171,7 +176,7 @@ Open design questions (settled at build time): send time = **6 PM box-local** (c
**2. Email content search box — DONE (v0.1.0:83).** A **"Search content"** toggle in the Communications tab → `GET /api/email/search?q=` wraps `backend/ingest/search.py:hybrid_search` filtered to `doc_type='email'`; hits are hydrated + soft-delete-filtered against SQLite (canonical) and link back to the full body. Semantic/lexical search over email *content* ("find where we discussed the mining deal"), distinct from item 1's structured filters. 503 (clean "unavailable") when Spark/Qdrant is unreachable. **2. Email content search box — DONE (v0.1.0:83).** A **"Search content"** toggle in the Communications tab → `GET /api/email/search?q=` wraps `backend/ingest/search.py:hybrid_search` filtered to `doc_type='email'`; hits are hydrated + soft-delete-filtered against SQLite (canonical) and link back to the full body. Semantic/lexical search over email *content* ("find where we discussed the mining deal"), distinct from item 1's structured filters. 503 (clean "unavailable") when Spark/Qdrant is unreachable.
**3. Natural-language → safe structured query (separate, larger, after 1 & 2).** An LLM translates a plain-English question into a **safe, read-only** DB query against the CRM, for relational/analytical questions that semantic search *cannot* answer — Grant's example ("committed across funds AND not emailed in a while") is joins + aggregates + recency, not a text-topic match. Design constraints (locked at request time, refine at build): **3. Natural-language → safe structured query — SUPERSEDED & BUILT as W2 above (2026-06-18).** The design constraints below (especially "LLM = Claude behind the redaction boundary" and the validated-AST shape) were **revisited and changed** during the build: translation runs on the **local Qwen** (no Claude, no redaction), and the safe surface is a **named-intent catalog**, not a generic query AST. See the W2 entry above and `docs/guides/nl-query.md` for what shipped; the original framing is kept here for provenance. _An LLM translates a plain-English question into a **safe, read-only** DB query against the CRM, for relational/analytical questions that semantic search *cannot* answer — Grant's example ("committed across funds AND not emailed in a while") is joins + aggregates + recency, not a text-topic match. Original design constraints (locked at request time):_
- **LLM = Claude behind the redaction boundary** (better at text-to-SQL than local Qwen; the scrub→Claude→re-hydrate path already exists for the PII concern). Not Spark — Spark Control offers embeddings/rerank/RAG + local chat, but **no text-to-SQL**. - **LLM = Claude behind the redaction boundary** (better at text-to-SQL than local Qwen; the scrub→Claude→re-hydrate path already exists for the PII concern). Not Spark — Spark Control offers embeddings/rerank/RAG + local chat, but **no text-to-SQL**.
- **Safety is the hard part, not the parsing.** Do NOT hand the LLM open-ended SQL against the live DB (soft-delete leaks, injection, runaway scans). Constrain it: read-only connection/view, a curated/parameterized query surface or a validated query AST, soft-delete-filtered views, row/time caps. Treat as its own designed feature with its own tests. - **Safety is the hard part, not the parsing.** Do NOT hand the LLM open-ended SQL against the live DB (soft-delete leaks, injection, runaway scans). Constrain it: read-only connection/view, a curated/parameterized query surface or a validated query AST, soft-delete-filtered views, row/time caps. Treat as its own designed feature with its own tests.
- Must reckon with the two-model join caveat above (capital lives in the grid; recency from email links). - Must reckon with the two-model join caveat above (capital lives in the grid; recency from email links).
+9
View File
@@ -0,0 +1,9 @@
"""nl_query — the safe, read-only natural-language query surface (W2).
The LLM's job (added later) is only to map a question to a {intent, slots} pair; everything
that touches the database lives here behind a strict validator and a fixed, hand-written,
parameterized query catalog. See runner.py (the trust boundary) and intents.py (the catalog).
"""
from .runner import run_query, validate, catalog # noqa: F401
from .intents import INTENTS # noqa: F401
from .translate import translate, answer, build_system # noqa: F401
+433
View File
@@ -0,0 +1,433 @@
"""NL-query intents — the curated, hand-written query catalog (W2, the safe core).
Each intent is a FIXED, reviewed, parameterized SQL query with a small set of typed
"slots" (the blanks a question fills in: a number of days, a name, a limit). There is NO
generic SQL/AST compiler and NO dynamically-built identifiers: every table and column name
is hardcoded in the query text, and every value the caller (or an LLM) supplies reaches
SQLite only as a bound `?` parameter. That is the whole trust model — a malformed or
hostile request can change a bound value, never the query structure. Adding a capability
means adding a reviewed entry here, not widening a language.
Soft-delete discipline (CLAUDE.md standing rule), per table:
- reminders / opportunities / communications carry `deleted_at` -> filter `deleted_at IS NULL`.
- emails have NO `deleted_at`; "live" means a non-tombstoned per-mailbox sighting exists
(`email_account_messages.deleted_at IS NULL`) — mirror the digest / query_email_activity.
- fundraising_investors/_contacts/_funds/_commitments are a HARD-REBUILT projection of the
grid blob with NO `deleted_at` column; the live/retired axis there is the `graveyard` flag.
Do NOT add `deleted_at IS NULL` to those tables — the column does not exist and the clause
would raise. Exclude `graveyard = 1` where the question means "live" investors.
Each run_* returns {columns, rows, summary, truncated}. `summary` is a DETERMINISTIC local
one-liner (never an LLM narrative) — results never leave the box to be summarized.
"""
import sqlite3
from datetime import datetime, timedelta
# Generous ceiling — the Matrix review room is two admins and the web app is internal, so
# dumping the full book is acceptable (per Grant); this only guards against an unbounded
# scan flooding a response. A list intent past this is reported truncated, never silently cut.
MAX_ROWS = 500
# Live, non-terminal pipeline stages in funnel order (mirrors server.PIPELINE_STAGES; 'lost'
# is the terminal drop). Kept here so the pipeline intents have a stable rank without importing
# the server module (helpers take a conn; they never import server — house convention).
_STAGE_ORDER = ['lead', 'outreach', 'meeting', 'due_diligence', 'committed', 'funded']
_STAGE_RANK_SQL = (
"CASE stage WHEN 'lead' THEN 1 WHEN 'outreach' THEN 2 WHEN 'meeting' THEN 3 "
"WHEN 'due_diligence' THEN 4 WHEN 'committed' THEN 5 WHEN 'funded' THEN 6 ELSE 0 END")
# ── helpers ────────────────────────────────────────────────────────────────────────────
def _rows(cur):
"""Materialize a cursor as a list of plain dicts, independent of the connection's
row_factory (works whether rows come back as tuples or sqlite3.Row)."""
cols = [c[0] for c in cur.description]
return [dict(zip(cols, r)) for r in cur.fetchall()]
def like_contains(value):
"""Build a safe LIKE pattern for a free-text contains match. Escapes the LIKE
wildcards so a user/LLM value of '%' or '_' is treated literally — paired with
`LIKE ? ESCAPE '\\'` in the SQL, this stops '%' from matching the entire table."""
v = value.replace("\\", "\\\\").replace("%", "\\%").replace("_", "\\_")
return f"%{v}%"
def _last_activity_by_investor(conn):
"""{fundraising_investors.id: latest activity ISO ts} across logged communications and
captured grid-linked emails — the per-investor recency signal behind the "gone quiet"
and "last contact" intents.
NB: this MIRRORS server.last_activity_by_investor() and its soft-delete joins (comms via
cm.deleted_at IS NULL; email via a live email_account_messages sighting). It is duplicated
rather than imported only to keep this module free of a server import (the main module runs
as __main__, so `import server` would re-execute it). Keep the two in sync; the soft-delete
test guards this copy. If a third caller appears, extract both to a shared module."""
out = {}
def _bump(inv_id, ts):
if inv_id and ts and (out.get(inv_id) is None or str(ts) > str(out[inv_id])):
out[inv_id] = ts
# Each leg is guarded: the comms/email tables can be absent on a minimal DB. This is a
# narrow, intentional tolerance for optional tables — NOT the broad error-swallowing the
# runner forbids (a failure in an intent's main query surfaces as query_failed).
try:
for r in conn.execute(
"SELECT fc.investor_id AS inv, MAX(cm.communication_date) AS last_ts "
"FROM communications cm JOIN fundraising_contacts fc ON fc.contact_id = cm.contact_id "
"WHERE cm.deleted_at IS NULL AND fc.contact_id IS NOT NULL GROUP BY fc.investor_id"):
_bump(r["inv"], r["last_ts"])
except sqlite3.OperationalError:
pass
try:
for r in conn.execute(
"SELECT eil.fundraising_investor_id AS inv, MAX(e.sent_at) AS last_ts "
"FROM email_investor_links eil JOIN emails e ON e.id = eil.email_id "
"WHERE eil.fundraising_investor_id IS NOT NULL AND EXISTS "
"(SELECT 1 FROM email_account_messages eam WHERE eam.email_id = e.id "
"AND eam.deleted_at IS NULL) GROUP BY eil.fundraising_investor_id"):
_bump(r["inv"], r["last_ts"])
except sqlite3.OperationalError:
pass
return out
def _today():
return datetime.utcnow().date()
def _days_since(ts):
"""Whole days between an ISO date/datetime string and today (UTC). None if unparseable."""
if not ts:
return None
try:
d = datetime.fromisoformat(str(ts)[:10].replace("Z", "")).date()
except ValueError:
return None
return (_today() - d).days
def _own_addresses(conn):
try:
return {(r[0] or "").lower().strip()
for r in conn.execute("SELECT email_address FROM email_accounts")} - {""}
except sqlite3.OperationalError:
return set()
def _truncate(rows):
"""Apply the global ceiling, returning (rows, truncated)."""
if len(rows) > MAX_ROWS:
return rows[:MAX_ROWS], True
return rows, False
# ── investor intents ─────────────────────────────────────────────────────────────────────
def run_investors_cold(conn, slots):
"""Live investors not contacted in `days` days — never-contacted first, then oldest."""
days = slots["days"]
cutoff = (_today() - timedelta(days=days)).isoformat()
last = _last_activity_by_investor(conn)
invs = _rows(conn.execute(
"SELECT id, investor_name, lead, total_invested FROM fundraising_investors "
"WHERE graveyard = 0 ORDER BY investor_name"))
cold = []
for inv in invs:
ts = last.get(inv["id"])
if ts is None or str(ts)[:10] < cutoff:
cold.append({"investor_name": inv["investor_name"], "lead": inv["lead"],
"total_invested": inv["total_invested"],
"last_activity_at": ts, "days_since": _days_since(ts)})
# never-contacted (days_since None) first, then most-stale first
cold.sort(key=lambda r: (r["days_since"] is not None, -(r["days_since"] or 0)))
rows, trunc = _truncate(cold)
return {"columns": ["investor_name", "lead", "total_invested", "last_activity_at", "days_since"],
"rows": rows, "truncated": trunc,
"summary": f"{len(cold)} live investor(s) not contacted in {days}+ days."}
def run_investor_lookup(conn, slots):
"""One investor's profile: contacts (name/email/title/city), committed total, per-fund
commitments, lead. Name matched as a contains (an LLM/user may pass a partial)."""
pat = like_contains(slots["name"])
invs = _rows(conn.execute(
"SELECT id, investor_name, lead, lead_source, total_invested, follow_up, graveyard "
"FROM fundraising_investors WHERE investor_name LIKE ? ESCAPE '\\' "
"ORDER BY graveyard, investor_name LIMIT 25", (pat,)))
for inv in invs:
inv["contacts"] = _rows(conn.execute(
"SELECT full_name, email, title, city, state, country FROM fundraising_contacts "
"WHERE investor_id = ? ORDER BY sort_order, full_name", (inv["id"],)))
inv["commitments"] = _rows(conn.execute(
"SELECT f.fund_name, c.amount FROM fundraising_commitments c "
"JOIN fundraising_funds f ON f.id = c.fund_id WHERE c.investor_id = ? AND c.amount <> 0 "
"ORDER BY f.display_order", (inv["id"],)))
inv.pop("id", None)
return {"columns": ["investor_name", "lead", "lead_source", "total_invested",
"follow_up", "graveyard", "contacts", "commitments"],
"rows": invs, "truncated": False,
"summary": f"{len(invs)} investor(s) matching \"{slots['name']}\"."}
def run_investors_by_city(conn, slots):
"""Investors with a contact located in `city` (contains match on the contact's city)."""
pat = like_contains(slots["city"])
rows = _rows(conn.execute(
"SELECT i.investor_name, c.full_name AS contact, c.city, c.state, c.country, i.lead "
"FROM fundraising_contacts c JOIN fundraising_investors i ON i.id = c.investor_id "
"WHERE i.graveyard = 0 AND c.city LIKE ? ESCAPE '\\' "
"ORDER BY i.investor_name, c.full_name LIMIT ?", (pat, MAX_ROWS + 1)))
rows, trunc = _truncate(rows)
return {"columns": ["investor_name", "contact", "city", "state", "country", "lead"],
"rows": rows, "truncated": trunc,
"summary": f"{len(rows)} investor contact(s) in \"{slots['city']}\"."}
def run_investors_by_lead(conn, slots):
"""Live investors owned by a given lead/team member (contains match on `lead`)."""
pat = like_contains(slots["lead"])
rows = _rows(conn.execute(
"SELECT investor_name, lead, total_invested, follow_up FROM fundraising_investors "
"WHERE graveyard = 0 AND lead LIKE ? ESCAPE '\\' "
"ORDER BY total_invested DESC, investor_name LIMIT ?", (pat, MAX_ROWS + 1)))
rows, trunc = _truncate(rows)
return {"columns": ["investor_name", "lead", "total_invested", "follow_up"],
"rows": rows, "truncated": trunc,
"summary": f"{len(rows)} live investor(s) led by \"{slots['lead']}\"."}
def run_top_investors_committed(conn, slots):
"""Top `limit` live investors by total committed capital across all funds."""
n = slots["limit"]
rows = _rows(conn.execute(
"SELECT investor_name, total_invested, lead FROM fundraising_investors "
"WHERE graveyard = 0 AND total_invested > 0 "
"ORDER BY total_invested DESC, investor_name LIMIT ?", (n,)))
return {"columns": ["investor_name", "total_invested", "lead"], "rows": rows,
"truncated": False, "summary": f"Top {len(rows)} investor(s) by committed capital."}
def run_investors_follow_up(conn, slots):
"""Investors we owe a follow-up to: those with an OPEN reminder, overdue first. Uses the
W1 reminders table (the richer follow-up layer) joined to the current grid name."""
today = _today().isoformat()
rows = _rows(conn.execute(
"SELECT COALESCE(i.investor_name, r.investor_name) AS investor_name, r.title, "
"r.due_date, r.status, r.assignee_id, "
"CASE WHEN r.due_date IS NOT NULL AND substr(r.due_date,1,10) < ? THEN 1 ELSE 0 END AS overdue "
"FROM reminders r LEFT JOIN fundraising_investors i ON i.id = r.investor_id "
"WHERE r.deleted_at IS NULL AND r.status = 'open' AND r.investor_id IS NOT NULL "
"ORDER BY (r.due_date IS NULL), r.due_date ASC LIMIT ?", (today, MAX_ROWS + 1)))
rows, trunc = _truncate(rows)
return {"columns": ["investor_name", "title", "due_date", "status", "overdue"],
"rows": rows, "truncated": trunc,
"summary": f"{len(rows)} investor(s) with an open follow-up reminder."}
# ── pipeline intents ──────────────────────────────────────────────────────────────────────
def run_pipeline_top(conn, slots):
"""Top `limit` live pipeline opportunities by stage (furthest along first), with the
investor, owner, and most-recent activity."""
n = slots["limit"]
last = _last_activity_by_investor(conn)
rows = _rows(conn.execute(
"SELECT o.fundraising_investor_id AS inv_id, "
"COALESCE(i.investor_name, o.name) AS investor_name, o.stage, o.expected_amount, "
"o.probability, u.full_name AS owner FROM opportunities o "
"LEFT JOIN fundraising_investors i ON i.id = o.fundraising_investor_id "
"LEFT JOIN users u ON u.id = o.owner_id "
"WHERE o.deleted_at IS NULL AND o.stage != 'lost' "
f"ORDER BY {_STAGE_RANK_SQL} DESC, o.expected_amount DESC LIMIT ?", (n,)))
for r in rows:
r["last_activity_at"] = last.get(r.pop("inv_id"))
return {"columns": ["investor_name", "stage", "expected_amount", "probability", "owner",
"last_activity_at"],
"rows": rows, "truncated": False,
"summary": f"Top {len(rows)} live pipeline opportunit(ies) by stage."}
def run_pipeline_totals(conn, slots):
"""Total pipeline dollars and the split across each stage (excludes lost)."""
rows = _rows(conn.execute(
"SELECT stage, COUNT(*) AS count, COALESCE(SUM(expected_amount),0) AS expected_total, "
"COALESCE(SUM(commitment_amount),0) AS committed_total FROM opportunities "
f"WHERE deleted_at IS NULL AND stage != 'lost' GROUP BY stage ORDER BY {_STAGE_RANK_SQL}"))
total = sum(r["expected_total"] for r in rows)
count = sum(r["count"] for r in rows)
return {"columns": ["stage", "count", "expected_total", "committed_total"],
"rows": rows, "truncated": False,
"summary": f"${total:,.0f} expected across {count} live opportunit(ies) in "
f"{len(rows)} stage(s)."}
# ── email / communication intents ─────────────────────────────────────────────────────────
def run_recent_emails(conn, slots):
"""The most recent `limit` matched investor emails, optionally one direction.
Matched-only + soft-delete-correct (a live email_account_messages sighting), mirroring
the Communications panel's query_email_activity."""
n, direction = slots["limit"], slots["direction"]
where = ["EXISTS (SELECT 1 FROM email_account_messages eam WHERE eam.email_id = e.id "
"AND eam.deleted_at IS NULL)",
"EXISTS (SELECT 1 FROM email_investor_links l WHERE l.email_id = e.id)"]
params = []
own = _own_addresses(conn)
if direction in ("inbound", "outbound") and own:
op = "IN" if direction == "outbound" else "NOT IN"
where.append(f"LOWER(e.from_email) {op} ({','.join('?' for _ in own)})")
params.extend(sorted(own))
sql = ("SELECT e.subject, e.from_name, e.from_email, e.sent_at, "
"(SELECT fi.investor_name FROM email_investor_links l "
" JOIN fundraising_investors fi ON fi.id = l.fundraising_investor_id "
" WHERE l.email_id = e.id AND l.fundraising_investor_id IS NOT NULL LIMIT 1) AS investor "
"FROM emails e WHERE " + " AND ".join(where) + " ORDER BY e.sent_at DESC LIMIT ?")
rows = _rows(conn.execute(sql, params + [n]))
label = {"inbound": "received", "outbound": "sent"}.get(direction, "")
return {"columns": ["sent_at", "subject", "from_name", "from_email", "investor"],
"rows": rows, "truncated": False,
"summary": f"{len(rows)} most-recent {label + ' ' if label else ''}investor email(s)."}
def run_investor_last_contact(conn, slots):
"""When we last had any activity with investor X (matched by name)."""
pat = like_contains(slots["name"])
last = _last_activity_by_investor(conn)
invs = _rows(conn.execute(
"SELECT id, investor_name FROM fundraising_investors "
"WHERE investor_name LIKE ? ESCAPE '\\' ORDER BY graveyard, investor_name LIMIT 25", (pat,)))
rows = []
for inv in invs:
ts = last.get(inv["id"])
rows.append({"investor_name": inv["investor_name"], "last_activity_at": ts,
"days_since": _days_since(ts)})
return {"columns": ["investor_name", "last_activity_at", "days_since"], "rows": rows,
"truncated": False, "summary": f"Last contact for {len(rows)} investor(s) "
f"matching \"{slots['name']}\"."}
def run_comms_by_user(conn, slots):
"""The most recent `limit` outbound investor emails sent by a given user (matched by
username or full name). Soft-delete-correct (live sighting, is_sent)."""
n, pat = slots["limit"], like_contains(slots["user"])
rows = _rows(conn.execute(
"SELECT e.subject, e.sent_at, u.full_name AS sender, "
"(SELECT fi.investor_name FROM email_investor_links l "
" JOIN fundraising_investors fi ON fi.id = l.fundraising_investor_id "
" WHERE l.email_id = e.id AND l.fundraising_investor_id IS NOT NULL LIMIT 1) AS investor "
"FROM emails e JOIN email_account_messages eam ON eam.email_id = e.id "
"AND eam.deleted_at IS NULL AND eam.is_sent = 1 "
"JOIN email_accounts ea ON ea.id = eam.account_id JOIN users u ON u.id = ea.user_id "
"WHERE (u.username LIKE ? ESCAPE '\\' OR u.full_name LIKE ? ESCAPE '\\') "
"ORDER BY e.sent_at DESC LIMIT ?", (pat, pat, n)))
return {"columns": ["sent_at", "subject", "sender", "investor"], "rows": rows,
"truncated": False,
"summary": f"{len(rows)} recent email(s) sent by \"{slots['user']}\"."}
def run_email_counts_by_user(conn, slots):
"""Per-user counts of outbound investor emails over this week / month / year-to-date.
Windows are calendar-based: week = since Monday, month = since the 1st, ytd = since Jan 1."""
today = _today()
wk = (today - timedelta(days=today.weekday())).isoformat()
mo = today.replace(day=1).isoformat()
yr = today.replace(month=1, day=1).isoformat()
where = "WHERE eam.deleted_at IS NULL AND eam.is_sent = 1"
params = [wk, mo, yr]
if slots.get("user"):
pat = like_contains(slots["user"])
where += " AND (u.username LIKE ? ESCAPE '\\' OR u.full_name LIKE ? ESCAPE '\\')"
params.extend([pat, pat])
rows = _rows(conn.execute(
"SELECT u.full_name AS user, u.username, "
"SUM(CASE WHEN substr(e.sent_at,1,10) >= ? THEN 1 ELSE 0 END) AS this_week, "
"SUM(CASE WHEN substr(e.sent_at,1,10) >= ? THEN 1 ELSE 0 END) AS this_month, "
"SUM(CASE WHEN substr(e.sent_at,1,10) >= ? THEN 1 ELSE 0 END) AS ytd "
"FROM users u JOIN email_accounts ea ON ea.user_id = u.id "
"JOIN email_account_messages eam ON eam.account_id = ea.id "
"JOIN emails e ON e.id = eam.email_id " + where +
" GROUP BY u.id HAVING ytd > 0 ORDER BY ytd DESC", params))
return {"columns": ["user", "this_week", "this_month", "ytd"], "rows": rows,
"truncated": False, "summary": f"Outbound email counts for {len(rows)} user(s)."}
# ── registry ──────────────────────────────────────────────────────────────────────────────
# key -> {summary, slots, run, example}. `slots` is consumed by the runner's validator and
# (later) surfaced to the local-model translator + the UI as the single source of truth for
# what is queryable. SlotSpec: {type: int|enum|text, ...constraints}.
INTENTS = {
"investors_cold": {
"summary": "Investors we haven't contacted in a while (default 90 days).",
"slots": {"days": {"type": "int", "default": 90, "min": 1, "max": 3650}},
"example": "Which investors haven't we reached out to in the last 3 months?",
"run": run_investors_cold,
},
"investor_lookup": {
"summary": "One investor's contacts, email, committed total and per-fund breakdown.",
"slots": {"name": {"type": "text", "required": True, "maxlen": 120}},
"example": "What is Acme Capital's email and how much have they committed across funds?",
"run": run_investor_lookup,
},
"investors_by_city": {
"summary": "Investors with a contact located in a given city.",
"slots": {"city": {"type": "text", "required": True, "maxlen": 80}},
"example": "Who are all the investors located in Austin?",
"run": run_investors_by_city,
},
"investors_by_lead": {
"summary": "Investors owned by a given lead / team member.",
"slots": {"lead": {"type": "text", "required": True, "maxlen": 80}},
"example": "Show me the investors led by Jonathan.",
"run": run_investors_by_lead,
},
"top_investors_committed": {
"summary": "Top investors by total committed capital.",
"slots": {"limit": {"type": "int", "default": 10, "min": 1, "max": MAX_ROWS}},
"example": "List our top 10 investors by committed capital.",
"run": run_top_investors_committed,
},
"investors_follow_up": {
"summary": "Investors we owe a follow-up to (have an open reminder), overdue first.",
"slots": {},
"example": "Which investors do we owe follow-ups to?",
"run": run_investors_follow_up,
},
"pipeline_top": {
"summary": "Top pipeline opportunities by stage, with investor, owner and last activity.",
"slots": {"limit": {"type": "int", "default": 10, "min": 1, "max": MAX_ROWS}},
"example": "List our top 10 pipeline investors by stage and last conversation.",
"run": run_pipeline_top,
},
"pipeline_totals": {
"summary": "Total pipeline dollars and the split across each stage.",
"slots": {},
"example": "What is our total pipeline in dollars, split by stage?",
"run": run_pipeline_totals,
},
"recent_emails": {
"summary": "The most recent investor emails (optionally inbound or outbound only).",
"slots": {"limit": {"type": "int", "default": 10, "min": 1, "max": 100},
"direction": {"type": "enum", "choices": ["any", "inbound", "outbound"],
"default": "any"}},
"example": "What were the last 10 investor emails we sent, and who to?",
"run": run_recent_emails,
},
"investor_last_contact": {
"summary": "When we last had any activity with a given investor.",
"slots": {"name": {"type": "text", "required": True, "maxlen": 120}},
"example": "When did we last reach out to Acme Capital?",
"run": run_investor_last_contact,
},
"comms_by_user": {
"summary": "Recent investor emails sent by a given team member.",
"slots": {"user": {"type": "text", "required": True, "maxlen": 80},
"limit": {"type": "int", "default": 10, "min": 1, "max": 100}},
"example": "What were the last investor emails sent by Grant?",
"run": run_comms_by_user,
},
"email_counts_by_user": {
"summary": "How many investor emails each user sent this week / month / year-to-date.",
"slots": {"user": {"type": "text", "required": False, "maxlen": 80}},
"example": "How many emails has Grant sent this week, this month, and year to date?",
"run": run_email_counts_by_user,
},
}
+127
View File
@@ -0,0 +1,127 @@
"""NL-query runner — validate a {intent, slots} request, run the curated query, return rows.
This is the trust boundary. Whatever produced the request (a local model in W2, the web UI,
or a test) is untrusted: the runner accepts ONLY a known intent key and slot VALUES, coerces
each value to its declared type, and rejects anything off-spec — it never lets a caller name
a table/column, write SQL, or choose an operator. The intents do the rest with fixed,
parameterized SQL (see intents.py). All failure modes return a structured error dict; the
runner never raises to the caller (a bad `limit=abc` must not crash the request thread).
"""
import sqlite3
from .intents import INTENTS
def _coerce_slot(name, spec, raw):
"""Coerce/validate one slot value against its spec. Returns (value, error). Exactly one
of the two is meaningful: error is None on success, else a human-readable string."""
t = spec["type"]
provided = raw is not None and not (isinstance(raw, str) and raw.strip() == "")
if not provided:
if "default" in spec:
return spec["default"], None
if spec.get("required"):
return None, f"slot '{name}' is required"
return None, None # optional, absent
if t == "int":
try:
v = int(raw)
except (TypeError, ValueError):
return None, f"slot '{name}' must be an integer (got {raw!r})"
if "min" in spec:
v = max(spec["min"], v)
if "max" in spec:
v = min(spec["max"], v)
return v, None
if t == "enum":
v = str(raw).strip().lower()
if v not in spec["choices"]:
if "default" in spec:
return spec["default"], None
return None, f"slot '{name}' must be one of {spec['choices']} (got {raw!r})"
return v, None
if t == "text":
v = str(raw).strip()
maxlen = spec.get("maxlen", 200)
if len(v) > maxlen:
v = v[:maxlen]
return v, None
return None, f"slot '{name}' has unknown type {t!r}" # registry bug, fail visibly
def validate(intent_key, raw_slots):
"""Validate an intent + raw slots WITHOUT running. Returns (clean_slots, error_dict).
Useful to the translator/UI for a dry-run check. error_dict is None on success."""
if intent_key not in INTENTS:
return None, {"error": "unknown_intent", "intent": intent_key,
"detail": f"unknown intent; known: {sorted(INTENTS)}"}
spec = INTENTS[intent_key]["slots"]
raw_slots = raw_slots or {}
# Reject unexpected slot keys rather than ignore them — a request shaped wrong is a
# misunderstanding worth surfacing, not silently dropping.
unexpected = [k for k in raw_slots if k not in spec]
if unexpected:
return None, {"error": "bad_slot", "intent": intent_key,
"detail": f"unexpected slot(s): {unexpected}; allowed: {sorted(spec)}"}
clean = {}
for name, sspec in spec.items():
v, err = _coerce_slot(name, sspec, raw_slots.get(name))
if err:
return None, {"error": "bad_slot", "intent": intent_key, "detail": err}
if v is not None or "default" in sspec:
clean[name] = v
return clean, None
def run_query(conn, intent_key, raw_slots=None, *, audit_fn=None, actor=None, source="api"):
"""Validate and execute a curated NL query. Always returns a dict — either a result
{intent, slots, columns, rows, row_count, truncated, summary} or an error
{error, intent, detail}. Records an audit row via audit_fn (if given) so a query made
through a leaked/automated credential is detectable.
audit_fn signature: audit_fn({actor, source, intent, slots, row_count, error}).
"""
clean, err = validate(intent_key, raw_slots)
if err:
if audit_fn:
try:
audit_fn({"actor": actor, "source": source, "intent": intent_key,
"slots": raw_slots, "row_count": 0, "error": err["error"]})
except Exception:
pass
return err
try:
result = INTENTS[intent_key]["run"](conn, clean)
except sqlite3.Error as exc:
# Surface a query failure (e.g. a missing optional table) as a visible error — never
# swallow it and hand back an empty result that reads as an authoritative "none".
if audit_fn:
try:
audit_fn({"actor": actor, "source": source, "intent": intent_key,
"slots": clean, "row_count": 0, "error": "query_failed"})
except Exception:
pass
return {"error": "query_failed", "intent": intent_key, "detail": str(exc)}
out = {"intent": intent_key, "slots": clean, "row_count": len(result.get("rows", [])),
**result}
if audit_fn:
try:
audit_fn({"actor": actor, "source": source, "intent": intent_key,
"slots": clean, "row_count": out["row_count"], "error": None})
except Exception:
pass
return out
def catalog():
"""The queryable surface as data: every intent's key, summary, slot specs and example.
Single source of truth for the W2 translator prompt and any UI hint list."""
return [{"intent": k, "summary": v["summary"], "slots": v["slots"],
"example": v.get("example", "")} for k, v in INTENTS.items()]
+226
View File
@@ -0,0 +1,226 @@
#!/usr/bin/env python3
"""Tests for the W2 safe NL-query runner (the model-free core).
Boots the REAL schema (server.init_db against a temp DB — exact columns + all migrations),
inserts synthetic fundraising/email/reminder/pipeline data, and exercises every intent plus
the trust-boundary behaviour:
- each intent returns the right rows over the real schema;
- SOFT-DELETE is respected on both recency legs (a tombstoned communication and a tombstoned
email sighting never count), on reminders, and on opportunities; graveyard investors are
excluded from "live" intents;
- the validator rejects bad/unknown/unexpected slots WITHOUT crashing (the `?limit=abc` class);
- LIKE wildcards in a free-text slot are escaped (a city of "%" does NOT return everything);
- limits clamp to their caps; the audit hook fires with the intent + row count.
Synthetic data only — no real LP substance, no network, no model.
Run: cd backend && python3 nl_query/test_nl_query.py
"""
import os
import sys
import tempfile
from datetime import datetime, timedelta
_DATA = tempfile.mkdtemp()
os.environ["CRM_DATA_DIR"] = _DATA
os.environ["CRM_DB_PATH"] = os.path.join(_DATA, "crm.db")
os.environ["CRM_GMAIL_INTEGRATION_ENABLED"] = "1"
sys.path.insert(0, os.path.dirname(os.path.dirname(os.path.abspath(__file__)))) # backend/
import server # noqa: E402
import nl_query # noqa: E402
FAILS = []
def check(cond, msg):
print((" PASS " if cond else " FAIL ") + msg)
if not cond:
FAILS.append(msg)
def _ago(days):
return (datetime.utcnow() - timedelta(days=days)).isoformat() + "Z"
TODAY = datetime.utcnow().date()
def seed(conn):
c = conn.execute
# users + mailboxes
c("INSERT INTO users (id, username, email, password_hash, full_name, role) VALUES "
"('u_grant','grant','grant@ten31.xyz','x','Grant Smith','admin'),"
"('u_jon','jonathan','jon@ten31.xyz','x','Jonathan Lee','member')")
c("INSERT INTO email_accounts (id, user_id, email_address, auth_method) VALUES "
"('a_grant','u_grant','grant@ten31.xyz','dwd'),"
"('a_jon','u_jon','jon@ten31.xyz','dwd')")
# funds
c("INSERT INTO fundraising_funds (id, column_id, fund_name, display_order) VALUES "
"('f1','c_f1','Fund I',1),('f2','c_f2','Fund II',2)")
# investors (graveyard flag is the live/retired axis; no deleted_at on this table)
def inv(iid, name, lead, total, grave=0):
c("INSERT INTO fundraising_investors (id, investor_name, lead, graveyard, "
"source_row_id, total_invested) VALUES (?,?,?,?,?,?)",
(iid, name, lead, grave, iid, total))
inv("i_acme", "Acme Capital", "Jonathan Lee", 5_000_000)
inv("i_beta", "Beta Partners", "Grant Smith", 2_000_000)
inv("i_cold", "Cold Co", "Grant Smith", 0) # never contacted
inv("i_delta", "Delta LP", "Grant Smith", 1_000_000) # only a (comms) signal
inv("i_ghost", "Graveyard Ghost", "Grant Smith", 9_999_999, grave=1)
# contacts (grid pills) + classic contact rows for the comms leg
c("INSERT INTO fundraising_contacts (id, investor_id, full_name, email, title, city, "
"contact_id, sort_order) VALUES "
"('fc_a','i_acme','Alice Acme','alice@acme.com','GP','Austin','cc_alice',0),"
"('fc_b','i_beta','Bob Beta','bob@beta.com','LP','Denver',NULL,0),"
"('fc_d','i_delta','Dana Delta','dana@delta.com','CFO','Miami','cc_dana',0)")
c("INSERT INTO contacts (id, first_name, last_name, email) VALUES "
"('cc_alice','Alice','Acme','alice@acme.com'),"
"('cc_dana','Dana','Delta','dana@delta.com')")
# commitments — Acme across two funds (3M + 2M = 5M); Beta one fund
c("INSERT INTO fundraising_commitments (id, investor_id, fund_id, amount) VALUES "
"('cm1','i_acme','f1',3_000_000),('cm2','i_acme','f2',2_000_000),"
"('cm3','i_beta','f1',2_000_000)")
# emails: matched + a per-mailbox sighting. is_sent + from_email decide direction.
def email(eid, frm, frm_name, days, inv_id, account, is_sent, deleted=False):
c("INSERT INTO emails (id, rfc_message_id, from_email, from_name, sent_at, subject, "
"is_matched, match_status) VALUES (?,?,?,?,?,?,1,'matched')",
(eid, "rfc_" + eid, frm, frm_name, _ago(days), "Re: " + eid))
c("INSERT INTO email_account_messages (id, email_id, account_id, gmail_message_id, "
"gmail_thread_id, is_sent, deleted_at) VALUES (?,?,?,?,?,?,?)",
("eam_" + eid, eid, account, "g_" + eid, "t_" + eid, is_sent,
_ago(days) if deleted else None))
c("INSERT INTO email_investor_links (id, email_id, fundraising_investor_id, "
"matched_address, match_kind) VALUES (?,?,?,?, 'exact_email')",
("eil_" + eid, eid, inv_id, frm))
email("ea_recent", "grant@ten31.xyz", "Grant Smith", 0, "i_acme", "a_grant", 1) # Acme: today
email("eb_old", "grant@ten31.xyz", "Grant Smith", 40, "i_beta", "a_grant", 1) # Beta: 40d
email("edel", "grant@ten31.xyz", "Grant Smith", 0, "i_beta", "a_grant", 1, deleted=True) # tombstoned
email("ej", "jon@ten31.xyz", "Jonathan Lee", 0, "i_acme", "a_jon", 1) # jonathan today
email("ein", "alice@acme.com", "Alice Acme", 3, "i_acme", "a_grant", 0) # inbound 3d
# communications (the other recency leg) — Delta has ONLY comms: one live (5d), one tombstoned
# (today). If the soft-delete filter broke, Delta would read as contacted today.
c("INSERT INTO communications (id, contact_id, type, communication_date, created_by) VALUES "
"('cmm_live','cc_dana','email',?,'u_grant')", (_ago(5),))
c("INSERT INTO communications (id, contact_id, type, communication_date, created_by, deleted_at) "
"VALUES ('cmm_del','cc_dana','email',?,'u_grant',?)", (_ago(0), _ago(0)))
# reminders — open(overdue) / open(future) / done / deleted / standalone
def rem(rid, inv_id, title, due, status="open", deleted=False):
c("INSERT INTO reminders (id, investor_id, investor_name, title, due_date, status, "
"deleted_at) VALUES (?,?,?,?,?,?,?)",
(rid, inv_id, title, title, due, status, _ago(0) if deleted else None))
rem("r_over", "i_beta", "Send deck", (TODAY - timedelta(days=1)).isoformat()) # overdue
rem("r_future", "i_acme", "Quarterly check-in", (TODAY + timedelta(days=10)).isoformat())
rem("r_done", "i_acme", "Old task", (TODAY - timedelta(days=2)).isoformat(), status="done")
rem("r_del", "i_acme", "Tombstoned", (TODAY - timedelta(days=2)).isoformat(), deleted=True)
rem("r_standalone", None, "Team chore", (TODAY - timedelta(days=1)).isoformat())
# opportunities — committed / meeting (live) / lost (terminal) / deleted
def opp(oid, inv_id, contact, stage, expected, owner, deleted=False):
c("INSERT INTO opportunities (id, name, contact_id, stage, expected_amount, owner_id, "
"fundraising_investor_id, deleted_at) VALUES (?,?,?,?,?,?,?,?)",
(oid, oid, contact, stage, expected, owner, inv_id, _ago(0) if deleted else None))
# opp contact_id must reference a real contacts row (FK on); reuse the two we made
opp("o_acme", "i_acme", "cc_alice", "committed", 4_000_000, "u_jon")
opp("o_beta", "i_beta", "cc_dana", "meeting", 1_000_000, "u_grant")
opp("o_lost", "i_acme", "cc_alice", "lost", 9_000_000, "u_jon")
opp("o_del", "i_beta", "cc_dana", "due_diligence", 7_000_000, "u_grant", deleted=True)
conn.commit()
def names(res):
return [r["investor_name"] for r in res["rows"]]
def main():
server.init_db()
conn = server.get_db()
seed(conn)
run = lambda *a, **k: nl_query.run_query(conn, *a, **k)
print("investors_cold")
r = run("investors_cold", {"days": 30})
check(names(r) == ["Cold Co", "Beta Partners"], f"cold(30) never-first then stale: {names(r)}")
check(run("investors_cold", {"days": 90})["row_count"] == 1, "cold(90): only never-contacted")
check("Graveyard Ghost" not in names(run("investors_cold", {"days": 3650})),
"cold excludes graveyard investors")
check("Delta LP" in names(run("investors_cold", {"days": 3})), "cold(3) sees Delta (comms 5d)")
check("Delta LP" not in names(run("investors_cold", {"days": 7})),
"cold(7): Delta's tombstoned comm (today) did NOT count")
print("investor_lookup")
r = run("investor_lookup", {"name": "acme"})
check(r["row_count"] == 1 and r["rows"][0]["total_invested"] == 5_000_000, "lookup total committed")
check({c["fund_name"] for c in r["rows"][0]["commitments"]} == {"Fund I", "Fund II"},
"lookup per-fund breakdown")
check(r["rows"][0]["contacts"][0]["email"] == "alice@acme.com", "lookup surfaces contact email")
print("investors_by_city / by_lead / top / follow_up")
check(names(run("investors_by_city", {"city": "Austin"})) == ["Acme Capital"], "by_city")
check(set(names(run("investors_by_lead", {"lead": "Grant"}))) == {"Beta Partners", "Cold Co", "Delta LP"},
"by_lead excludes graveyard + other leads")
check(names(run("top_investors_committed", {"limit": 2})) == ["Acme Capital", "Beta Partners"],
"top by committed (graveyard + zero excluded)")
r = run("investors_follow_up")
check(names(r) == ["Beta Partners", "Acme Capital"], f"follow_up overdue-first, open-only: {names(r)}")
check(r["rows"][0]["overdue"] == 1 and r["rows"][1]["overdue"] == 0, "follow_up overdue flag")
print("pipeline")
r = run("pipeline_totals")
stages = {row["stage"]: row for row in r["rows"]}
check(set(stages) == {"committed", "meeting"}, f"pipeline_totals excludes lost+deleted: {set(stages)}")
check(stages["committed"]["expected_total"] == 4_000_000, "pipeline_totals stage sum")
r = run("pipeline_top", {"limit": 10})
check(names(r) == ["Acme Capital", "Beta Partners"], "pipeline_top furthest-stage first")
check(r["rows"][0]["last_activity_at"] is not None, "pipeline_top enriches last activity")
print("emails")
check(run("recent_emails", {"direction": "outbound"})["row_count"] == 3,
"recent_emails(outbound): 3 live (tombstoned sighting excluded)")
check(run("recent_emails", {"direction": "inbound"})["row_count"] == 1, "recent_emails(inbound)")
check(run("recent_emails")["row_count"] == 4, "recent_emails(any): 4 live")
r = run("investor_last_contact", {"name": "beta"})
check(r["rows"][0]["days_since"] >= 39, "investor_last_contact days_since")
check(run("comms_by_user", {"user": "Grant"})["row_count"] == 2,
"comms_by_user: grant's 2 live outbound (tombstoned excluded)")
r = run("email_counts_by_user", {"user": "grant"})
check(r["rows"][0]["this_week"] == 1, "email_counts this_week = 1 live (tombstoned excluded)")
check(r["rows"][0]["ytd"] >= 1, "email_counts ytd")
print("trust boundary")
check(run("investors_cold", {"days": "abc"})["error"] == "bad_slot", "bad int slot -> bad_slot, no crash")
check(run("nope")["error"] == "unknown_intent", "unknown intent rejected")
check(run("pipeline_totals", {"foo": 1})["error"] == "bad_slot", "unexpected slot rejected")
check(run("investor_lookup", {})["error"] == "bad_slot", "missing required slot rejected")
check(run("investors_by_city", {"city": "%"})["row_count"] == 0,
"LIKE wildcard escaped — '%' does not match every row")
check(run("investors_cold", {"days": 0})["slots"]["days"] == 1, "int slot clamps to min")
check(run("top_investors_committed", {"limit": 99999})["slots"]["limit"] == nl_query.INTENTS
["top_investors_committed"]["slots"]["limit"]["max"], "int slot clamps to max")
print("audit hook + catalog")
seen = []
run("pipeline_totals", audit_fn=seen.append, actor="tester", source="test")
check(len(seen) == 1 and seen[0]["intent"] == "pipeline_totals" and seen[0]["error"] is None
and seen[0]["actor"] == "tester", "audit hook fires with intent/actor/no-error")
run("nope", audit_fn=seen.append)
check(seen[-1]["error"] == "unknown_intent", "audit hook fires on rejection too")
check(len(nl_query.catalog()) == len(nl_query.INTENTS), "catalog covers every intent")
conn.close()
print()
if FAILS:
print(f"{len(FAILS)} FAILED")
for f in FAILS:
print(" - " + f)
sys.exit(1)
print("ALL PASS")
if __name__ == "__main__":
main()
+139
View File
@@ -0,0 +1,139 @@
#!/usr/bin/env python3
"""Endpoint tests for the W2 NL-query HTTP surface (POST /api/query/nl, GET /api/query/catalog).
Boots the REAL server against a temp DB and exercises the wiring end-to-end: auth gating
(bot/admin only), the direct {intent, slots} mode, the soft-error shape, and the status
mapping. The local model is forced UNAVAILABLE by pointing SPARK_CONTROL_URL at a dead local
port, so the {question} path exercises the 503 path deterministically without any Spark.
Synthetic data only.
Run: cd backend && python3 nl_query/test_nl_query_endpoint.py
"""
import http.client
import json
import os
import sqlite3
import sys
import tempfile
import threading
from http.server import ThreadingHTTPServer
_DATA = tempfile.mkdtemp()
os.environ["CRM_DATA_DIR"] = _DATA
os.environ["CRM_DB_PATH"] = os.path.join(_DATA, "crm.db")
os.environ["CRM_GMAIL_INTEGRATION_ENABLED"] = "1"
# Dead port -> the local-model leg fails fast, so the {question} path returns 503 deterministically
# (set before server/config import; load_env uses setdefault so this wins over any repo .env).
os.environ["SPARK_CONTROL_URL"] = "http://127.0.0.1:1"
sys.path.insert(0, os.path.dirname(os.path.dirname(os.path.abspath(__file__)))) # backend/
import server # noqa: E402
import nl_query # noqa: E402
FAILS = []
def check(cond, msg):
print((" PASS " if cond else " FAIL ") + msg)
if not cond:
FAILS.append(msg)
class _Quiet(server.CRMHandler):
def log_message(self, *a):
pass
def _req(port, method, path, token=None, body=None):
conn = http.client.HTTPConnection("127.0.0.1", port, timeout=10)
headers = {}
if token:
headers["Authorization"] = "Bearer " + token
payload = json.dumps(body) if body is not None else None
if payload is not None:
headers["Content-Type"] = "application/json"
conn.request(method, path, body=payload, headers=headers)
resp = conn.getresponse()
raw = resp.read().decode("utf-8", "replace")
conn.close()
data = json.loads(raw) if raw else None
return resp.status, data
def _data(d):
return (d or {}).get("data") or {}
def main():
server.init_db()
db = sqlite3.connect(os.environ["CRM_DB_PATH"])
db.execute("INSERT INTO users (id,username,email,password_hash,full_name,role,is_active) VALUES "
"('u_admin','grant','g@t.x','x','Grant','admin',1),"
"('u_mem','mem','m@t.x','x','Mem','member',1)")
db.execute("INSERT INTO fundraising_investors (id,investor_name,lead,graveyard,source_row_id,"
"total_invested) VALUES ('a','Acme Capital','Jon',0,'a',5000000),"
"('b','Beta Partners','Grant',0,'b',2000000),('g','Ghost','Grant',1,'g',9000000)")
db.commit()
db.close()
admin = server.create_token("u_admin", "grant", "admin")
member = server.create_token("u_mem", "mem", "member")
httpd = ThreadingHTTPServer(("127.0.0.1", 0), _Quiet)
port = httpd.server_address[1]
threading.Thread(target=httpd.serve_forever, daemon=True).start()
try:
print("direct {intent, slots} mode")
st, d = _req(port, "POST", "/api/query/nl", admin,
{"intent": "top_investors_committed", "slots": {"limit": 2}})
rows = _data(d).get("rows", [])
check(st == 200 and [r["investor_name"] for r in rows] == ["Acme Capital", "Beta Partners"],
f"admin direct query -> 200 + rows (got {st})")
check(_data(d).get("intent") == "top_investors_committed", "response echoes interpreted intent")
print("soft errors + validation")
st, d = _req(port, "POST", "/api/query/nl", admin, {"intent": "made_up"})
check(st == 200 and _data(d).get("error") == "unknown_intent",
f"bad intent -> 200 with data.error=unknown_intent (got {st}, {_data(d).get('error')})")
st, d = _req(port, "POST", "/api/query/nl", admin, {})
check(st == 400, f"neither question nor intent -> 400 (got {st})")
print("auth gating")
st, _ = _req(port, "POST", "/api/query/nl", member,
{"intent": "top_investors_committed"})
check(st == 403, f"member -> 403 (got {st})")
st, _ = _req(port, "POST", "/api/query/nl", None, {"intent": "top_investors_committed"})
check(st == 401, f"unauthenticated -> 401 (got {st})")
print("catalog")
st, d = _req(port, "GET", "/api/query/catalog", admin)
check(st == 200 and isinstance(d.get("data"), list) and len(d["data"]) == len(nl_query.INTENTS),
f"catalog -> 200 with every intent (got {st})")
st, _ = _req(port, "GET", "/api/query/catalog", member)
check(st == 403, f"catalog member -> 403 (got {st})")
print("question path with the local model down")
st, d = _req(port, "POST", "/api/query/nl", admin,
{"question": "who are our top investors by committed capital?"})
check(st == 503 and _data(d).get("error") == "model_unavailable",
f"question + dead model -> 503 model_unavailable (got {st}, {_data(d).get('error')})")
check(_data(d).get("question"), "question echoed back even on outage")
print("audit trail")
db = sqlite3.connect(os.environ["CRM_DB_PATH"])
n = db.execute("SELECT COUNT(*) FROM audit_log WHERE entity_type='nl_query'").fetchone()[0]
db.close()
check(n >= 2, f"executed queries are audited (entity_type=nl_query rows: {n})")
finally:
httpd.shutdown()
print()
if FAILS:
print(f"{len(FAILS)} FAILED")
for f in FAILS:
print(" - " + f)
sys.exit(1)
print("ALL PASS")
if __name__ == "__main__":
main()
+107
View File
@@ -0,0 +1,107 @@
#!/usr/bin/env python3
"""Tests for the W2 NL translator (question -> {intent, slots}) — the local-model leg.
The model is stubbed via an injected chat_fn, so this runs fully offline (no Spark, no
network). Covers:
- build_system() exposes the whole intent catalog as the model's closed vocabulary;
- translate() returns the parsed {intent, slots} and DROPS slot keys the intent doesn't
declare (model noise), while every surviving value is still validated downstream;
- the translation failure modes: no intent fit -> no_match; unparseable -> no_match;
local model unreachable -> model_unavailable (so the endpoint can 503);
- answer() chains translate + the validated runner end-to-end, and a HALLUCINATED intent
from the model is still rejected by the validator (the model output is never trusted).
Run: cd backend && python3 nl_query/test_translate.py
"""
import os
import sys
import tempfile
_DATA = tempfile.mkdtemp()
os.environ["CRM_DATA_DIR"] = _DATA
os.environ["CRM_DB_PATH"] = os.path.join(_DATA, "crm.db")
sys.path.insert(0, os.path.dirname(os.path.dirname(os.path.abspath(__file__)))) # backend/
import server # noqa: E402
import nl_query # noqa: E402
T = nl_query # exercise the public API (translate/answer/build_system are re-exported)
FAILS = []
def check(cond, msg):
print((" PASS " if cond else " FAIL ") + msg)
if not cond:
FAILS.append(msg)
def main():
print("build_system")
sysprompt = nl_query.build_system()
check(all(k in sysprompt for k in nl_query.INTENTS), "system prompt lists every intent key")
check("days (integer, default 90)" in sysprompt, "system prompt renders int slot + default")
check("one of any|inbound|outbound" in sysprompt, "system prompt renders enum choices")
print("translate")
captured = {}
def fake(prompt, system):
captured["system"] = system
captured["prompt"] = prompt
return {"intent": "investors_cold", "slots": {"days": 90, "bogus": "x"}}
r = T.translate("who's gone quiet for 3 months?", chat_fn=fake)
check(r == {"intent": "investors_cold", "slots": {"days": 90}},
f"routes to intent + drops unknown slot 'bogus': {r}")
check(nl_query.INTENTS and "investors_cold" in captured["system"], "chat_fn received the catalog")
check(captured["prompt"] == "who's gone quiet for 3 months?", "chat_fn received the question")
check(T.translate("x", chat_fn=lambda q, s: {"intent": None})["error"] == "no_match",
"intent null -> no_match")
check(T.translate("x", chat_fn=lambda q, s: None)["error"] == "no_match",
"unparseable model reply -> no_match")
check(T.translate("", chat_fn=lambda q, s: {"intent": "x"})["error"] == "no_match",
"empty question -> no_match (no model call needed)")
def boom(q, s):
raise RuntimeError("spark down")
check(T.translate("x", chat_fn=boom)["error"] == "model_unavailable",
"local model unreachable -> model_unavailable")
print("answer (end-to-end through the validated runner)")
server.init_db()
conn = server.get_db()
conn.execute("INSERT INTO fundraising_investors (id, investor_name, lead, graveyard, "
"source_row_id, total_invested) VALUES "
"('a','Acme Capital','Jon',0,'a',5000000),"
"('b','Beta Partners','Grant',0,'b',2000000),"
"('g','Ghost','Grant',1,'g',9000000)")
conn.commit()
r = T.answer(conn, "top investors",
chat_fn=lambda q, s: {"intent": "top_investors_committed", "slots": {"limit": 2}})
check([x["investor_name"] for x in r["rows"]] == ["Acme Capital", "Beta Partners"],
"answer() runs the translated query")
check(r["question"] == "top investors", "answer() echoes the original question")
r = T.answer(conn, "nonsense", chat_fn=lambda q, s: {"intent": "made_up_intent", "slots": {}})
check(r.get("error") == "unknown_intent", "hallucinated intent is rejected by the validator")
check(r["question"] == "nonsense", "answer() echoes question on error too")
r = T.answer(conn, "anything", chat_fn=boom)
check(r.get("error") == "model_unavailable", "answer() surfaces a model outage")
conn.close()
print()
if FAILS:
print(f"{len(FAILS)} FAILED")
for f in FAILS:
print(" - " + f)
sys.exit(1)
print("ALL PASS")
if __name__ == "__main__":
main()
+108
View File
@@ -0,0 +1,108 @@
"""NL-query translator — plain-English question -> {intent, slots} on the LOCAL model.
The model's ONLY job is to pick one curated intent and fill its typed slots; it never
touches the database, never sees a row, and never writes SQL. Its output is untrusted and
is handed straight to the runner's validator (runner.validate), which is the trust boundary.
LOCAL-ONLY BY CONSTRUCTION. Translation runs on the local Qwen via Spark Control
(SPARK_CONTROL_URL), the same sanctioned local leg as intake/digest — so the question never
leaves the box and there is NO Claude path and NO redaction boundary to manage here (that
was the whole point of the W2 simplification: the answer is sensitive and never leaves; the
question is generic English and is translated locally). If the local model ever proves too
weak, a Claude-behind-redaction translator could be slotted in as an alternative `chat_fn`
WITHOUT changing the validator/executor — but it is deliberately not built.
`chat_fn(prompt, system) -> dict|None` is injectable so the whole translation leg is testable
offline without Spark. The default calls the ingest Spark client (lazy import — it ships in
the Docker image, not the bare CRM).
"""
from .intents import INTENTS
from .runner import run_query
def _default_chat_json(prompt, system):
import os
import sys
sys.path.insert(0, os.path.join(os.path.dirname(os.path.dirname(os.path.abspath(__file__))), "ingest"))
import llm # noqa: E402 (ingest Spark client; raises if Spark is unreachable)
return llm.chat_json(prompt, system=system, max_tokens=400)
def _render_slot(name, spec):
t = spec["type"]
if t == "int":
extra = f", default {spec['default']}" if "default" in spec else ""
return f"{name} (integer{extra})"
if t == "enum":
extra = f", default {spec['default']}" if "default" in spec else ""
return f"{name} (one of {'|'.join(spec['choices'])}{extra})"
req = ", required" if spec.get("required") else ", optional"
return f"{name} (text{req})"
def build_system():
"""The system prompt: the full intent catalog as the model's closed vocabulary."""
lines = [
"You translate a question about a venture fund's investor database into ONE "
"structured query. Respond with ONLY a JSON object and nothing else:",
' {"intent": "<one key below, or null>", "slots": {<slot>: <value>}}',
"",
"Rules:",
"- Choose the single best-fitting intent. If none fits, return {\"intent\": null}.",
"- Use ONLY the slot names listed for the chosen intent; omit a slot to accept its default.",
"- Convert natural durations to the integer a slot wants: '3 months'->90, 'a quarter'->90, "
"'6 weeks'->42, 'a year'/'year to date'->365.",
"- Copy names, cities and people verbatim from the question into text slots.",
"- No commentary, no markdown, JSON only.",
"",
"Intents:",
]
for key, spec in INTENTS.items():
slots = spec["slots"]
slot_str = "; ".join(_render_slot(n, s) for n, s in slots.items()) or "(none)"
lines.append(f"- {key}: {spec['summary']}")
lines.append(f" slots: {slot_str}")
if spec.get("example"):
lines.append(f" e.g. \"{spec['example']}\"")
return "\n".join(lines)
def translate(question, *, chat_fn=None):
"""Map a question to {intent, slots} on the local model. Returns that dict, or an error
dict {error, detail}: 'model_unavailable' (local model unreachable -> the endpoint 503s)
or 'no_match' (the model could not map the question to any intent)."""
chat_fn = chat_fn or _default_chat_json
q = (question or "").strip()
if not q:
return {"error": "no_match", "detail": "empty question"}
try:
data = chat_fn(q, build_system())
except Exception as exc: # connection/runtime failure on the LOCAL model
return {"error": "model_unavailable", "detail": str(exc)}
if not isinstance(data, dict):
return {"error": "no_match", "detail": "model returned no parseable JSON"}
intent = data.get("intent")
if intent in (None, "", "null", "none"):
return {"error": "no_match", "detail": "no intent fit the question"}
slots = data.get("slots")
slots = slots if isinstance(slots, dict) else {}
# Drop slot KEYS the chosen intent doesn't declare — model noise, not a safety concern
# (every surviving VALUE still goes through full type validation in the runner). Unknown
# intents are left as-is so the runner rejects them as unknown_intent.
if intent in INTENTS:
allowed = INTENTS[intent]["slots"]
slots = {k: v for k, v in slots.items() if k in allowed}
return {"intent": intent, "slots": slots}
def answer(conn, question, *, chat_fn=None, audit_fn=None, actor=None, source="api"):
"""End-to-end: translate a question locally, then run it through the validated runner.
Returns the runner's result (with the interpreted intent/slots, so a human can see how
the question was read) plus the original question, or a translation error dict."""
t = translate(question, chat_fn=chat_fn)
if t.get("error"):
return {**t, "question": question}
result = run_query(conn, t["intent"], t["slots"],
audit_fn=audit_fn, actor=actor, source=source)
result["question"] = question
return result
+51
View File
@@ -0,0 +1,51 @@
#!/usr/bin/env python3
"""Dev harness — fire questions at the LOCAL model and print how each is translated.
Lets you eyeball whether the local Qwen maps real questions to the right curated query
(intent + slots), against your real Spark, with NO UI, auth, HTTP, or deploy. This is the
cheap way to validate translation quality before building the web/Matrix surfaces. It only
translates (it does not touch the DB), so no data is needed and nothing leaves the box.
NOT shipped and NOT a test (no `test_` prefix) — a developer convenience.
Needs SPARK_CONTROL_URL set (read from the repo .env) and the Spark reachable.
Run:
python3 backend/nl_query/try_questions.py # the built-in sample set
python3 backend/nl_query/try_questions.py "when did we last email Acme?"
"""
import os
import sys
sys.path.insert(0, os.path.dirname(os.path.dirname(os.path.abspath(__file__)))) # backend/
import nl_query # noqa: E402
SAMPLES = [
"Which investors haven't we reached out to in the last 3 months?",
"Which investors do we owe follow-ups to?",
"What is Acme Capital's email and how much have they committed across funds?",
"When did we last reach out to Acme Capital?",
"What were the last 10 investor emails we sent, and who to?",
"What were the last 10 investor emails we received?",
"Who are all the investors located in Austin?",
"List our top 10 investors by committed capital.",
"List our top 10 pipeline investors by stage and the most recent conversation.",
"What is our total pipeline in dollars, split by stage?",
"What were the last investor emails sent by Grant?",
"How many emails has Jonathan sent this week, this month, and year to date?",
]
def main():
questions = sys.argv[1:] or SAMPLES
print(f"Translating {len(questions)} question(s) on the local model "
f"(SPARK_CONTROL_URL={os.environ.get('SPARK_CONTROL_URL', '(unset)')})\n")
for q in questions:
r = nl_query.translate(q)
if r.get("error"):
print(f" ? {q}\n -> [{r['error']}] {r.get('detail', '')}\n")
else:
print(f" ? {q}\n -> {r['intent']} slots={r['slots']}\n")
if __name__ == "__main__":
main()
+62
View File
@@ -2180,6 +2180,10 @@ class CRMHandler(BaseHTTPRequestHandler):
if path == '/api/reminders': if path == '/api/reminders':
return self.handle_list_reminders(user, params) return self.handle_list_reminders(user, params)
# Natural-language query (W2) — the askable catalog
if path == '/api/query/catalog':
return self.handle_nl_query_catalog(user)
# Matrix intake bot — new-vs-existing lookup for its in-thread proposal # Matrix intake bot — new-vs-existing lookup for its in-thread proposal
if path == '/api/intake/match': if path == '/api/intake/match':
return self.handle_intake_match(user, params) return self.handle_intake_match(user, params)
@@ -2268,6 +2272,8 @@ class CRMHandler(BaseHTTPRequestHandler):
return self.handle_pipeline_unlink(user, body) return self.handle_pipeline_unlink(user, body)
if path == '/api/reminders': if path == '/api/reminders':
return self.handle_create_reminder(user, body) return self.handle_create_reminder(user, body)
if path == '/api/query/nl':
return self.handle_nl_query(user, body)
if path == '/api/fundraising/collab/heartbeat': if path == '/api/fundraising/collab/heartbeat':
return self.handle_fundraising_collab_heartbeat(user, body) return self.handle_fundraising_collab_heartbeat(user, body)
if path == '/api/admin/users': if path == '/api/admin/users':
@@ -3613,6 +3619,62 @@ class CRMHandler(BaseHTTPRequestHandler):
finally: finally:
conn.close() conn.close()
# ── Natural-language query (W2) — read-only, the safe nl_query core ──
def handle_nl_query(self, user, body):
"""Answer a plain-English question about the fundraising database, read-only.
Bot-or-admin: a query can surface the whole investor book, so it stays off the member
tier; the audit row written below (entity_type='nl_query') makes a query made through a
leaked or automated credential detectable. Accepts either {question} (mapped to an
intent+slots on the LOCAL model nothing leaves the box) or {intent, slots} (run a
curated query directly, e.g. a UI re-run). BOTH go through the same validator and the
same fixed parameterized SQL in nl_query; result rows never reach any model.
Status: a local-model outage -> 503; an unexpected SQL fault -> 500; everything else,
including a soft 'no question matched', returns 200 with the structured result, because
the UI always wants the interpreted query + summary back rather than a bare HTTP code."""
if not require_bot_or_admin(user):
return self.send_error_json("Bot or admin required", 403)
body = body or {}
question = str(body.get('question') or '').strip()
intent = str(body.get('intent') or '').strip()
if not question and not intent:
return self.send_error_json("question or intent is required")
source = (str(body.get('source') or 'api').strip()[:20]) or 'api'
import nl_query # pure-stdlib at import; the local-model leg is lazy inside translate
conn = get_db()
try:
def _audit(p):
log_audit(conn, user['user_id'], 'nl_query', p.get('intent') or '-', 'query',
{"source": p.get('source'), "slots": p.get('slots'),
"row_count": p.get('row_count'), "error": p.get('error'),
"question": question or None})
if question:
result = nl_query.answer(conn, question, audit_fn=_audit,
actor=user['user_id'], source=source)
else:
result = nl_query.run_query(conn, intent, body.get('slots') or {},
audit_fn=_audit, actor=user['user_id'], source=source)
conn.commit() # persist the audit row(s)
finally:
conn.close()
err = result.get('error')
if err == 'model_unavailable':
return self.send_json({"data": result}, 503)
if err == 'query_failed':
return self.send_json({"data": result}, 500)
return self.send_json({"data": result})
def handle_nl_query_catalog(self, user):
"""The askable surface: every intent's key, summary, slot specs and an example
question so the UI can show what can be asked. Same gate as the query endpoint."""
if not require_bot_or_admin(user):
return self.send_error_json("Bot or admin required", 403)
import nl_query
return self.send_json({"data": nl_query.catalog()})
def handle_intake_match(self, user, params): def handle_intake_match(self, user, params):
"""Read-only: does an investor matching this intake already exist? Used by the """Read-only: does an investor matching this intake already exist? Used by the
Matrix intake bot to label its in-thread proposal new-vs-existing. Returns the Matrix intake bot to label its in-thread proposal new-vs-existing. Returns the
+69
View File
@@ -0,0 +1,69 @@
---
paths:
- backend/nl_query/**
---
# Natural-language query (W2)
Read this before editing the NL-query surface (`backend/nl_query/`). It is the read-only
"ask the database in plain English" layer — web "Ask" box + Matrix `@bot <question>`.
## The trust model — named intents, not a query language
There is **no generic SQL/AST compiler and no dynamically-built identifiers.** Every query is
a fixed, hand-written, reviewed, parameterized statement in `intents.py`; the only thing a
caller (or the model) controls is a small set of typed **slot values**, bound as `?` params.
`runner.validate` is the trust boundary: it accepts only a known intent key and coerces each
slot to its declared type, rejecting anything off-spec. A request that's wrong is rejected;
it can never name a table/column, pick an operator, or write SQL. `run_query` never raises —
every failure returns a structured error dict (a bad `limit=abc` must not crash the thread).
To add a capability: add a `run_*` + a registry entry (with its `slots` spec) in `intents.py`;
the translator prompt and the UI pick it up automatically from `catalog()`. Add a test case.
## Local-only — no Claude, no redaction here
Translation (question → `{intent, slots}`) runs on the **local Qwen via Spark Control**
(`translate.py`, reusing `ingest/llm.py`), the same sanctioned local leg as intake/digest. The
question never leaves the box, so there is **no Claude path and no redaction boundary** — that
was the whole point of the W2 simplification (the *answer* is sensitive and never leaves; the
*question* is generic English, translated locally). Validated **12/12** on real example
questions against the live Spark (2026-06-18). The model output is still untrusted: it goes
straight through `runner.validate`, so a hallucinated intent is rejected. If the local model
ever proves too weak, a Claude-behind-redaction translator could drop in as an alternative
`chat_fn` without touching the validator/executor — deliberately **not** built.
**Results never go to any model.** Summaries are deterministic local strings; rows render
client-side. Never add a "summarize these rows with an LLM" step — that re-introduces the leak.
## Soft-delete per table (the gotcha the design reviews caught)
The `fundraising_*` tables are a **hard-rebuilt projection** of the grid blob and have **no
`deleted_at` column** — do NOT add `deleted_at IS NULL` to them (it raises). Their live/retired
axis is the **`graveyard` flag** (exclude `graveyard = 1` for "live"). Other tables:
- `reminders` / `opportunities` / `communications` → filter `deleted_at IS NULL`.
- `emails` have no `deleted_at`; "live" = a non-tombstoned sighting (`EXISTS email_account_messages … deleted_at IS NULL`), mirroring `query_email_activity` / the digest.
`intents._last_activity_by_investor` **mirrors** `server.last_activity_by_investor` (duplicated
to avoid importing the `__main__` server module — helpers take a `conn`, never import server).
Keep the two in sync; the soft-delete test guards the copy.
## Endpoint, caps, audit
- `POST /api/query/nl` (`require_bot_or_admin`, read-only) — body `{question}` (local translate)
or `{intent, slots}` (direct, e.g. a UI re-run). Returns `{intent, slots, rows, summary,
question}`. `GET /api/query/catalog` returns the askable surface for the UI.
- Status: local-model outage → **503**; unexpected SQL fault → **500**; everything else
(a hit, or a soft `no_match`/`unknown_intent`) → **200** with the structured result, because
the UI always wants the interpreted query back, not a bare code.
- Every executed query writes an audit row (`audit_log`, `entity_type='nl_query'`) so a query
through a leaked/automated credential is detectable. Global row ceiling `MAX_ROWS=500`.
## Tests + dev harness
`test_nl_query.py` (runner: every intent + soft-delete on both recency legs + injection-safety
+ caps), `test_translate.py` (offline translator via an injected `chat_fn`), and
`test_nl_query_endpoint.py` (HTTP auth/wiring/503, local model forced down via a dead
`SPARK_CONTROL_URL` port). `try_questions.py` is a dev harness (not a test) that fires
questions at the real local model and prints the translation — the cheap way to check quality.