From 6c29c2260197133617786fbe20195c74abad4fb7 Mon Sep 17 00:00:00 2001 From: Keysat Date: Thu, 18 Jun 2026 18:35:41 -0500 Subject: [PATCH] Add NL-query backend (W2): local translator + safe named-query runner Read-only "ask the database in plain English" backend. Translation runs on the local Qwen via Spark Control (question -> {intent, slots}); nothing leaves the box, no Claude and no redaction boundary (the simplification chosen after pressure-testing). The safe surface is a curated catalog of ~12 hand-written parameterized queries; a slot validator is the trust boundary (no generic SQL, no dynamic identifiers). POST /api/query/nl + GET /api/query/catalog, gated require_bot_or_admin, read-only, audited. Soft-delete-correct per table. Local Qwen translated 12/12 real example questions correctly against the live Spark. Web "Ask" box and Matrix bot still to come (steps 4-5). --- .claude/rules/nl-query.md | 1 + AGENTS.md | 20 +- ROADMAP.md | 9 +- backend/nl_query/__init__.py | 9 + backend/nl_query/intents.py | 433 +++++++++++++++++++++ backend/nl_query/runner.py | 127 ++++++ backend/nl_query/test_nl_query.py | 226 +++++++++++ backend/nl_query/test_nl_query_endpoint.py | 139 +++++++ backend/nl_query/test_translate.py | 107 +++++ backend/nl_query/translate.py | 108 +++++ backend/nl_query/try_questions.py | 51 +++ backend/server.py | 62 +++ docs/guides/nl-query.md | 69 ++++ 13 files changed, 1348 insertions(+), 13 deletions(-) create mode 120000 .claude/rules/nl-query.md create mode 100644 backend/nl_query/__init__.py create mode 100644 backend/nl_query/intents.py create mode 100644 backend/nl_query/runner.py create mode 100644 backend/nl_query/test_nl_query.py create mode 100644 backend/nl_query/test_nl_query_endpoint.py create mode 100644 backend/nl_query/test_translate.py create mode 100644 backend/nl_query/translate.py create mode 100644 backend/nl_query/try_questions.py create mode 100644 docs/guides/nl-query.md diff --git a/.claude/rules/nl-query.md b/.claude/rules/nl-query.md new file mode 120000 index 0000000..09b9e34 --- /dev/null +++ b/.claude/rules/nl-query.md @@ -0,0 +1 @@ +../../docs/guides/nl-query.md \ No newline at end of file diff --git a/AGENTS.md b/AGENTS.md index 71eb481..a5181c5 100644 --- a/AGENTS.md +++ b/AGENTS.md @@ -48,6 +48,7 @@ cd start9/0.4 && make - `backend/redaction/` — `scrub.py` + `client.py`: the scrub→Claude→re-hydrate privacy boundary. - `backend/ingest/` — chunk→embed→Qdrant + retrieval modes. - `backend/entity_*.py` — entity resolution/merge (the two-investor-model reconciliation). +- `backend/nl_query/` — read-only natural-language query (W2): `intents.py` (curated parameterized query catalog), `runner.py` (slot validator = trust boundary), `translate.py` (local-Qwen question→{intent,slots}). See the nl-query guide. - `backend/matrix_intake/` — Matrix intake bot (separate process; `matrix-nio`, isolated to this component): typed message → local-Qwen parse → in-thread approve → write via the CRM's own `log-communication`. See the matrix-intake guide. - `frontend/index.html` — the entire UI. - `docs/` — architecture, phase plans, contracts, runbooks (see Deeper docs). `docs/guides/` — scoped subsystem rules (see below). @@ -65,6 +66,7 @@ Subsystem rules live in `docs/guides/` and lazy-load in Claude Code via `.claude - **Email capture / drafts + digest send** (`backend/email_integration/`, `backend/digest_mailer.py`, `backend/smtp_send.py`) → `docs/guides/email.md` - **Building or deploying the s9pk** (`start9/`) → `docs/guides/packaging.md` - **Matrix intake bot** (`backend/matrix_intake/`) → `docs/guides/matrix-intake.md` +- **Natural-language query** (`backend/nl_query/`) → `docs/guides/nl-query.md` ## Conventions @@ -104,17 +106,13 @@ Subsystem rules live in `docs/guides/` and lazy-load in Claude Code via `.claude ## Current state -_Phase 0 + Phase 1 built; **box live at v0.1.0:91; repo at v0.1.0:92** (v92 = reminders/follow-ups — built + tested locally 2026-06-18, **deploy pending**. Box deployed & verified live 2026-06-18 — `installed-version`=0.1.0:91, server up on :8080, clean; the StartOS version-graph traversal logs an inert down-to-39-then-up because the per-version `up`/`down` hooks are no-ops — real SQLite migrations run in-app at startup). **The fundraising grid + email capture is the canonical system of record.** Deploy/feature history: git log + `start9/0.4/startos/versions/`; longer-term backlog/debt: `ROADMAP.md` / `EVALUATION.md`._ +_Phase 0 + Phase 1 built; **box live at v0.1.0:91; repo at v0.1.0:92** (reminders, deploy pending). **The fundraising grid + email capture is the canonical system of record.** Active thread: **W2 natural-language query** (backend done + validated locally; web/Matrix UI next). Deploy/feature history: git log + `start9/0.4/startos/versions/`; longer-term backlog/debt: `ROADMAP.md` / `EVALUATION.md`._ -- **Reminders & follow-ups (W1) — BUILT + tested locally 2026-06-18 (repo v0.1.0:92, deploy pending).** First step of the agreed reminders → NL-search → bot-mutations plan (`ROADMAP.md` "Follow-ups/reminders + NL search + bot grid-mutations"; **overarching constraint: keep LP data off third-party LLMs — the dominant risk, above write-safety**). First-class tickler tied to the grid: `reminders` table (in-app migration `0006`; logical FK to `fundraising_investors.id` + denormalized name, like `0005`), full CRUD (`GET/POST/PATCH/DELETE /api/reminders`; soft-delete; open/done/snoozed/cancelled; assignee; `source` human/bot/automation; accepts `source_row_id` so the grid stays decoupled), a read-only **derived `reminder_status` grid column** (overdue/due_soon/open — injected + stripped like `pipeline_stage`; **filterable so a saved view can later drive the follow-up view off real reminders, not the binary `follow_up` checkbox**), an orphan reconciler (`reconcile_grid_reminders`), a **Reminders** page + Dashboard **"Reminders Due"** card + **"Reminders due"** daily-digest section, and a per-investor **`last_activity_at`** recency rollup (shared building block for the W2 NL "not nurtured" query). **Pure local CRM — no LLM path, no leak surface.** Snooze keeps a reminder `open` with a pushed-out date (reliably reappears); the `snoozed` status is an explicit "mute" (Edit only). Tests: `test_reminders.py` + digest reminders test (**31/31 green, render-smoke green**). **Not yet deployed** — needs an s9pk build + install (authorize first; verify `0006` against a DB copy). Deferred fast-follow **W1b** = nurture-gap auto-suggested reminders. +- **W2 — natural-language query (read-only): BACKEND built + tested + validated locally 2026-06-18; web/Matrix UI next.** `backend/nl_query/` — 12 curated parameterized queries + a slot validator (the trust boundary; no generic SQL) + a **local-Qwen** translator (question→{intent,slots} via Spark Control; nothing leaves the box, **no Claude, no redaction** — the simplification Grant chose). `POST /api/query/nl` (also accepts direct `{intent,slots}`) + `GET /api/query/catalog`, `require_bot_or_admin`, audited (`entity_type='nl_query'`). **Local Qwen translated 12/12 of Grant's real example questions correctly against the live Spark** — settles local-only (Claude not needed). Soft-delete-correct per table (gotcha: `fundraising_*` has **no `deleted_at`** — `graveyard` is the axis; emails via a live `eam` sighting). Guide: `docs/guides/nl-query.md`. **Next: step 4 web "Ask" box (Communications tab), step 5 Matrix `@bot `** — both thin clients of the endpoint. Not committed at deploy/version level yet; folds into the next s9pk. -- **Email-proposal review over Matrix + a `bot` role — DEPLOYED, LIVE & smoke-tested 2026-06-18 (box v0.1.0:91, Spark bot `b2690c4`).** The CRM-drafted "proposed grid notes" gain: (1) a click-to-view **inline source-email popup** on the Email Capture page (`GET /api/email/detail` — from/to/cc/date/subject + scrollable body); (2) a **CRM→Matrix review bridge** — the bot pulls pending proposals (`GET /api/intake/email-proposals`), posts dash-framed review cards (note names who emailed whom, not "Sent/Received") to a dedicated room (`MATRIX_EMAIL_REVIEW_ROOM`), and relays in-thread yes/no/NL-edit (`POST .../decide`), kept in sync with the web panel (decide on either → the other reflects it). **Decided threads are redacted whole** (card + replies; the bot holds a redact/mod power level) so — with Element's "show deleted messages" OFF — the main chat *and* threads view clear completely (confirmed the intended UX). New **`bot` role** (authenticated, never admin; `require_bot_or_admin`) gates the agent endpoints; state in `email_proposal_matrix` (email-migration `0003`). Full mechanics, deploy gotchas, and the `redact_resolved.py` backfill tool: `docs/guides/matrix-intake.md`. +- **W1 — reminders & follow-ups: BUILT + tested locally (v0.1.0:92), DEPLOY PENDING.** First-class tickler tied to the grid (migration `0006`; CRUD `GET/POST/PATCH/DELETE /api/reminders`; derived `reminder_status` grid column; Reminders page + dashboard card + digest section; the `last_activity_at` recency rollup that W2 reuses). Needs s9pk build + install (authorize first; verify `0006` against a DB copy). Deferred **W1b** = nurture-gap auto-suggested reminders. -- **Adopt the Pipeline — LIVE & smoked (v0.1.0:88):** the grid drives the deal board. "Add to Pipeline" row action → durably-linked `opportunities` row via `opportunities.fundraising_investor_id` (migration 0005); grid owns link+seed, board owns stage/probability/owner; "Remove" soft-deletes the opp, grid row intact; deleting a grid investor archives its orphaned opp. Detail + locked decisions: `ROADMAP.md` "Adopt the Pipeline". - -- **Matrix intake bot — LIVE (Spark, container `matrix-intake`, bot `b2690c4`):** typed Matrix message → local-Qwen parse → in-thread human approval → write via `POST /api/fundraising/log-communication` (`source="matrix_intake"`); new-vs-existing via `GET /api/intake/match`. Fuzzy match + numbered disambiguation shortlist, conversational NL revise (email-integrity preserved), and the `INTAKE_TEAM_ROSTER` outreach frame all shipped & mostly smoked. **One path still un-smoked in-room: the fuzzy disambiguation numbered-pick grammar.** Runs as a docker-compose service (`restart: unless-stopped`). Guide: `docs/guides/matrix-intake.md`. -- **Working (all draft-only):** CRM + ingest (chunk→embed→Qdrant + retrieval) + redaction boundary; Gmail capture (DWD) + email-activity propose→approve; Thesis Workshop + Architect (Claude) with dual-approval gate; Outreach Draft Assistant + follow-up radar + per-user voice + Tier-B in-thread Gmail draft creation. -- **Tests:** **30/30 backend green** (`python3 backend/run_tests.py`), `py_compile` clean; frontend render-smoke gates the default `make` build. -- **Debt (P2, not deploy-blocking; full list `EVALUATION.md`):** reports-subsystem soft-delete sweep — **pipeline/opportunities aggregates fixed v87**; remaining: the dashboard **communications** aggregates (`recent_comms`/`comms_this_month`/`meetings_this_month`) + activity report + report-endpoint tests; `?limit=abc` crashes the request thread; auth regression test for the 3 v79-gated GETs (`/api/users`, `/api/email/status`, `/api/email/accounts`); scrub-gateway TLS verify off; hardcoded Spark/Qdrant IPs + **oversized StartOS package icon** (fix before the next s9pk upload); the 5.4k-line `server.py` monolith. -- **Open / risks:** the v2.0 reserve-asset spine is the *working* approved spine but **not a canonical `thesis_version`** (needs Grant + Jonathan dual sign-off; Appendix-A conviction incl. ~40% Strike stays Grant's working read, not fed to the engine); **Claude/Architect path still unverified live on the box**; the intake matcher reads only the grid blob (not classic `contacts`); doc drift — `crm-overview.md` + `EVALUATION.md` still call `lp_profiles` live (doc-auditor pass). -- **Next:** 1) **deploy v0.1.0:92 (reminders)** to the box — needs authorization; verify migration `0006` against a copy of `data/crm.db`, then `make` + install + browser-verify the Reminders page/grid chip/dashboard card (only render-smoke ran locally, not a live authenticated click-through); 2) **W2 — NL→safe-query** (the agreed plan's next build; validated filter-AST, Claude behind redaction, only the question text leaves the box; web + Matrix; = old "search item 3"), then **W3 — bot grid-mutations** behind the Matrix approval gate; 3) **W1b** nurture-gap auto-suggested reminders (fast-follow once recency proven); 4) spark-control intake dashboard card + extract intake bot to its own repo (ROADMAP); 5) in-room smoke of the intake **disambiguation** numbered-pick grammar; 6) Grant + Jonathan freeze v2.0 canonical; 7) reply-all for Tier-B drafts; then clear the P2 debt (reports comms-aggregate soft-delete sweep, `?limit=abc` crash, auth regression test, oversized StartOS icon). **Possible follow-ups:** email-review `since`-floor on `to_post`; Pipeline drag-and-drop stage moves. +- **Done & live (detail in git log / ROADMAP):** email-proposal Matrix review + `bot` role (box v91); grid-driven Pipeline (v88); Matrix intake bot (Spark `matrix-intake` container); Gmail capture (DWD) + propose→approve + daily digest; Thesis Workshop + Architect (Claude, dual-approval); outreach drafts + radar. All draft-only. +- **Tests:** **34/34 backend green** (`python3 backend/run_tests.py`; +`nl_query/` suite), `py_compile` clean; render-smoke gates `make`. +- **Next (priority order):** 1) **W2 step 4** web Ask box, then **step 5** Matrix `@bot`; 2) **deploy reminders (v92) + W2 together** — bump to **v0.1.0:93**, build s9pk, install, browser-verify (authorize first; verify `0006` against a DB copy); 3) **W3** bot grid-mutations behind the Matrix approval gate (local-Qwen parse); 4) **W1b** nurture-gap reminders; 5) Grant + Jonathan freeze v2.0 canonical; 6) in-room smoke of the intake disambiguation numbered-pick grammar; then P2 debt (reports comms-aggregate soft-delete sweep, `?limit=abc` crash, auth regression test, oversized StartOS icon). +- **Open / risks:** W2 translation only **happy-path-validated** (typos/ambiguous/no-match phrasings shake out in live use); **Claude/Architect path still unverified live on the box**; v2.0 reserve-asset spine is the *working approved* spine but **not canonical** (needs dual sign-off); doc drift — `crm-overview.md` + `EVALUATION.md` still call `lp_profiles` live. diff --git a/ROADMAP.md b/ROADMAP.md index e26cfcc..8357915 100644 --- a/ROADMAP.md +++ b/ROADMAP.md @@ -93,7 +93,12 @@ - **W1b (deferred fast-follow):** nurture-gap automation — a daily job flags "committed / in-pipeline + no activity in N days + no open reminder" → auto-suggests a reminder (`source='automation'`, human confirms). Build once the recency rollup is proven in practice. - **Left untouched (deliberate):** the grid `follow_up` checkbox + automation list-memberships, and `communications.next_action_date` + `/api/outreach/radar` — reminders are the new richer layer; folding those into it is a later cleanup, not now. -**W2 — Natural-language query (read-only).** = the **"Email/communication search + NL query → item 3 (NL→safe structured query)"** below, now sequenced second. Locked stance: the LLM emits a **validated filter/query AST** over a curated field set (committed $, fund, stage, lead, `follow_up`, `last_activity_at`, `reminder_status`, …); the backend owns the SQL against soft-delete-filtered views with row/time caps — **never raw SQL**. Claude behind the redaction boundary; only the question text + schema vocabulary leave the box, never investor rows. Deliver in **both** web (search box) and Matrix (`@bot who needs follow-up?`). Reads need no approval gate. Builds on W1's `last_activity_at`. +**W2 — Natural-language query (read-only). BACKEND BUILT + tested + validated locally 2026-06-18; web/Matrix UI pending.** = the **"Email/communication search + NL query → item 3 (NL→safe structured query)"** below, now sequenced second and **redesigned** (see below). Subsystem detail: `docs/guides/nl-query.md`. + +- **Approach changed from the original "Claude behind redaction + a validated filter-AST" to LOCAL-ONLY + a named-intent catalog (decided with Grant 2026-06-18).** Rationale: (a) the dominant risk is LP data reaching a vendor — running translation on the **local Qwen via Spark Control** keeps the question on the box entirely (same basis as intake/digest), so there is **no Claude path and no redaction boundary** to manage, which is both simpler and safer; (b) a generic SQL/AST compiler was over-built for the real need — instead there are **~12 curated, hand-written, parameterized "named queries"** (`backend/nl_query/intents.py`) each with typed slots, and the **slot validator** (`runner.validate`) is the whole trust boundary (no dynamic identifiers, no raw SQL). The LLM only maps a question → `{intent, slots}`; its output is still validated, so a hallucinated intent is rejected. **Results never go to any model** (deterministic local render). Both design choices were pressure-tested by independent review agents before building. +- **As built:** `backend/nl_query/` (`intents.py` catalog, `runner.py` validator/executor + audit, `translate.py` local-Qwen translator, `try_questions.py` dev harness). `POST /api/query/nl` (`{question}` or direct `{intent,slots}`) + `GET /api/query/catalog`, `require_bot_or_admin`, read-only, audited (`audit_log` `entity_type='nl_query'`). Soft-delete-correct per table (`fundraising_*` has no `deleted_at` — `graveyard` is the axis; emails via a live `email_account_messages` sighting; reminders/opps/comms via `deleted_at`). Builds on W1's `last_activity_at`. Tests: `nl_query/test_nl_query.py` + `test_translate.py` + `test_nl_query_endpoint.py` (34/34 suite green). +- **Validation:** the local Qwen translated **12/12 of Grant's real example questions** correctly (right intent + slots, incl. "3 months"→90, sent/received→direction) against the live Spark — settles local-only; Claude not needed. Translation quality on messy/typo/no-match inputs shakes out in live use. +- **Remaining:** **step 4** = web "Ask" box in the Communications tab (calls the endpoint, renders rows + the interpreted query); **step 5** = Matrix `@bot ` (thin client of the endpoint; the 2-admin review room means a full-book dump is acceptable, so no bulk-result cap — only a light anti-flood truncation). Reads need no approval gate. Then deploy with reminders (v92) as **v0.1.0:93**. **W3 — Bot grid-mutations behind a Matrix approval gate.** Generalize the email-proposal scaffold (`email_proposal_matrix` + propose→post→decide→apply) into one `agent_proposals` table (kind discriminator + JSON payload + target). Bot proposes set-commitment / assign-fund / change-stage / set-reminder; a human approves/edits/rejects in Matrix (**any member**); then apply. **Surgical, version-checked mutations — never blob RMW:** stage rides the existing `opportunities` link + validated stage endpoint; reminders write the W1 table; set-commitment/assign-fund need a version-checked single-cell upsert into the grid blob. Triggers the deferred **scoped service-token** item below (per-mutation-kind allowlist on the bot credential; money/merge/delete always require human approval regardless of scope — the autonomy axis). Parse on local Qwen, not Claude. @@ -171,7 +176,7 @@ Open design questions (settled at build time): send time = **6 PM box-local** (c **2. Email content search box — DONE (v0.1.0:83).** A **"Search content"** toggle in the Communications tab → `GET /api/email/search?q=` wraps `backend/ingest/search.py:hybrid_search` filtered to `doc_type='email'`; hits are hydrated + soft-delete-filtered against SQLite (canonical) and link back to the full body. Semantic/lexical search over email *content* ("find where we discussed the mining deal"), distinct from item 1's structured filters. 503 (clean "unavailable") when Spark/Qdrant is unreachable. -**3. Natural-language → safe structured query (separate, larger, after 1 & 2).** An LLM translates a plain-English question into a **safe, read-only** DB query against the CRM, for relational/analytical questions that semantic search *cannot* answer — Grant's example ("committed across funds AND not emailed in a while") is joins + aggregates + recency, not a text-topic match. Design constraints (locked at request time, refine at build): +**3. Natural-language → safe structured query — SUPERSEDED & BUILT as W2 above (2026-06-18).** The design constraints below (especially "LLM = Claude behind the redaction boundary" and the validated-AST shape) were **revisited and changed** during the build: translation runs on the **local Qwen** (no Claude, no redaction), and the safe surface is a **named-intent catalog**, not a generic query AST. See the W2 entry above and `docs/guides/nl-query.md` for what shipped; the original framing is kept here for provenance. _An LLM translates a plain-English question into a **safe, read-only** DB query against the CRM, for relational/analytical questions that semantic search *cannot* answer — Grant's example ("committed across funds AND not emailed in a while") is joins + aggregates + recency, not a text-topic match. Original design constraints (locked at request time):_ - **LLM = Claude behind the redaction boundary** (better at text-to-SQL than local Qwen; the scrub→Claude→re-hydrate path already exists for the PII concern). Not Spark — Spark Control offers embeddings/rerank/RAG + local chat, but **no text-to-SQL**. - **Safety is the hard part, not the parsing.** Do NOT hand the LLM open-ended SQL against the live DB (soft-delete leaks, injection, runaway scans). Constrain it: read-only connection/view, a curated/parameterized query surface or a validated query AST, soft-delete-filtered views, row/time caps. Treat as its own designed feature with its own tests. - Must reckon with the two-model join caveat above (capital lives in the grid; recency from email links). diff --git a/backend/nl_query/__init__.py b/backend/nl_query/__init__.py new file mode 100644 index 0000000..26eac8f --- /dev/null +++ b/backend/nl_query/__init__.py @@ -0,0 +1,9 @@ +"""nl_query — the safe, read-only natural-language query surface (W2). + +The LLM's job (added later) is only to map a question to a {intent, slots} pair; everything +that touches the database lives here behind a strict validator and a fixed, hand-written, +parameterized query catalog. See runner.py (the trust boundary) and intents.py (the catalog). +""" +from .runner import run_query, validate, catalog # noqa: F401 +from .intents import INTENTS # noqa: F401 +from .translate import translate, answer, build_system # noqa: F401 diff --git a/backend/nl_query/intents.py b/backend/nl_query/intents.py new file mode 100644 index 0000000..e0ca60e --- /dev/null +++ b/backend/nl_query/intents.py @@ -0,0 +1,433 @@ +"""NL-query intents — the curated, hand-written query catalog (W2, the safe core). + +Each intent is a FIXED, reviewed, parameterized SQL query with a small set of typed +"slots" (the blanks a question fills in: a number of days, a name, a limit). There is NO +generic SQL/AST compiler and NO dynamically-built identifiers: every table and column name +is hardcoded in the query text, and every value the caller (or an LLM) supplies reaches +SQLite only as a bound `?` parameter. That is the whole trust model — a malformed or +hostile request can change a bound value, never the query structure. Adding a capability +means adding a reviewed entry here, not widening a language. + +Soft-delete discipline (CLAUDE.md standing rule), per table: + - reminders / opportunities / communications carry `deleted_at` -> filter `deleted_at IS NULL`. + - emails have NO `deleted_at`; "live" means a non-tombstoned per-mailbox sighting exists + (`email_account_messages.deleted_at IS NULL`) — mirror the digest / query_email_activity. + - fundraising_investors/_contacts/_funds/_commitments are a HARD-REBUILT projection of the + grid blob with NO `deleted_at` column; the live/retired axis there is the `graveyard` flag. + Do NOT add `deleted_at IS NULL` to those tables — the column does not exist and the clause + would raise. Exclude `graveyard = 1` where the question means "live" investors. + +Each run_* returns {columns, rows, summary, truncated}. `summary` is a DETERMINISTIC local +one-liner (never an LLM narrative) — results never leave the box to be summarized. +""" +import sqlite3 +from datetime import datetime, timedelta + +# Generous ceiling — the Matrix review room is two admins and the web app is internal, so +# dumping the full book is acceptable (per Grant); this only guards against an unbounded +# scan flooding a response. A list intent past this is reported truncated, never silently cut. +MAX_ROWS = 500 + +# Live, non-terminal pipeline stages in funnel order (mirrors server.PIPELINE_STAGES; 'lost' +# is the terminal drop). Kept here so the pipeline intents have a stable rank without importing +# the server module (helpers take a conn; they never import server — house convention). +_STAGE_ORDER = ['lead', 'outreach', 'meeting', 'due_diligence', 'committed', 'funded'] +_STAGE_RANK_SQL = ( + "CASE stage WHEN 'lead' THEN 1 WHEN 'outreach' THEN 2 WHEN 'meeting' THEN 3 " + "WHEN 'due_diligence' THEN 4 WHEN 'committed' THEN 5 WHEN 'funded' THEN 6 ELSE 0 END") + + +# ── helpers ──────────────────────────────────────────────────────────────────────────── +def _rows(cur): + """Materialize a cursor as a list of plain dicts, independent of the connection's + row_factory (works whether rows come back as tuples or sqlite3.Row).""" + cols = [c[0] for c in cur.description] + return [dict(zip(cols, r)) for r in cur.fetchall()] + + +def like_contains(value): + """Build a safe LIKE pattern for a free-text contains match. Escapes the LIKE + wildcards so a user/LLM value of '%' or '_' is treated literally — paired with + `LIKE ? ESCAPE '\\'` in the SQL, this stops '%' from matching the entire table.""" + v = value.replace("\\", "\\\\").replace("%", "\\%").replace("_", "\\_") + return f"%{v}%" + + +def _last_activity_by_investor(conn): + """{fundraising_investors.id: latest activity ISO ts} across logged communications and + captured grid-linked emails — the per-investor recency signal behind the "gone quiet" + and "last contact" intents. + + NB: this MIRRORS server.last_activity_by_investor() and its soft-delete joins (comms via + cm.deleted_at IS NULL; email via a live email_account_messages sighting). It is duplicated + rather than imported only to keep this module free of a server import (the main module runs + as __main__, so `import server` would re-execute it). Keep the two in sync; the soft-delete + test guards this copy. If a third caller appears, extract both to a shared module.""" + out = {} + + def _bump(inv_id, ts): + if inv_id and ts and (out.get(inv_id) is None or str(ts) > str(out[inv_id])): + out[inv_id] = ts + + # Each leg is guarded: the comms/email tables can be absent on a minimal DB. This is a + # narrow, intentional tolerance for optional tables — NOT the broad error-swallowing the + # runner forbids (a failure in an intent's main query surfaces as query_failed). + try: + for r in conn.execute( + "SELECT fc.investor_id AS inv, MAX(cm.communication_date) AS last_ts " + "FROM communications cm JOIN fundraising_contacts fc ON fc.contact_id = cm.contact_id " + "WHERE cm.deleted_at IS NULL AND fc.contact_id IS NOT NULL GROUP BY fc.investor_id"): + _bump(r["inv"], r["last_ts"]) + except sqlite3.OperationalError: + pass + try: + for r in conn.execute( + "SELECT eil.fundraising_investor_id AS inv, MAX(e.sent_at) AS last_ts " + "FROM email_investor_links eil JOIN emails e ON e.id = eil.email_id " + "WHERE eil.fundraising_investor_id IS NOT NULL AND EXISTS " + "(SELECT 1 FROM email_account_messages eam WHERE eam.email_id = e.id " + "AND eam.deleted_at IS NULL) GROUP BY eil.fundraising_investor_id"): + _bump(r["inv"], r["last_ts"]) + except sqlite3.OperationalError: + pass + return out + + +def _today(): + return datetime.utcnow().date() + + +def _days_since(ts): + """Whole days between an ISO date/datetime string and today (UTC). None if unparseable.""" + if not ts: + return None + try: + d = datetime.fromisoformat(str(ts)[:10].replace("Z", "")).date() + except ValueError: + return None + return (_today() - d).days + + +def _own_addresses(conn): + try: + return {(r[0] or "").lower().strip() + for r in conn.execute("SELECT email_address FROM email_accounts")} - {""} + except sqlite3.OperationalError: + return set() + + +def _truncate(rows): + """Apply the global ceiling, returning (rows, truncated).""" + if len(rows) > MAX_ROWS: + return rows[:MAX_ROWS], True + return rows, False + + +# ── investor intents ───────────────────────────────────────────────────────────────────── +def run_investors_cold(conn, slots): + """Live investors not contacted in `days` days — never-contacted first, then oldest.""" + days = slots["days"] + cutoff = (_today() - timedelta(days=days)).isoformat() + last = _last_activity_by_investor(conn) + invs = _rows(conn.execute( + "SELECT id, investor_name, lead, total_invested FROM fundraising_investors " + "WHERE graveyard = 0 ORDER BY investor_name")) + cold = [] + for inv in invs: + ts = last.get(inv["id"]) + if ts is None or str(ts)[:10] < cutoff: + cold.append({"investor_name": inv["investor_name"], "lead": inv["lead"], + "total_invested": inv["total_invested"], + "last_activity_at": ts, "days_since": _days_since(ts)}) + # never-contacted (days_since None) first, then most-stale first + cold.sort(key=lambda r: (r["days_since"] is not None, -(r["days_since"] or 0))) + rows, trunc = _truncate(cold) + return {"columns": ["investor_name", "lead", "total_invested", "last_activity_at", "days_since"], + "rows": rows, "truncated": trunc, + "summary": f"{len(cold)} live investor(s) not contacted in {days}+ days."} + + +def run_investor_lookup(conn, slots): + """One investor's profile: contacts (name/email/title/city), committed total, per-fund + commitments, lead. Name matched as a contains (an LLM/user may pass a partial).""" + pat = like_contains(slots["name"]) + invs = _rows(conn.execute( + "SELECT id, investor_name, lead, lead_source, total_invested, follow_up, graveyard " + "FROM fundraising_investors WHERE investor_name LIKE ? ESCAPE '\\' " + "ORDER BY graveyard, investor_name LIMIT 25", (pat,))) + for inv in invs: + inv["contacts"] = _rows(conn.execute( + "SELECT full_name, email, title, city, state, country FROM fundraising_contacts " + "WHERE investor_id = ? ORDER BY sort_order, full_name", (inv["id"],))) + inv["commitments"] = _rows(conn.execute( + "SELECT f.fund_name, c.amount FROM fundraising_commitments c " + "JOIN fundraising_funds f ON f.id = c.fund_id WHERE c.investor_id = ? AND c.amount <> 0 " + "ORDER BY f.display_order", (inv["id"],))) + inv.pop("id", None) + return {"columns": ["investor_name", "lead", "lead_source", "total_invested", + "follow_up", "graveyard", "contacts", "commitments"], + "rows": invs, "truncated": False, + "summary": f"{len(invs)} investor(s) matching \"{slots['name']}\"."} + + +def run_investors_by_city(conn, slots): + """Investors with a contact located in `city` (contains match on the contact's city).""" + pat = like_contains(slots["city"]) + rows = _rows(conn.execute( + "SELECT i.investor_name, c.full_name AS contact, c.city, c.state, c.country, i.lead " + "FROM fundraising_contacts c JOIN fundraising_investors i ON i.id = c.investor_id " + "WHERE i.graveyard = 0 AND c.city LIKE ? ESCAPE '\\' " + "ORDER BY i.investor_name, c.full_name LIMIT ?", (pat, MAX_ROWS + 1))) + rows, trunc = _truncate(rows) + return {"columns": ["investor_name", "contact", "city", "state", "country", "lead"], + "rows": rows, "truncated": trunc, + "summary": f"{len(rows)} investor contact(s) in \"{slots['city']}\"."} + + +def run_investors_by_lead(conn, slots): + """Live investors owned by a given lead/team member (contains match on `lead`).""" + pat = like_contains(slots["lead"]) + rows = _rows(conn.execute( + "SELECT investor_name, lead, total_invested, follow_up FROM fundraising_investors " + "WHERE graveyard = 0 AND lead LIKE ? ESCAPE '\\' " + "ORDER BY total_invested DESC, investor_name LIMIT ?", (pat, MAX_ROWS + 1))) + rows, trunc = _truncate(rows) + return {"columns": ["investor_name", "lead", "total_invested", "follow_up"], + "rows": rows, "truncated": trunc, + "summary": f"{len(rows)} live investor(s) led by \"{slots['lead']}\"."} + + +def run_top_investors_committed(conn, slots): + """Top `limit` live investors by total committed capital across all funds.""" + n = slots["limit"] + rows = _rows(conn.execute( + "SELECT investor_name, total_invested, lead FROM fundraising_investors " + "WHERE graveyard = 0 AND total_invested > 0 " + "ORDER BY total_invested DESC, investor_name LIMIT ?", (n,))) + return {"columns": ["investor_name", "total_invested", "lead"], "rows": rows, + "truncated": False, "summary": f"Top {len(rows)} investor(s) by committed capital."} + + +def run_investors_follow_up(conn, slots): + """Investors we owe a follow-up to: those with an OPEN reminder, overdue first. Uses the + W1 reminders table (the richer follow-up layer) joined to the current grid name.""" + today = _today().isoformat() + rows = _rows(conn.execute( + "SELECT COALESCE(i.investor_name, r.investor_name) AS investor_name, r.title, " + "r.due_date, r.status, r.assignee_id, " + "CASE WHEN r.due_date IS NOT NULL AND substr(r.due_date,1,10) < ? THEN 1 ELSE 0 END AS overdue " + "FROM reminders r LEFT JOIN fundraising_investors i ON i.id = r.investor_id " + "WHERE r.deleted_at IS NULL AND r.status = 'open' AND r.investor_id IS NOT NULL " + "ORDER BY (r.due_date IS NULL), r.due_date ASC LIMIT ?", (today, MAX_ROWS + 1))) + rows, trunc = _truncate(rows) + return {"columns": ["investor_name", "title", "due_date", "status", "overdue"], + "rows": rows, "truncated": trunc, + "summary": f"{len(rows)} investor(s) with an open follow-up reminder."} + + +# ── pipeline intents ────────────────────────────────────────────────────────────────────── +def run_pipeline_top(conn, slots): + """Top `limit` live pipeline opportunities by stage (furthest along first), with the + investor, owner, and most-recent activity.""" + n = slots["limit"] + last = _last_activity_by_investor(conn) + rows = _rows(conn.execute( + "SELECT o.fundraising_investor_id AS inv_id, " + "COALESCE(i.investor_name, o.name) AS investor_name, o.stage, o.expected_amount, " + "o.probability, u.full_name AS owner FROM opportunities o " + "LEFT JOIN fundraising_investors i ON i.id = o.fundraising_investor_id " + "LEFT JOIN users u ON u.id = o.owner_id " + "WHERE o.deleted_at IS NULL AND o.stage != 'lost' " + f"ORDER BY {_STAGE_RANK_SQL} DESC, o.expected_amount DESC LIMIT ?", (n,))) + for r in rows: + r["last_activity_at"] = last.get(r.pop("inv_id")) + return {"columns": ["investor_name", "stage", "expected_amount", "probability", "owner", + "last_activity_at"], + "rows": rows, "truncated": False, + "summary": f"Top {len(rows)} live pipeline opportunit(ies) by stage."} + + +def run_pipeline_totals(conn, slots): + """Total pipeline dollars and the split across each stage (excludes lost).""" + rows = _rows(conn.execute( + "SELECT stage, COUNT(*) AS count, COALESCE(SUM(expected_amount),0) AS expected_total, " + "COALESCE(SUM(commitment_amount),0) AS committed_total FROM opportunities " + f"WHERE deleted_at IS NULL AND stage != 'lost' GROUP BY stage ORDER BY {_STAGE_RANK_SQL}")) + total = sum(r["expected_total"] for r in rows) + count = sum(r["count"] for r in rows) + return {"columns": ["stage", "count", "expected_total", "committed_total"], + "rows": rows, "truncated": False, + "summary": f"${total:,.0f} expected across {count} live opportunit(ies) in " + f"{len(rows)} stage(s)."} + + +# ── email / communication intents ───────────────────────────────────────────────────────── +def run_recent_emails(conn, slots): + """The most recent `limit` matched investor emails, optionally one direction. + Matched-only + soft-delete-correct (a live email_account_messages sighting), mirroring + the Communications panel's query_email_activity.""" + n, direction = slots["limit"], slots["direction"] + where = ["EXISTS (SELECT 1 FROM email_account_messages eam WHERE eam.email_id = e.id " + "AND eam.deleted_at IS NULL)", + "EXISTS (SELECT 1 FROM email_investor_links l WHERE l.email_id = e.id)"] + params = [] + own = _own_addresses(conn) + if direction in ("inbound", "outbound") and own: + op = "IN" if direction == "outbound" else "NOT IN" + where.append(f"LOWER(e.from_email) {op} ({','.join('?' for _ in own)})") + params.extend(sorted(own)) + sql = ("SELECT e.subject, e.from_name, e.from_email, e.sent_at, " + "(SELECT fi.investor_name FROM email_investor_links l " + " JOIN fundraising_investors fi ON fi.id = l.fundraising_investor_id " + " WHERE l.email_id = e.id AND l.fundraising_investor_id IS NOT NULL LIMIT 1) AS investor " + "FROM emails e WHERE " + " AND ".join(where) + " ORDER BY e.sent_at DESC LIMIT ?") + rows = _rows(conn.execute(sql, params + [n])) + label = {"inbound": "received", "outbound": "sent"}.get(direction, "") + return {"columns": ["sent_at", "subject", "from_name", "from_email", "investor"], + "rows": rows, "truncated": False, + "summary": f"{len(rows)} most-recent {label + ' ' if label else ''}investor email(s)."} + + +def run_investor_last_contact(conn, slots): + """When we last had any activity with investor X (matched by name).""" + pat = like_contains(slots["name"]) + last = _last_activity_by_investor(conn) + invs = _rows(conn.execute( + "SELECT id, investor_name FROM fundraising_investors " + "WHERE investor_name LIKE ? ESCAPE '\\' ORDER BY graveyard, investor_name LIMIT 25", (pat,))) + rows = [] + for inv in invs: + ts = last.get(inv["id"]) + rows.append({"investor_name": inv["investor_name"], "last_activity_at": ts, + "days_since": _days_since(ts)}) + return {"columns": ["investor_name", "last_activity_at", "days_since"], "rows": rows, + "truncated": False, "summary": f"Last contact for {len(rows)} investor(s) " + f"matching \"{slots['name']}\"."} + + +def run_comms_by_user(conn, slots): + """The most recent `limit` outbound investor emails sent by a given user (matched by + username or full name). Soft-delete-correct (live sighting, is_sent).""" + n, pat = slots["limit"], like_contains(slots["user"]) + rows = _rows(conn.execute( + "SELECT e.subject, e.sent_at, u.full_name AS sender, " + "(SELECT fi.investor_name FROM email_investor_links l " + " JOIN fundraising_investors fi ON fi.id = l.fundraising_investor_id " + " WHERE l.email_id = e.id AND l.fundraising_investor_id IS NOT NULL LIMIT 1) AS investor " + "FROM emails e JOIN email_account_messages eam ON eam.email_id = e.id " + "AND eam.deleted_at IS NULL AND eam.is_sent = 1 " + "JOIN email_accounts ea ON ea.id = eam.account_id JOIN users u ON u.id = ea.user_id " + "WHERE (u.username LIKE ? ESCAPE '\\' OR u.full_name LIKE ? ESCAPE '\\') " + "ORDER BY e.sent_at DESC LIMIT ?", (pat, pat, n))) + return {"columns": ["sent_at", "subject", "sender", "investor"], "rows": rows, + "truncated": False, + "summary": f"{len(rows)} recent email(s) sent by \"{slots['user']}\"."} + + +def run_email_counts_by_user(conn, slots): + """Per-user counts of outbound investor emails over this week / month / year-to-date. + Windows are calendar-based: week = since Monday, month = since the 1st, ytd = since Jan 1.""" + today = _today() + wk = (today - timedelta(days=today.weekday())).isoformat() + mo = today.replace(day=1).isoformat() + yr = today.replace(month=1, day=1).isoformat() + where = "WHERE eam.deleted_at IS NULL AND eam.is_sent = 1" + params = [wk, mo, yr] + if slots.get("user"): + pat = like_contains(slots["user"]) + where += " AND (u.username LIKE ? ESCAPE '\\' OR u.full_name LIKE ? ESCAPE '\\')" + params.extend([pat, pat]) + rows = _rows(conn.execute( + "SELECT u.full_name AS user, u.username, " + "SUM(CASE WHEN substr(e.sent_at,1,10) >= ? THEN 1 ELSE 0 END) AS this_week, " + "SUM(CASE WHEN substr(e.sent_at,1,10) >= ? THEN 1 ELSE 0 END) AS this_month, " + "SUM(CASE WHEN substr(e.sent_at,1,10) >= ? THEN 1 ELSE 0 END) AS ytd " + "FROM users u JOIN email_accounts ea ON ea.user_id = u.id " + "JOIN email_account_messages eam ON eam.account_id = ea.id " + "JOIN emails e ON e.id = eam.email_id " + where + + " GROUP BY u.id HAVING ytd > 0 ORDER BY ytd DESC", params)) + return {"columns": ["user", "this_week", "this_month", "ytd"], "rows": rows, + "truncated": False, "summary": f"Outbound email counts for {len(rows)} user(s)."} + + +# ── registry ────────────────────────────────────────────────────────────────────────────── +# key -> {summary, slots, run, example}. `slots` is consumed by the runner's validator and +# (later) surfaced to the local-model translator + the UI as the single source of truth for +# what is queryable. SlotSpec: {type: int|enum|text, ...constraints}. +INTENTS = { + "investors_cold": { + "summary": "Investors we haven't contacted in a while (default 90 days).", + "slots": {"days": {"type": "int", "default": 90, "min": 1, "max": 3650}}, + "example": "Which investors haven't we reached out to in the last 3 months?", + "run": run_investors_cold, + }, + "investor_lookup": { + "summary": "One investor's contacts, email, committed total and per-fund breakdown.", + "slots": {"name": {"type": "text", "required": True, "maxlen": 120}}, + "example": "What is Acme Capital's email and how much have they committed across funds?", + "run": run_investor_lookup, + }, + "investors_by_city": { + "summary": "Investors with a contact located in a given city.", + "slots": {"city": {"type": "text", "required": True, "maxlen": 80}}, + "example": "Who are all the investors located in Austin?", + "run": run_investors_by_city, + }, + "investors_by_lead": { + "summary": "Investors owned by a given lead / team member.", + "slots": {"lead": {"type": "text", "required": True, "maxlen": 80}}, + "example": "Show me the investors led by Jonathan.", + "run": run_investors_by_lead, + }, + "top_investors_committed": { + "summary": "Top investors by total committed capital.", + "slots": {"limit": {"type": "int", "default": 10, "min": 1, "max": MAX_ROWS}}, + "example": "List our top 10 investors by committed capital.", + "run": run_top_investors_committed, + }, + "investors_follow_up": { + "summary": "Investors we owe a follow-up to (have an open reminder), overdue first.", + "slots": {}, + "example": "Which investors do we owe follow-ups to?", + "run": run_investors_follow_up, + }, + "pipeline_top": { + "summary": "Top pipeline opportunities by stage, with investor, owner and last activity.", + "slots": {"limit": {"type": "int", "default": 10, "min": 1, "max": MAX_ROWS}}, + "example": "List our top 10 pipeline investors by stage and last conversation.", + "run": run_pipeline_top, + }, + "pipeline_totals": { + "summary": "Total pipeline dollars and the split across each stage.", + "slots": {}, + "example": "What is our total pipeline in dollars, split by stage?", + "run": run_pipeline_totals, + }, + "recent_emails": { + "summary": "The most recent investor emails (optionally inbound or outbound only).", + "slots": {"limit": {"type": "int", "default": 10, "min": 1, "max": 100}, + "direction": {"type": "enum", "choices": ["any", "inbound", "outbound"], + "default": "any"}}, + "example": "What were the last 10 investor emails we sent, and who to?", + "run": run_recent_emails, + }, + "investor_last_contact": { + "summary": "When we last had any activity with a given investor.", + "slots": {"name": {"type": "text", "required": True, "maxlen": 120}}, + "example": "When did we last reach out to Acme Capital?", + "run": run_investor_last_contact, + }, + "comms_by_user": { + "summary": "Recent investor emails sent by a given team member.", + "slots": {"user": {"type": "text", "required": True, "maxlen": 80}, + "limit": {"type": "int", "default": 10, "min": 1, "max": 100}}, + "example": "What were the last investor emails sent by Grant?", + "run": run_comms_by_user, + }, + "email_counts_by_user": { + "summary": "How many investor emails each user sent this week / month / year-to-date.", + "slots": {"user": {"type": "text", "required": False, "maxlen": 80}}, + "example": "How many emails has Grant sent this week, this month, and year to date?", + "run": run_email_counts_by_user, + }, +} diff --git a/backend/nl_query/runner.py b/backend/nl_query/runner.py new file mode 100644 index 0000000..d76e0ee --- /dev/null +++ b/backend/nl_query/runner.py @@ -0,0 +1,127 @@ +"""NL-query runner — validate a {intent, slots} request, run the curated query, return rows. + +This is the trust boundary. Whatever produced the request (a local model in W2, the web UI, +or a test) is untrusted: the runner accepts ONLY a known intent key and slot VALUES, coerces +each value to its declared type, and rejects anything off-spec — it never lets a caller name +a table/column, write SQL, or choose an operator. The intents do the rest with fixed, +parameterized SQL (see intents.py). All failure modes return a structured error dict; the +runner never raises to the caller (a bad `limit=abc` must not crash the request thread). +""" +import sqlite3 + +from .intents import INTENTS + + +def _coerce_slot(name, spec, raw): + """Coerce/validate one slot value against its spec. Returns (value, error). Exactly one + of the two is meaningful: error is None on success, else a human-readable string.""" + t = spec["type"] + provided = raw is not None and not (isinstance(raw, str) and raw.strip() == "") + + if not provided: + if "default" in spec: + return spec["default"], None + if spec.get("required"): + return None, f"slot '{name}' is required" + return None, None # optional, absent + + if t == "int": + try: + v = int(raw) + except (TypeError, ValueError): + return None, f"slot '{name}' must be an integer (got {raw!r})" + if "min" in spec: + v = max(spec["min"], v) + if "max" in spec: + v = min(spec["max"], v) + return v, None + + if t == "enum": + v = str(raw).strip().lower() + if v not in spec["choices"]: + if "default" in spec: + return spec["default"], None + return None, f"slot '{name}' must be one of {spec['choices']} (got {raw!r})" + return v, None + + if t == "text": + v = str(raw).strip() + maxlen = spec.get("maxlen", 200) + if len(v) > maxlen: + v = v[:maxlen] + return v, None + + return None, f"slot '{name}' has unknown type {t!r}" # registry bug, fail visibly + + +def validate(intent_key, raw_slots): + """Validate an intent + raw slots WITHOUT running. Returns (clean_slots, error_dict). + Useful to the translator/UI for a dry-run check. error_dict is None on success.""" + if intent_key not in INTENTS: + return None, {"error": "unknown_intent", "intent": intent_key, + "detail": f"unknown intent; known: {sorted(INTENTS)}"} + spec = INTENTS[intent_key]["slots"] + raw_slots = raw_slots or {} + # Reject unexpected slot keys rather than ignore them — a request shaped wrong is a + # misunderstanding worth surfacing, not silently dropping. + unexpected = [k for k in raw_slots if k not in spec] + if unexpected: + return None, {"error": "bad_slot", "intent": intent_key, + "detail": f"unexpected slot(s): {unexpected}; allowed: {sorted(spec)}"} + clean = {} + for name, sspec in spec.items(): + v, err = _coerce_slot(name, sspec, raw_slots.get(name)) + if err: + return None, {"error": "bad_slot", "intent": intent_key, "detail": err} + if v is not None or "default" in sspec: + clean[name] = v + return clean, None + + +def run_query(conn, intent_key, raw_slots=None, *, audit_fn=None, actor=None, source="api"): + """Validate and execute a curated NL query. Always returns a dict — either a result + {intent, slots, columns, rows, row_count, truncated, summary} or an error + {error, intent, detail}. Records an audit row via audit_fn (if given) so a query made + through a leaked/automated credential is detectable. + + audit_fn signature: audit_fn({actor, source, intent, slots, row_count, error}). + """ + clean, err = validate(intent_key, raw_slots) + if err: + if audit_fn: + try: + audit_fn({"actor": actor, "source": source, "intent": intent_key, + "slots": raw_slots, "row_count": 0, "error": err["error"]}) + except Exception: + pass + return err + + try: + result = INTENTS[intent_key]["run"](conn, clean) + except sqlite3.Error as exc: + # Surface a query failure (e.g. a missing optional table) as a visible error — never + # swallow it and hand back an empty result that reads as an authoritative "none". + if audit_fn: + try: + audit_fn({"actor": actor, "source": source, "intent": intent_key, + "slots": clean, "row_count": 0, "error": "query_failed"}) + except Exception: + pass + return {"error": "query_failed", "intent": intent_key, "detail": str(exc)} + + out = {"intent": intent_key, "slots": clean, "row_count": len(result.get("rows", [])), + **result} + if audit_fn: + try: + audit_fn({"actor": actor, "source": source, "intent": intent_key, + "slots": clean, "row_count": out["row_count"], "error": None}) + except Exception: + pass + return out + + +def catalog(): + """The queryable surface as data: every intent's key, summary, slot specs and example. + Single source of truth for the W2 translator prompt and any UI hint list.""" + return [{"intent": k, "summary": v["summary"], "slots": v["slots"], + "example": v.get("example", "")} for k, v in INTENTS.items()] diff --git a/backend/nl_query/test_nl_query.py b/backend/nl_query/test_nl_query.py new file mode 100644 index 0000000..1071736 --- /dev/null +++ b/backend/nl_query/test_nl_query.py @@ -0,0 +1,226 @@ +#!/usr/bin/env python3 +"""Tests for the W2 safe NL-query runner (the model-free core). + +Boots the REAL schema (server.init_db against a temp DB — exact columns + all migrations), +inserts synthetic fundraising/email/reminder/pipeline data, and exercises every intent plus +the trust-boundary behaviour: + - each intent returns the right rows over the real schema; + - SOFT-DELETE is respected on both recency legs (a tombstoned communication and a tombstoned + email sighting never count), on reminders, and on opportunities; graveyard investors are + excluded from "live" intents; + - the validator rejects bad/unknown/unexpected slots WITHOUT crashing (the `?limit=abc` class); + - LIKE wildcards in a free-text slot are escaped (a city of "%" does NOT return everything); + - limits clamp to their caps; the audit hook fires with the intent + row count. +Synthetic data only — no real LP substance, no network, no model. + +Run: cd backend && python3 nl_query/test_nl_query.py +""" +import os +import sys +import tempfile +from datetime import datetime, timedelta + +_DATA = tempfile.mkdtemp() +os.environ["CRM_DATA_DIR"] = _DATA +os.environ["CRM_DB_PATH"] = os.path.join(_DATA, "crm.db") +os.environ["CRM_GMAIL_INTEGRATION_ENABLED"] = "1" + +sys.path.insert(0, os.path.dirname(os.path.dirname(os.path.abspath(__file__)))) # backend/ +import server # noqa: E402 +import nl_query # noqa: E402 + +FAILS = [] + + +def check(cond, msg): + print((" PASS " if cond else " FAIL ") + msg) + if not cond: + FAILS.append(msg) + + +def _ago(days): + return (datetime.utcnow() - timedelta(days=days)).isoformat() + "Z" + + +TODAY = datetime.utcnow().date() + + +def seed(conn): + c = conn.execute + # users + mailboxes + c("INSERT INTO users (id, username, email, password_hash, full_name, role) VALUES " + "('u_grant','grant','grant@ten31.xyz','x','Grant Smith','admin')," + "('u_jon','jonathan','jon@ten31.xyz','x','Jonathan Lee','member')") + c("INSERT INTO email_accounts (id, user_id, email_address, auth_method) VALUES " + "('a_grant','u_grant','grant@ten31.xyz','dwd')," + "('a_jon','u_jon','jon@ten31.xyz','dwd')") + # funds + c("INSERT INTO fundraising_funds (id, column_id, fund_name, display_order) VALUES " + "('f1','c_f1','Fund I',1),('f2','c_f2','Fund II',2)") + + # investors (graveyard flag is the live/retired axis; no deleted_at on this table) + def inv(iid, name, lead, total, grave=0): + c("INSERT INTO fundraising_investors (id, investor_name, lead, graveyard, " + "source_row_id, total_invested) VALUES (?,?,?,?,?,?)", + (iid, name, lead, grave, iid, total)) + inv("i_acme", "Acme Capital", "Jonathan Lee", 5_000_000) + inv("i_beta", "Beta Partners", "Grant Smith", 2_000_000) + inv("i_cold", "Cold Co", "Grant Smith", 0) # never contacted + inv("i_delta", "Delta LP", "Grant Smith", 1_000_000) # only a (comms) signal + inv("i_ghost", "Graveyard Ghost", "Grant Smith", 9_999_999, grave=1) + + # contacts (grid pills) + classic contact rows for the comms leg + c("INSERT INTO fundraising_contacts (id, investor_id, full_name, email, title, city, " + "contact_id, sort_order) VALUES " + "('fc_a','i_acme','Alice Acme','alice@acme.com','GP','Austin','cc_alice',0)," + "('fc_b','i_beta','Bob Beta','bob@beta.com','LP','Denver',NULL,0)," + "('fc_d','i_delta','Dana Delta','dana@delta.com','CFO','Miami','cc_dana',0)") + c("INSERT INTO contacts (id, first_name, last_name, email) VALUES " + "('cc_alice','Alice','Acme','alice@acme.com')," + "('cc_dana','Dana','Delta','dana@delta.com')") + + # commitments — Acme across two funds (3M + 2M = 5M); Beta one fund + c("INSERT INTO fundraising_commitments (id, investor_id, fund_id, amount) VALUES " + "('cm1','i_acme','f1',3_000_000),('cm2','i_acme','f2',2_000_000)," + "('cm3','i_beta','f1',2_000_000)") + + # emails: matched + a per-mailbox sighting. is_sent + from_email decide direction. + def email(eid, frm, frm_name, days, inv_id, account, is_sent, deleted=False): + c("INSERT INTO emails (id, rfc_message_id, from_email, from_name, sent_at, subject, " + "is_matched, match_status) VALUES (?,?,?,?,?,?,1,'matched')", + (eid, "rfc_" + eid, frm, frm_name, _ago(days), "Re: " + eid)) + c("INSERT INTO email_account_messages (id, email_id, account_id, gmail_message_id, " + "gmail_thread_id, is_sent, deleted_at) VALUES (?,?,?,?,?,?,?)", + ("eam_" + eid, eid, account, "g_" + eid, "t_" + eid, is_sent, + _ago(days) if deleted else None)) + c("INSERT INTO email_investor_links (id, email_id, fundraising_investor_id, " + "matched_address, match_kind) VALUES (?,?,?,?, 'exact_email')", + ("eil_" + eid, eid, inv_id, frm)) + email("ea_recent", "grant@ten31.xyz", "Grant Smith", 0, "i_acme", "a_grant", 1) # Acme: today + email("eb_old", "grant@ten31.xyz", "Grant Smith", 40, "i_beta", "a_grant", 1) # Beta: 40d + email("edel", "grant@ten31.xyz", "Grant Smith", 0, "i_beta", "a_grant", 1, deleted=True) # tombstoned + email("ej", "jon@ten31.xyz", "Jonathan Lee", 0, "i_acme", "a_jon", 1) # jonathan today + email("ein", "alice@acme.com", "Alice Acme", 3, "i_acme", "a_grant", 0) # inbound 3d + + # communications (the other recency leg) — Delta has ONLY comms: one live (5d), one tombstoned + # (today). If the soft-delete filter broke, Delta would read as contacted today. + c("INSERT INTO communications (id, contact_id, type, communication_date, created_by) VALUES " + "('cmm_live','cc_dana','email',?,'u_grant')", (_ago(5),)) + c("INSERT INTO communications (id, contact_id, type, communication_date, created_by, deleted_at) " + "VALUES ('cmm_del','cc_dana','email',?,'u_grant',?)", (_ago(0), _ago(0))) + + # reminders — open(overdue) / open(future) / done / deleted / standalone + def rem(rid, inv_id, title, due, status="open", deleted=False): + c("INSERT INTO reminders (id, investor_id, investor_name, title, due_date, status, " + "deleted_at) VALUES (?,?,?,?,?,?,?)", + (rid, inv_id, title, title, due, status, _ago(0) if deleted else None)) + rem("r_over", "i_beta", "Send deck", (TODAY - timedelta(days=1)).isoformat()) # overdue + rem("r_future", "i_acme", "Quarterly check-in", (TODAY + timedelta(days=10)).isoformat()) + rem("r_done", "i_acme", "Old task", (TODAY - timedelta(days=2)).isoformat(), status="done") + rem("r_del", "i_acme", "Tombstoned", (TODAY - timedelta(days=2)).isoformat(), deleted=True) + rem("r_standalone", None, "Team chore", (TODAY - timedelta(days=1)).isoformat()) + + # opportunities — committed / meeting (live) / lost (terminal) / deleted + def opp(oid, inv_id, contact, stage, expected, owner, deleted=False): + c("INSERT INTO opportunities (id, name, contact_id, stage, expected_amount, owner_id, " + "fundraising_investor_id, deleted_at) VALUES (?,?,?,?,?,?,?,?)", + (oid, oid, contact, stage, expected, owner, inv_id, _ago(0) if deleted else None)) + # opp contact_id must reference a real contacts row (FK on); reuse the two we made + opp("o_acme", "i_acme", "cc_alice", "committed", 4_000_000, "u_jon") + opp("o_beta", "i_beta", "cc_dana", "meeting", 1_000_000, "u_grant") + opp("o_lost", "i_acme", "cc_alice", "lost", 9_000_000, "u_jon") + opp("o_del", "i_beta", "cc_dana", "due_diligence", 7_000_000, "u_grant", deleted=True) + conn.commit() + + +def names(res): + return [r["investor_name"] for r in res["rows"]] + + +def main(): + server.init_db() + conn = server.get_db() + seed(conn) + run = lambda *a, **k: nl_query.run_query(conn, *a, **k) + + print("investors_cold") + r = run("investors_cold", {"days": 30}) + check(names(r) == ["Cold Co", "Beta Partners"], f"cold(30) never-first then stale: {names(r)}") + check(run("investors_cold", {"days": 90})["row_count"] == 1, "cold(90): only never-contacted") + check("Graveyard Ghost" not in names(run("investors_cold", {"days": 3650})), + "cold excludes graveyard investors") + check("Delta LP" in names(run("investors_cold", {"days": 3})), "cold(3) sees Delta (comms 5d)") + check("Delta LP" not in names(run("investors_cold", {"days": 7})), + "cold(7): Delta's tombstoned comm (today) did NOT count") + + print("investor_lookup") + r = run("investor_lookup", {"name": "acme"}) + check(r["row_count"] == 1 and r["rows"][0]["total_invested"] == 5_000_000, "lookup total committed") + check({c["fund_name"] for c in r["rows"][0]["commitments"]} == {"Fund I", "Fund II"}, + "lookup per-fund breakdown") + check(r["rows"][0]["contacts"][0]["email"] == "alice@acme.com", "lookup surfaces contact email") + + print("investors_by_city / by_lead / top / follow_up") + check(names(run("investors_by_city", {"city": "Austin"})) == ["Acme Capital"], "by_city") + check(set(names(run("investors_by_lead", {"lead": "Grant"}))) == {"Beta Partners", "Cold Co", "Delta LP"}, + "by_lead excludes graveyard + other leads") + check(names(run("top_investors_committed", {"limit": 2})) == ["Acme Capital", "Beta Partners"], + "top by committed (graveyard + zero excluded)") + r = run("investors_follow_up") + check(names(r) == ["Beta Partners", "Acme Capital"], f"follow_up overdue-first, open-only: {names(r)}") + check(r["rows"][0]["overdue"] == 1 and r["rows"][1]["overdue"] == 0, "follow_up overdue flag") + + print("pipeline") + r = run("pipeline_totals") + stages = {row["stage"]: row for row in r["rows"]} + check(set(stages) == {"committed", "meeting"}, f"pipeline_totals excludes lost+deleted: {set(stages)}") + check(stages["committed"]["expected_total"] == 4_000_000, "pipeline_totals stage sum") + r = run("pipeline_top", {"limit": 10}) + check(names(r) == ["Acme Capital", "Beta Partners"], "pipeline_top furthest-stage first") + check(r["rows"][0]["last_activity_at"] is not None, "pipeline_top enriches last activity") + + print("emails") + check(run("recent_emails", {"direction": "outbound"})["row_count"] == 3, + "recent_emails(outbound): 3 live (tombstoned sighting excluded)") + check(run("recent_emails", {"direction": "inbound"})["row_count"] == 1, "recent_emails(inbound)") + check(run("recent_emails")["row_count"] == 4, "recent_emails(any): 4 live") + r = run("investor_last_contact", {"name": "beta"}) + check(r["rows"][0]["days_since"] >= 39, "investor_last_contact days_since") + check(run("comms_by_user", {"user": "Grant"})["row_count"] == 2, + "comms_by_user: grant's 2 live outbound (tombstoned excluded)") + r = run("email_counts_by_user", {"user": "grant"}) + check(r["rows"][0]["this_week"] == 1, "email_counts this_week = 1 live (tombstoned excluded)") + check(r["rows"][0]["ytd"] >= 1, "email_counts ytd") + + print("trust boundary") + check(run("investors_cold", {"days": "abc"})["error"] == "bad_slot", "bad int slot -> bad_slot, no crash") + check(run("nope")["error"] == "unknown_intent", "unknown intent rejected") + check(run("pipeline_totals", {"foo": 1})["error"] == "bad_slot", "unexpected slot rejected") + check(run("investor_lookup", {})["error"] == "bad_slot", "missing required slot rejected") + check(run("investors_by_city", {"city": "%"})["row_count"] == 0, + "LIKE wildcard escaped — '%' does not match every row") + check(run("investors_cold", {"days": 0})["slots"]["days"] == 1, "int slot clamps to min") + check(run("top_investors_committed", {"limit": 99999})["slots"]["limit"] == nl_query.INTENTS + ["top_investors_committed"]["slots"]["limit"]["max"], "int slot clamps to max") + + print("audit hook + catalog") + seen = [] + run("pipeline_totals", audit_fn=seen.append, actor="tester", source="test") + check(len(seen) == 1 and seen[0]["intent"] == "pipeline_totals" and seen[0]["error"] is None + and seen[0]["actor"] == "tester", "audit hook fires with intent/actor/no-error") + run("nope", audit_fn=seen.append) + check(seen[-1]["error"] == "unknown_intent", "audit hook fires on rejection too") + check(len(nl_query.catalog()) == len(nl_query.INTENTS), "catalog covers every intent") + + conn.close() + print() + if FAILS: + print(f"{len(FAILS)} FAILED") + for f in FAILS: + print(" - " + f) + sys.exit(1) + print("ALL PASS") + + +if __name__ == "__main__": + main() diff --git a/backend/nl_query/test_nl_query_endpoint.py b/backend/nl_query/test_nl_query_endpoint.py new file mode 100644 index 0000000..5ca7e14 --- /dev/null +++ b/backend/nl_query/test_nl_query_endpoint.py @@ -0,0 +1,139 @@ +#!/usr/bin/env python3 +"""Endpoint tests for the W2 NL-query HTTP surface (POST /api/query/nl, GET /api/query/catalog). + +Boots the REAL server against a temp DB and exercises the wiring end-to-end: auth gating +(bot/admin only), the direct {intent, slots} mode, the soft-error shape, and the status +mapping. The local model is forced UNAVAILABLE by pointing SPARK_CONTROL_URL at a dead local +port, so the {question} path exercises the 503 path deterministically without any Spark. +Synthetic data only. + +Run: cd backend && python3 nl_query/test_nl_query_endpoint.py +""" +import http.client +import json +import os +import sqlite3 +import sys +import tempfile +import threading +from http.server import ThreadingHTTPServer + +_DATA = tempfile.mkdtemp() +os.environ["CRM_DATA_DIR"] = _DATA +os.environ["CRM_DB_PATH"] = os.path.join(_DATA, "crm.db") +os.environ["CRM_GMAIL_INTEGRATION_ENABLED"] = "1" +# Dead port -> the local-model leg fails fast, so the {question} path returns 503 deterministically +# (set before server/config import; load_env uses setdefault so this wins over any repo .env). +os.environ["SPARK_CONTROL_URL"] = "http://127.0.0.1:1" + +sys.path.insert(0, os.path.dirname(os.path.dirname(os.path.abspath(__file__)))) # backend/ +import server # noqa: E402 +import nl_query # noqa: E402 + +FAILS = [] + + +def check(cond, msg): + print((" PASS " if cond else " FAIL ") + msg) + if not cond: + FAILS.append(msg) + + +class _Quiet(server.CRMHandler): + def log_message(self, *a): + pass + + +def _req(port, method, path, token=None, body=None): + conn = http.client.HTTPConnection("127.0.0.1", port, timeout=10) + headers = {} + if token: + headers["Authorization"] = "Bearer " + token + payload = json.dumps(body) if body is not None else None + if payload is not None: + headers["Content-Type"] = "application/json" + conn.request(method, path, body=payload, headers=headers) + resp = conn.getresponse() + raw = resp.read().decode("utf-8", "replace") + conn.close() + data = json.loads(raw) if raw else None + return resp.status, data + + +def _data(d): + return (d or {}).get("data") or {} + + +def main(): + server.init_db() + db = sqlite3.connect(os.environ["CRM_DB_PATH"]) + db.execute("INSERT INTO users (id,username,email,password_hash,full_name,role,is_active) VALUES " + "('u_admin','grant','g@t.x','x','Grant','admin',1)," + "('u_mem','mem','m@t.x','x','Mem','member',1)") + db.execute("INSERT INTO fundraising_investors (id,investor_name,lead,graveyard,source_row_id," + "total_invested) VALUES ('a','Acme Capital','Jon',0,'a',5000000)," + "('b','Beta Partners','Grant',0,'b',2000000),('g','Ghost','Grant',1,'g',9000000)") + db.commit() + db.close() + admin = server.create_token("u_admin", "grant", "admin") + member = server.create_token("u_mem", "mem", "member") + + httpd = ThreadingHTTPServer(("127.0.0.1", 0), _Quiet) + port = httpd.server_address[1] + threading.Thread(target=httpd.serve_forever, daemon=True).start() + try: + print("direct {intent, slots} mode") + st, d = _req(port, "POST", "/api/query/nl", admin, + {"intent": "top_investors_committed", "slots": {"limit": 2}}) + rows = _data(d).get("rows", []) + check(st == 200 and [r["investor_name"] for r in rows] == ["Acme Capital", "Beta Partners"], + f"admin direct query -> 200 + rows (got {st})") + check(_data(d).get("intent") == "top_investors_committed", "response echoes interpreted intent") + + print("soft errors + validation") + st, d = _req(port, "POST", "/api/query/nl", admin, {"intent": "made_up"}) + check(st == 200 and _data(d).get("error") == "unknown_intent", + f"bad intent -> 200 with data.error=unknown_intent (got {st}, {_data(d).get('error')})") + st, d = _req(port, "POST", "/api/query/nl", admin, {}) + check(st == 400, f"neither question nor intent -> 400 (got {st})") + + print("auth gating") + st, _ = _req(port, "POST", "/api/query/nl", member, + {"intent": "top_investors_committed"}) + check(st == 403, f"member -> 403 (got {st})") + st, _ = _req(port, "POST", "/api/query/nl", None, {"intent": "top_investors_committed"}) + check(st == 401, f"unauthenticated -> 401 (got {st})") + + print("catalog") + st, d = _req(port, "GET", "/api/query/catalog", admin) + check(st == 200 and isinstance(d.get("data"), list) and len(d["data"]) == len(nl_query.INTENTS), + f"catalog -> 200 with every intent (got {st})") + st, _ = _req(port, "GET", "/api/query/catalog", member) + check(st == 403, f"catalog member -> 403 (got {st})") + + print("question path with the local model down") + st, d = _req(port, "POST", "/api/query/nl", admin, + {"question": "who are our top investors by committed capital?"}) + check(st == 503 and _data(d).get("error") == "model_unavailable", + f"question + dead model -> 503 model_unavailable (got {st}, {_data(d).get('error')})") + check(_data(d).get("question"), "question echoed back even on outage") + + print("audit trail") + db = sqlite3.connect(os.environ["CRM_DB_PATH"]) + n = db.execute("SELECT COUNT(*) FROM audit_log WHERE entity_type='nl_query'").fetchone()[0] + db.close() + check(n >= 2, f"executed queries are audited (entity_type=nl_query rows: {n})") + finally: + httpd.shutdown() + + print() + if FAILS: + print(f"{len(FAILS)} FAILED") + for f in FAILS: + print(" - " + f) + sys.exit(1) + print("ALL PASS") + + +if __name__ == "__main__": + main() diff --git a/backend/nl_query/test_translate.py b/backend/nl_query/test_translate.py new file mode 100644 index 0000000..6ca88d6 --- /dev/null +++ b/backend/nl_query/test_translate.py @@ -0,0 +1,107 @@ +#!/usr/bin/env python3 +"""Tests for the W2 NL translator (question -> {intent, slots}) — the local-model leg. + +The model is stubbed via an injected chat_fn, so this runs fully offline (no Spark, no +network). Covers: + - build_system() exposes the whole intent catalog as the model's closed vocabulary; + - translate() returns the parsed {intent, slots} and DROPS slot keys the intent doesn't + declare (model noise), while every surviving value is still validated downstream; + - the translation failure modes: no intent fit -> no_match; unparseable -> no_match; + local model unreachable -> model_unavailable (so the endpoint can 503); + - answer() chains translate + the validated runner end-to-end, and a HALLUCINATED intent + from the model is still rejected by the validator (the model output is never trusted). + +Run: cd backend && python3 nl_query/test_translate.py +""" +import os +import sys +import tempfile + +_DATA = tempfile.mkdtemp() +os.environ["CRM_DATA_DIR"] = _DATA +os.environ["CRM_DB_PATH"] = os.path.join(_DATA, "crm.db") + +sys.path.insert(0, os.path.dirname(os.path.dirname(os.path.abspath(__file__)))) # backend/ +import server # noqa: E402 +import nl_query # noqa: E402 + +T = nl_query # exercise the public API (translate/answer/build_system are re-exported) + +FAILS = [] + + +def check(cond, msg): + print((" PASS " if cond else " FAIL ") + msg) + if not cond: + FAILS.append(msg) + + +def main(): + print("build_system") + sysprompt = nl_query.build_system() + check(all(k in sysprompt for k in nl_query.INTENTS), "system prompt lists every intent key") + check("days (integer, default 90)" in sysprompt, "system prompt renders int slot + default") + check("one of any|inbound|outbound" in sysprompt, "system prompt renders enum choices") + + print("translate") + captured = {} + + def fake(prompt, system): + captured["system"] = system + captured["prompt"] = prompt + return {"intent": "investors_cold", "slots": {"days": 90, "bogus": "x"}} + + r = T.translate("who's gone quiet for 3 months?", chat_fn=fake) + check(r == {"intent": "investors_cold", "slots": {"days": 90}}, + f"routes to intent + drops unknown slot 'bogus': {r}") + check(nl_query.INTENTS and "investors_cold" in captured["system"], "chat_fn received the catalog") + check(captured["prompt"] == "who's gone quiet for 3 months?", "chat_fn received the question") + + check(T.translate("x", chat_fn=lambda q, s: {"intent": None})["error"] == "no_match", + "intent null -> no_match") + check(T.translate("x", chat_fn=lambda q, s: None)["error"] == "no_match", + "unparseable model reply -> no_match") + check(T.translate("", chat_fn=lambda q, s: {"intent": "x"})["error"] == "no_match", + "empty question -> no_match (no model call needed)") + + def boom(q, s): + raise RuntimeError("spark down") + + check(T.translate("x", chat_fn=boom)["error"] == "model_unavailable", + "local model unreachable -> model_unavailable") + + print("answer (end-to-end through the validated runner)") + server.init_db() + conn = server.get_db() + conn.execute("INSERT INTO fundraising_investors (id, investor_name, lead, graveyard, " + "source_row_id, total_invested) VALUES " + "('a','Acme Capital','Jon',0,'a',5000000)," + "('b','Beta Partners','Grant',0,'b',2000000)," + "('g','Ghost','Grant',1,'g',9000000)") + conn.commit() + + r = T.answer(conn, "top investors", + chat_fn=lambda q, s: {"intent": "top_investors_committed", "slots": {"limit": 2}}) + check([x["investor_name"] for x in r["rows"]] == ["Acme Capital", "Beta Partners"], + "answer() runs the translated query") + check(r["question"] == "top investors", "answer() echoes the original question") + + r = T.answer(conn, "nonsense", chat_fn=lambda q, s: {"intent": "made_up_intent", "slots": {}}) + check(r.get("error") == "unknown_intent", "hallucinated intent is rejected by the validator") + check(r["question"] == "nonsense", "answer() echoes question on error too") + + r = T.answer(conn, "anything", chat_fn=boom) + check(r.get("error") == "model_unavailable", "answer() surfaces a model outage") + + conn.close() + print() + if FAILS: + print(f"{len(FAILS)} FAILED") + for f in FAILS: + print(" - " + f) + sys.exit(1) + print("ALL PASS") + + +if __name__ == "__main__": + main() diff --git a/backend/nl_query/translate.py b/backend/nl_query/translate.py new file mode 100644 index 0000000..e60bc49 --- /dev/null +++ b/backend/nl_query/translate.py @@ -0,0 +1,108 @@ +"""NL-query translator — plain-English question -> {intent, slots} on the LOCAL model. + +The model's ONLY job is to pick one curated intent and fill its typed slots; it never +touches the database, never sees a row, and never writes SQL. Its output is untrusted and +is handed straight to the runner's validator (runner.validate), which is the trust boundary. + +LOCAL-ONLY BY CONSTRUCTION. Translation runs on the local Qwen via Spark Control +(SPARK_CONTROL_URL), the same sanctioned local leg as intake/digest — so the question never +leaves the box and there is NO Claude path and NO redaction boundary to manage here (that +was the whole point of the W2 simplification: the answer is sensitive and never leaves; the +question is generic English and is translated locally). If the local model ever proves too +weak, a Claude-behind-redaction translator could be slotted in as an alternative `chat_fn` +WITHOUT changing the validator/executor — but it is deliberately not built. + +`chat_fn(prompt, system) -> dict|None` is injectable so the whole translation leg is testable +offline without Spark. The default calls the ingest Spark client (lazy import — it ships in +the Docker image, not the bare CRM). +""" +from .intents import INTENTS +from .runner import run_query + + +def _default_chat_json(prompt, system): + import os + import sys + sys.path.insert(0, os.path.join(os.path.dirname(os.path.dirname(os.path.abspath(__file__))), "ingest")) + import llm # noqa: E402 (ingest Spark client; raises if Spark is unreachable) + return llm.chat_json(prompt, system=system, max_tokens=400) + + +def _render_slot(name, spec): + t = spec["type"] + if t == "int": + extra = f", default {spec['default']}" if "default" in spec else "" + return f"{name} (integer{extra})" + if t == "enum": + extra = f", default {spec['default']}" if "default" in spec else "" + return f"{name} (one of {'|'.join(spec['choices'])}{extra})" + req = ", required" if spec.get("required") else ", optional" + return f"{name} (text{req})" + + +def build_system(): + """The system prompt: the full intent catalog as the model's closed vocabulary.""" + lines = [ + "You translate a question about a venture fund's investor database into ONE " + "structured query. Respond with ONLY a JSON object and nothing else:", + ' {"intent": "", "slots": {: }}', + "", + "Rules:", + "- Choose the single best-fitting intent. If none fits, return {\"intent\": null}.", + "- Use ONLY the slot names listed for the chosen intent; omit a slot to accept its default.", + "- Convert natural durations to the integer a slot wants: '3 months'->90, 'a quarter'->90, " + "'6 weeks'->42, 'a year'/'year to date'->365.", + "- Copy names, cities and people verbatim from the question into text slots.", + "- No commentary, no markdown, JSON only.", + "", + "Intents:", + ] + for key, spec in INTENTS.items(): + slots = spec["slots"] + slot_str = "; ".join(_render_slot(n, s) for n, s in slots.items()) or "(none)" + lines.append(f"- {key}: {spec['summary']}") + lines.append(f" slots: {slot_str}") + if spec.get("example"): + lines.append(f" e.g. \"{spec['example']}\"") + return "\n".join(lines) + + +def translate(question, *, chat_fn=None): + """Map a question to {intent, slots} on the local model. Returns that dict, or an error + dict {error, detail}: 'model_unavailable' (local model unreachable -> the endpoint 503s) + or 'no_match' (the model could not map the question to any intent).""" + chat_fn = chat_fn or _default_chat_json + q = (question or "").strip() + if not q: + return {"error": "no_match", "detail": "empty question"} + try: + data = chat_fn(q, build_system()) + except Exception as exc: # connection/runtime failure on the LOCAL model + return {"error": "model_unavailable", "detail": str(exc)} + if not isinstance(data, dict): + return {"error": "no_match", "detail": "model returned no parseable JSON"} + intent = data.get("intent") + if intent in (None, "", "null", "none"): + return {"error": "no_match", "detail": "no intent fit the question"} + slots = data.get("slots") + slots = slots if isinstance(slots, dict) else {} + # Drop slot KEYS the chosen intent doesn't declare — model noise, not a safety concern + # (every surviving VALUE still goes through full type validation in the runner). Unknown + # intents are left as-is so the runner rejects them as unknown_intent. + if intent in INTENTS: + allowed = INTENTS[intent]["slots"] + slots = {k: v for k, v in slots.items() if k in allowed} + return {"intent": intent, "slots": slots} + + +def answer(conn, question, *, chat_fn=None, audit_fn=None, actor=None, source="api"): + """End-to-end: translate a question locally, then run it through the validated runner. + Returns the runner's result (with the interpreted intent/slots, so a human can see how + the question was read) plus the original question, or a translation error dict.""" + t = translate(question, chat_fn=chat_fn) + if t.get("error"): + return {**t, "question": question} + result = run_query(conn, t["intent"], t["slots"], + audit_fn=audit_fn, actor=actor, source=source) + result["question"] = question + return result diff --git a/backend/nl_query/try_questions.py b/backend/nl_query/try_questions.py new file mode 100644 index 0000000..022b817 --- /dev/null +++ b/backend/nl_query/try_questions.py @@ -0,0 +1,51 @@ +#!/usr/bin/env python3 +"""Dev harness — fire questions at the LOCAL model and print how each is translated. + +Lets you eyeball whether the local Qwen maps real questions to the right curated query +(intent + slots), against your real Spark, with NO UI, auth, HTTP, or deploy. This is the +cheap way to validate translation quality before building the web/Matrix surfaces. It only +translates (it does not touch the DB), so no data is needed and nothing leaves the box. + +NOT shipped and NOT a test (no `test_` prefix) — a developer convenience. + +Needs SPARK_CONTROL_URL set (read from the repo .env) and the Spark reachable. +Run: + python3 backend/nl_query/try_questions.py # the built-in sample set + python3 backend/nl_query/try_questions.py "when did we last email Acme?" +""" +import os +import sys + +sys.path.insert(0, os.path.dirname(os.path.dirname(os.path.abspath(__file__)))) # backend/ +import nl_query # noqa: E402 + +SAMPLES = [ + "Which investors haven't we reached out to in the last 3 months?", + "Which investors do we owe follow-ups to?", + "What is Acme Capital's email and how much have they committed across funds?", + "When did we last reach out to Acme Capital?", + "What were the last 10 investor emails we sent, and who to?", + "What were the last 10 investor emails we received?", + "Who are all the investors located in Austin?", + "List our top 10 investors by committed capital.", + "List our top 10 pipeline investors by stage and the most recent conversation.", + "What is our total pipeline in dollars, split by stage?", + "What were the last investor emails sent by Grant?", + "How many emails has Jonathan sent this week, this month, and year to date?", +] + + +def main(): + questions = sys.argv[1:] or SAMPLES + print(f"Translating {len(questions)} question(s) on the local model " + f"(SPARK_CONTROL_URL={os.environ.get('SPARK_CONTROL_URL', '(unset)')})\n") + for q in questions: + r = nl_query.translate(q) + if r.get("error"): + print(f" ? {q}\n -> [{r['error']}] {r.get('detail', '')}\n") + else: + print(f" ? {q}\n -> {r['intent']} slots={r['slots']}\n") + + +if __name__ == "__main__": + main() diff --git a/backend/server.py b/backend/server.py index c1719b2..9fead4f 100644 --- a/backend/server.py +++ b/backend/server.py @@ -2180,6 +2180,10 @@ class CRMHandler(BaseHTTPRequestHandler): if path == '/api/reminders': return self.handle_list_reminders(user, params) + # Natural-language query (W2) — the askable catalog + if path == '/api/query/catalog': + return self.handle_nl_query_catalog(user) + # Matrix intake bot — new-vs-existing lookup for its in-thread proposal if path == '/api/intake/match': return self.handle_intake_match(user, params) @@ -2268,6 +2272,8 @@ class CRMHandler(BaseHTTPRequestHandler): return self.handle_pipeline_unlink(user, body) if path == '/api/reminders': return self.handle_create_reminder(user, body) + if path == '/api/query/nl': + return self.handle_nl_query(user, body) if path == '/api/fundraising/collab/heartbeat': return self.handle_fundraising_collab_heartbeat(user, body) if path == '/api/admin/users': @@ -3613,6 +3619,62 @@ class CRMHandler(BaseHTTPRequestHandler): finally: conn.close() + # ── Natural-language query (W2) — read-only, the safe nl_query core ── + def handle_nl_query(self, user, body): + """Answer a plain-English question about the fundraising database, read-only. + + Bot-or-admin: a query can surface the whole investor book, so it stays off the member + tier; the audit row written below (entity_type='nl_query') makes a query made through a + leaked or automated credential detectable. Accepts either {question} (mapped to an + intent+slots on the LOCAL model — nothing leaves the box) or {intent, slots} (run a + curated query directly, e.g. a UI re-run). BOTH go through the same validator and the + same fixed parameterized SQL in nl_query; result rows never reach any model. + + Status: a local-model outage -> 503; an unexpected SQL fault -> 500; everything else, + including a soft 'no question matched', returns 200 with the structured result, because + the UI always wants the interpreted query + summary back rather than a bare HTTP code.""" + if not require_bot_or_admin(user): + return self.send_error_json("Bot or admin required", 403) + body = body or {} + question = str(body.get('question') or '').strip() + intent = str(body.get('intent') or '').strip() + if not question and not intent: + return self.send_error_json("question or intent is required") + source = (str(body.get('source') or 'api').strip()[:20]) or 'api' + + import nl_query # pure-stdlib at import; the local-model leg is lazy inside translate + conn = get_db() + try: + def _audit(p): + log_audit(conn, user['user_id'], 'nl_query', p.get('intent') or '-', 'query', + {"source": p.get('source'), "slots": p.get('slots'), + "row_count": p.get('row_count'), "error": p.get('error'), + "question": question or None}) + if question: + result = nl_query.answer(conn, question, audit_fn=_audit, + actor=user['user_id'], source=source) + else: + result = nl_query.run_query(conn, intent, body.get('slots') or {}, + audit_fn=_audit, actor=user['user_id'], source=source) + conn.commit() # persist the audit row(s) + finally: + conn.close() + + err = result.get('error') + if err == 'model_unavailable': + return self.send_json({"data": result}, 503) + if err == 'query_failed': + return self.send_json({"data": result}, 500) + return self.send_json({"data": result}) + + def handle_nl_query_catalog(self, user): + """The askable surface: every intent's key, summary, slot specs and an example + question — so the UI can show what can be asked. Same gate as the query endpoint.""" + if not require_bot_or_admin(user): + return self.send_error_json("Bot or admin required", 403) + import nl_query + return self.send_json({"data": nl_query.catalog()}) + def handle_intake_match(self, user, params): """Read-only: does an investor matching this intake already exist? Used by the Matrix intake bot to label its in-thread proposal new-vs-existing. Returns the diff --git a/docs/guides/nl-query.md b/docs/guides/nl-query.md new file mode 100644 index 0000000..7b5505b --- /dev/null +++ b/docs/guides/nl-query.md @@ -0,0 +1,69 @@ +--- +paths: + - backend/nl_query/** +--- + +# Natural-language query (W2) + +Read this before editing the NL-query surface (`backend/nl_query/`). It is the read-only +"ask the database in plain English" layer — web "Ask" box + Matrix `@bot `. + +## The trust model — named intents, not a query language + +There is **no generic SQL/AST compiler and no dynamically-built identifiers.** Every query is +a fixed, hand-written, reviewed, parameterized statement in `intents.py`; the only thing a +caller (or the model) controls is a small set of typed **slot values**, bound as `?` params. +`runner.validate` is the trust boundary: it accepts only a known intent key and coerces each +slot to its declared type, rejecting anything off-spec. A request that's wrong is rejected; +it can never name a table/column, pick an operator, or write SQL. `run_query` never raises — +every failure returns a structured error dict (a bad `limit=abc` must not crash the thread). + +To add a capability: add a `run_*` + a registry entry (with its `slots` spec) in `intents.py`; +the translator prompt and the UI pick it up automatically from `catalog()`. Add a test case. + +## Local-only — no Claude, no redaction here + +Translation (question → `{intent, slots}`) runs on the **local Qwen via Spark Control** +(`translate.py`, reusing `ingest/llm.py`), the same sanctioned local leg as intake/digest. The +question never leaves the box, so there is **no Claude path and no redaction boundary** — that +was the whole point of the W2 simplification (the *answer* is sensitive and never leaves; the +*question* is generic English, translated locally). Validated **12/12** on real example +questions against the live Spark (2026-06-18). The model output is still untrusted: it goes +straight through `runner.validate`, so a hallucinated intent is rejected. If the local model +ever proves too weak, a Claude-behind-redaction translator could drop in as an alternative +`chat_fn` without touching the validator/executor — deliberately **not** built. + +**Results never go to any model.** Summaries are deterministic local strings; rows render +client-side. Never add a "summarize these rows with an LLM" step — that re-introduces the leak. + +## Soft-delete per table (the gotcha the design reviews caught) + +The `fundraising_*` tables are a **hard-rebuilt projection** of the grid blob and have **no +`deleted_at` column** — do NOT add `deleted_at IS NULL` to them (it raises). Their live/retired +axis is the **`graveyard` flag** (exclude `graveyard = 1` for "live"). Other tables: + +- `reminders` / `opportunities` / `communications` → filter `deleted_at IS NULL`. +- `emails` have no `deleted_at`; "live" = a non-tombstoned sighting (`EXISTS email_account_messages … deleted_at IS NULL`), mirroring `query_email_activity` / the digest. + +`intents._last_activity_by_investor` **mirrors** `server.last_activity_by_investor` (duplicated +to avoid importing the `__main__` server module — helpers take a `conn`, never import server). +Keep the two in sync; the soft-delete test guards the copy. + +## Endpoint, caps, audit + +- `POST /api/query/nl` (`require_bot_or_admin`, read-only) — body `{question}` (local translate) + or `{intent, slots}` (direct, e.g. a UI re-run). Returns `{intent, slots, rows, summary, + question}`. `GET /api/query/catalog` returns the askable surface for the UI. +- Status: local-model outage → **503**; unexpected SQL fault → **500**; everything else + (a hit, or a soft `no_match`/`unknown_intent`) → **200** with the structured result, because + the UI always wants the interpreted query back, not a bare code. +- Every executed query writes an audit row (`audit_log`, `entity_type='nl_query'`) so a query + through a leaked/automated credential is detectable. Global row ceiling `MAX_ROWS=500`. + +## Tests + dev harness + +`test_nl_query.py` (runner: every intent + soft-delete on both recency legs + injection-safety ++ caps), `test_translate.py` (offline translator via an injected `chat_fn`), and +`test_nl_query_endpoint.py` (HTTP auth/wiring/503, local model forced down via a dead +`SPARK_CONTROL_URL` port). `try_questions.py` is a dev harness (not a test) that fires +questions at the real local model and prints the translation — the cheap way to check quality.