Files
ten31-database/backend/nl_query/intents.py
T
Keysat 6c29c22601 Add NL-query backend (W2): local translator + safe named-query runner
Read-only "ask the database in plain English" backend. Translation runs on
the local Qwen via Spark Control (question -> {intent, slots}); nothing leaves
the box, no Claude and no redaction boundary (the simplification chosen after
pressure-testing). The safe surface is a curated catalog of ~12 hand-written
parameterized queries; a slot validator is the trust boundary (no generic SQL,
no dynamic identifiers). POST /api/query/nl + GET /api/query/catalog, gated
require_bot_or_admin, read-only, audited. Soft-delete-correct per table.
Local Qwen translated 12/12 real example questions correctly against the live
Spark. Web "Ask" box and Matrix bot still to come (steps 4-5).
2026-06-18 18:35:41 -05:00

434 lines
23 KiB
Python

"""NL-query intents — the curated, hand-written query catalog (W2, the safe core).
Each intent is a FIXED, reviewed, parameterized SQL query with a small set of typed
"slots" (the blanks a question fills in: a number of days, a name, a limit). There is NO
generic SQL/AST compiler and NO dynamically-built identifiers: every table and column name
is hardcoded in the query text, and every value the caller (or an LLM) supplies reaches
SQLite only as a bound `?` parameter. That is the whole trust model — a malformed or
hostile request can change a bound value, never the query structure. Adding a capability
means adding a reviewed entry here, not widening a language.
Soft-delete discipline (CLAUDE.md standing rule), per table:
- reminders / opportunities / communications carry `deleted_at` -> filter `deleted_at IS NULL`.
- emails have NO `deleted_at`; "live" means a non-tombstoned per-mailbox sighting exists
(`email_account_messages.deleted_at IS NULL`) — mirror the digest / query_email_activity.
- fundraising_investors/_contacts/_funds/_commitments are a HARD-REBUILT projection of the
grid blob with NO `deleted_at` column; the live/retired axis there is the `graveyard` flag.
Do NOT add `deleted_at IS NULL` to those tables — the column does not exist and the clause
would raise. Exclude `graveyard = 1` where the question means "live" investors.
Each run_* returns {columns, rows, summary, truncated}. `summary` is a DETERMINISTIC local
one-liner (never an LLM narrative) — results never leave the box to be summarized.
"""
import sqlite3
from datetime import datetime, timedelta, timezone
# Global ceiling on the rows a list intent returns. Deliberately generous — the Matrix review
# room is two admins and the web app is internal, so dumping the full book is acceptable
# (per Grant); the cap only guards against an unbounded scan flooding a response. A list
# intent past this is reported truncated, never silently cut.
MAX_ROWS = 500
# Live, non-terminal pipeline stages in funnel order (mirrors server.PIPELINE_STAGES; 'lost'
# is the terminal drop). Kept here so the pipeline intents have a stable rank without importing
# the server module (helpers take a conn; they never import server — house convention).
_STAGE_ORDER = ['lead', 'outreach', 'meeting', 'due_diligence', 'committed', 'funded']
# SQL expression ranking a row's `stage` by its 1-based position in _STAGE_ORDER; any other
# value ranks 0. Written out as a literal (not generated) so the query text stays reviewable;
# keep it in step with _STAGE_ORDER by hand. Note `stage` is unqualified here.
_STAGE_RANK_SQL = (
    "CASE stage WHEN 'lead' THEN 1 WHEN 'outreach' THEN 2 WHEN 'meeting' THEN 3 "
    "WHEN 'due_diligence' THEN 4 WHEN 'committed' THEN 5 WHEN 'funded' THEN 6 ELSE 0 END")
# ── helpers ────────────────────────────────────────────────────────────────────────────
def _rows(cur):
"""Materialize a cursor as a list of plain dicts, independent of the connection's
row_factory (works whether rows come back as tuples or sqlite3.Row)."""
cols = [c[0] for c in cur.description]
return [dict(zip(cols, r)) for r in cur.fetchall()]
def like_contains(value):
    """Return a LIKE pattern that matches `value` anywhere in a column, literally.

    The backslash, '%' and '_' characters in `value` are each prefixed with a backslash so
    they lose their wildcard meaning; the SQL must pair the pattern with
    `LIKE ? ESCAPE '\\'`. Without this, a user/LLM value of '%' would match every row."""
    # One pass over the input: each special character maps to its escaped form, so the
    # backslashes introduced for '%' and '_' are never themselves re-escaped.
    escapes = str.maketrans({"\\": "\\\\", "%": "\\%", "_": "\\_"})
    literal = value.translate(escapes)
    return "%" + literal + "%"
def _last_activity_by_investor(conn):
"""{fundraising_investors.id: latest activity ISO ts} across logged communications and
captured grid-linked emails — the per-investor recency signal behind the "gone quiet"
and "last contact" intents.
NB: this MIRRORS server.last_activity_by_investor() and its soft-delete joins (comms via
cm.deleted_at IS NULL; email via a live email_account_messages sighting). It is duplicated
rather than imported only to keep this module free of a server import (the main module runs
as __main__, so `import server` would re-execute it). Keep the two in sync; the soft-delete
test guards this copy. If a third caller appears, extract both to a shared module."""
out = {}
def _bump(inv_id, ts):
if inv_id and ts and (out.get(inv_id) is None or str(ts) > str(out[inv_id])):
out[inv_id] = ts
# Each leg is guarded: the comms/email tables can be absent on a minimal DB. This is a
# narrow, intentional tolerance for optional tables — NOT the broad error-swallowing the
# runner forbids (a failure in an intent's main query surfaces as query_failed).
try:
for r in conn.execute(
"SELECT fc.investor_id AS inv, MAX(cm.communication_date) AS last_ts "
"FROM communications cm JOIN fundraising_contacts fc ON fc.contact_id = cm.contact_id "
"WHERE cm.deleted_at IS NULL AND fc.contact_id IS NOT NULL GROUP BY fc.investor_id"):
_bump(r["inv"], r["last_ts"])
except sqlite3.OperationalError:
pass
try:
for r in conn.execute(
"SELECT eil.fundraising_investor_id AS inv, MAX(e.sent_at) AS last_ts "
"FROM email_investor_links eil JOIN emails e ON e.id = eil.email_id "
"WHERE eil.fundraising_investor_id IS NOT NULL AND EXISTS "
"(SELECT 1 FROM email_account_messages eam WHERE eam.email_id = e.id "
"AND eam.deleted_at IS NULL) GROUP BY eil.fundraising_investor_id"):
_bump(r["inv"], r["last_ts"])
except sqlite3.OperationalError:
pass
return out
def _today():
return datetime.utcnow().date()
def _days_since(ts):
"""Whole days between an ISO date/datetime string and today (UTC). None if unparseable."""
if not ts:
return None
try:
d = datetime.fromisoformat(str(ts)[:10].replace("Z", "")).date()
except ValueError:
return None
return (_today() - d).days
def _own_addresses(conn):
try:
return {(r[0] or "").lower().strip()
for r in conn.execute("SELECT email_address FROM email_accounts")} - {""}
except sqlite3.OperationalError:
return set()
def _truncate(rows):
    """Enforce the MAX_ROWS ceiling on `rows`.

    Returns a `(rows, truncated)` pair: the first MAX_ROWS rows and True when the list was
    longer than the ceiling, otherwise the list itself and False."""
    over_limit = len(rows) > MAX_ROWS
    kept = rows[:MAX_ROWS] if over_limit else rows
    return kept, over_limit
# ── investor intents ─────────────────────────────────────────────────────────────────────
def run_investors_cold(conn, slots):
    """Live (non-graveyard) investors with no activity inside the last `slots["days"]` days.

    An investor is cold when it has no recorded activity at all, or when the date part of
    its latest activity sorts before the cutoff date (today UTC minus `days`). Rows are
    ordered never-contacted first, then most stale first. The summary reports the full cold
    count even when the returned rows are capped by the global ceiling."""
    days = slots["days"]
    cutoff = (_today() - timedelta(days=days)).isoformat()
    latest_by_id = _last_activity_by_investor(conn)
    live_investors = _rows(conn.execute(
        "SELECT id, investor_name, lead, total_invested FROM fundraising_investors "
        "WHERE graveyard = 0 ORDER BY investor_name"))

    cold = []
    for inv in live_investors:
        ts = latest_by_id.get(inv["id"])
        if ts is not None and str(ts)[:10] >= cutoff:
            continue  # contacted on or after the cutoff date: not cold
        cold.append({"investor_name": inv["investor_name"], "lead": inv["lead"],
                     "total_invested": inv["total_invested"],
                     "last_activity_at": ts, "days_since": _days_since(ts)})

    def _staleness(row):
        # Rows without a day count (never contacted) sort first; then largest day count.
        age = row["days_since"]
        return (age is not None, -(age or 0))

    cold = sorted(cold, key=_staleness)
    rows, trunc = _truncate(cold)
    return {"columns": ["investor_name", "lead", "total_invested", "last_activity_at", "days_since"],
            "rows": rows, "truncated": trunc,
            "summary": f"{len(cold)} live investor(s) not contacted in {days}+ days."}
def run_investor_lookup(conn, slots):
    """One investor's profile: contacts (name/email/title/city), committed total, per-fund
    commitments, lead. Name matched as a contains (an LLM/user may pass a partial).

    Args:
        conn: open sqlite3 connection.
        slots: validated slots; `slots["name"]` is the free-text investor name.

    Returns:
        {columns, rows, summary, truncated}. Each row is an investor dict carrying nested
        `contacts` and `commitments` lists. At most 25 investors are returned; when more
        match, `truncated` is True rather than the extra matches being dropped silently.
    """
    cap = 25
    pat = like_contains(slots["name"])
    # Fetch one row past the cap so a cut-off result can be detected and reported.
    invs = _rows(conn.execute(
        "SELECT id, investor_name, lead, lead_source, total_invested, follow_up, graveyard "
        "FROM fundraising_investors WHERE investor_name LIKE ? ESCAPE '\\' "
        "ORDER BY graveyard, investor_name LIMIT ?", (pat, cap + 1)))
    truncated = len(invs) > cap
    invs = invs[:cap]
    for inv in invs:
        inv["contacts"] = _rows(conn.execute(
            "SELECT full_name, email, title, city, state, country FROM fundraising_contacts "
            "WHERE investor_id = ? ORDER BY sort_order, full_name", (inv["id"],)))
        # Zero-amount commitment rows are left out of the per-fund breakdown.
        inv["commitments"] = _rows(conn.execute(
            "SELECT f.fund_name, c.amount FROM fundraising_commitments c "
            "JOIN fundraising_funds f ON f.id = c.fund_id WHERE c.investor_id = ? AND c.amount <> 0 "
            "ORDER BY f.display_order", (inv["id"],)))
        # The internal id is only needed for the two lookups above; keep it out of the result.
        inv.pop("id", None)
    return {"columns": ["investor_name", "lead", "lead_source", "total_invested",
                        "follow_up", "graveyard", "contacts", "commitments"],
            "rows": invs, "truncated": truncated,
            "summary": f"{len(invs)} investor(s) matching \"{slots['name']}\"."}
def run_investors_by_city(conn, slots):
    """Live investors that have a contact whose city contains `slots["city"]`.

    One row per matching contact, ordered by investor then contact name. The query asks for
    one row beyond the global ceiling so truncation can be detected and reported."""
    city = slots["city"]
    sql = (
        "SELECT i.investor_name, c.full_name AS contact, c.city, c.state, c.country, i.lead "
        "FROM fundraising_contacts c JOIN fundraising_investors i ON i.id = c.investor_id "
        "WHERE i.graveyard = 0 AND c.city LIKE ? ESCAPE '\\' "
        "ORDER BY i.investor_name, c.full_name LIMIT ?")
    fetched = _rows(conn.execute(sql, (like_contains(city), MAX_ROWS + 1)))
    rows, trunc = _truncate(fetched)
    result = {"columns": ["investor_name", "contact", "city", "state", "country", "lead"]}
    result["rows"] = rows
    result["truncated"] = trunc
    result["summary"] = f"{len(rows)} investor contact(s) in \"{city}\"."
    return result
def run_investors_by_lead(conn, slots):
    """Live investors whose `lead` field contains `slots["lead"]`, largest total first.

    The query asks for one row beyond the global ceiling so truncation can be detected and
    reported."""
    lead = slots["lead"]
    sql = (
        "SELECT investor_name, lead, total_invested, follow_up FROM fundraising_investors "
        "WHERE graveyard = 0 AND lead LIKE ? ESCAPE '\\' "
        "ORDER BY total_invested DESC, investor_name LIMIT ?")
    fetched = _rows(conn.execute(sql, (like_contains(lead), MAX_ROWS + 1)))
    rows, trunc = _truncate(fetched)
    result = {"columns": ["investor_name", "lead", "total_invested", "follow_up"]}
    result["rows"] = rows
    result["truncated"] = trunc
    result["summary"] = f"{len(rows)} live investor(s) led by \"{lead}\"."
    return result
def run_top_investors_committed(conn, slots):
    """The `slots["limit"]` live investors with the largest positive `total_invested`.

    Ties are broken by investor name. The caller-chosen limit is the only cap, so the
    result is never flagged truncated."""
    sql = (
        "SELECT investor_name, total_invested, lead FROM fundraising_investors "
        "WHERE graveyard = 0 AND total_invested > 0 "
        "ORDER BY total_invested DESC, investor_name LIMIT ?")
    top = _rows(conn.execute(sql, (slots["limit"],)))
    summary = f"Top {len(top)} investor(s) by committed capital."
    return {"columns": ["investor_name", "total_invested", "lead"], "rows": top,
            "truncated": False, "summary": summary}
def run_investors_follow_up(conn, slots):
    """Open, non-deleted reminders attached to an investor — the follow-ups we owe.

    Rows are ordered by due date ascending with undated reminders last, which puts the
    longest-overdue items first. `overdue` is 1 when the reminder's due date is before
    today (UTC). The investor name prefers the current grid name and falls back to the name
    stored on the reminder.

    NOTE(review): the summary counts reminder rows, so an investor with several open
    reminders is counted more than once; and `assignee_id` is selected (so present in each
    row) but not listed in `columns`. Confirm both are intended."""
    cutoff_day = _today().isoformat()
    sql = (
        "SELECT COALESCE(i.investor_name, r.investor_name) AS investor_name, r.title, "
        "r.due_date, r.status, r.assignee_id, "
        "CASE WHEN r.due_date IS NOT NULL AND substr(r.due_date,1,10) < ? THEN 1 ELSE 0 END AS overdue "
        "FROM reminders r LEFT JOIN fundraising_investors i ON i.id = r.investor_id "
        "WHERE r.deleted_at IS NULL AND r.status = 'open' AND r.investor_id IS NOT NULL "
        "ORDER BY (r.due_date IS NULL), r.due_date ASC LIMIT ?")
    # One row past the ceiling lets _truncate detect and report an overflow.
    fetched = _rows(conn.execute(sql, (cutoff_day, MAX_ROWS + 1)))
    rows, trunc = _truncate(fetched)
    summary = f"{len(rows)} investor(s) with an open follow-up reminder."
    return {"columns": ["investor_name", "title", "due_date", "status", "overdue"],
            "rows": rows, "truncated": trunc,
            "summary": summary}
# ── pipeline intents ──────────────────────────────────────────────────────────────────────
def run_pipeline_top(conn, slots):
    """The `slots["limit"]` live (non-deleted, not 'lost') opportunities, furthest stage
    first and then by expected amount, each annotated with the linked investor's most
    recent activity.

    NOTE(review): the stage-rank expression refers to an unqualified `stage` column inside a
    three-table join; this relies on `opportunities` being the only joined table with a
    column of that name — confirm against the schema."""
    limit = slots["limit"]
    latest_by_id = _last_activity_by_investor(conn)
    sql = (
        "SELECT o.fundraising_investor_id AS inv_id, "
        "COALESCE(i.investor_name, o.name) AS investor_name, o.stage, o.expected_amount, "
        "o.probability, u.full_name AS owner FROM opportunities o "
        "LEFT JOIN fundraising_investors i ON i.id = o.fundraising_investor_id "
        "LEFT JOIN users u ON u.id = o.owner_id "
        "WHERE o.deleted_at IS NULL AND o.stage != 'lost' "
        f"ORDER BY {_STAGE_RANK_SQL} DESC, o.expected_amount DESC LIMIT ?")
    opportunities = _rows(conn.execute(sql, (limit,)))
    for opp in opportunities:
        # Swap the internal investor id for that investor's latest activity timestamp.
        investor_id = opp.pop("inv_id")
        opp["last_activity_at"] = latest_by_id.get(investor_id)
    return {"columns": ["investor_name", "stage", "expected_amount", "probability", "owner",
                        "last_activity_at"],
            "rows": opportunities, "truncated": False,
            "summary": f"Top {len(opportunities)} live pipeline opportunit(ies) by stage."}
def run_pipeline_totals(conn, slots):
    """Per-stage opportunity count, expected dollars and committed dollars for live
    (non-deleted, not 'lost') opportunities, in funnel order.

    The summary line adds up the expected totals and counts across the returned stages."""
    sql = (
        "SELECT stage, COUNT(*) AS count, COALESCE(SUM(expected_amount),0) AS expected_total, "
        "COALESCE(SUM(commitment_amount),0) AS committed_total FROM opportunities "
        f"WHERE deleted_at IS NULL AND stage != 'lost' GROUP BY stage ORDER BY {_STAGE_RANK_SQL}")
    by_stage = _rows(conn.execute(sql))
    expected_sum = sum(stage_row["expected_total"] for stage_row in by_stage)
    opp_count = sum(stage_row["count"] for stage_row in by_stage)
    summary = (f"${expected_sum:,.0f} expected across {opp_count} live opportunit(ies) in "
               f"{len(by_stage)} stage(s).")
    return {"columns": ["stage", "count", "expected_total", "committed_total"],
            "rows": by_stage, "truncated": False,
            "summary": summary}
# ── email / communication intents ─────────────────────────────────────────────────────────
def run_recent_emails(conn, slots):
    """The newest `slots["limit"]` investor-matched emails, optionally one direction only.

    An email qualifies when it has a live (non-tombstoned) email_account_messages sighting
    and at least one email_investor_links row. Direction is decided by the sender: outbound
    means `from_email` is one of our own mailbox addresses, inbound means it is not.

    NOTE(review): when no own addresses are known the direction filter is skipped entirely,
    yet the summary still says "sent"/"received" — confirm that is acceptable."""
    limit = slots["limit"]
    direction = slots["direction"]
    conditions = [
        "EXISTS (SELECT 1 FROM email_account_messages eam WHERE eam.email_id = e.id "
        "AND eam.deleted_at IS NULL)",
        "EXISTS (SELECT 1 FROM email_investor_links l WHERE l.email_id = e.id)",
    ]
    binds = []
    own = _own_addresses(conn)
    if own and direction in ("inbound", "outbound"):
        # Only the operator and the placeholder count vary; the addresses stay bound values.
        op = "NOT IN" if direction != "outbound" else "IN"
        marks = ",".join("?" * len(own))
        conditions.append(f"LOWER(e.from_email) {op} ({marks})")
        binds.extend(sorted(own))
    sql = ("SELECT e.subject, e.from_name, e.from_email, e.sent_at, "
           "(SELECT fi.investor_name FROM email_investor_links l "
           " JOIN fundraising_investors fi ON fi.id = l.fundraising_investor_id "
           " WHERE l.email_id = e.id AND l.fundraising_investor_id IS NOT NULL LIMIT 1) AS investor "
           "FROM emails e WHERE " + " AND ".join(conditions) + " ORDER BY e.sent_at DESC LIMIT ?")
    binds.append(limit)
    emails = _rows(conn.execute(sql, binds))
    label = {"inbound": "received", "outbound": "sent"}.get(direction, "")
    prefix = label + " " if label else ""
    return {"columns": ["sent_at", "subject", "from_name", "from_email", "investor"],
            "rows": emails, "truncated": False,
            "summary": f"{len(emails)} most-recent {prefix}investor email(s)."}
def run_investor_last_contact(conn, slots):
    """When we last had any activity with investor X (matched by name, contains).

    Args:
        conn: open sqlite3 connection.
        slots: validated slots; `slots["name"]` is the free-text investor name.

    Returns:
        {columns, rows, summary, truncated}. One row per matching investor (live investors
        before graveyard ones) with its latest activity timestamp and the whole days since;
        both are None when no activity is recorded. At most 25 investors are returned; when
        more match, `truncated` is True rather than the extra matches being dropped silently.
    """
    cap = 25
    pat = like_contains(slots["name"])
    last = _last_activity_by_investor(conn)
    # Fetch one row past the cap so a cut-off result can be detected and reported.
    invs = _rows(conn.execute(
        "SELECT id, investor_name FROM fundraising_investors "
        "WHERE investor_name LIKE ? ESCAPE '\\' ORDER BY graveyard, investor_name LIMIT ?",
        (pat, cap + 1)))
    truncated = len(invs) > cap
    rows = []
    for inv in invs[:cap]:
        ts = last.get(inv["id"])
        rows.append({"investor_name": inv["investor_name"], "last_activity_at": ts,
                     "days_since": _days_since(ts)})
    return {"columns": ["investor_name", "last_activity_at", "days_since"], "rows": rows,
            "truncated": truncated, "summary": f"Last contact for {len(rows)} investor(s) "
            f"matching \"{slots['name']}\"."}
def run_comms_by_user(conn, slots):
    """The newest `slots["limit"]` emails sent from a mailbox owned by a user whose username
    or full name contains `slots["user"]`.

    Only live sightings flagged `is_sent` are considered. The `investor` column is the name
    of one linked investor, or NULL when the email has no investor link.

    NOTE(review): unlike run_recent_emails there is no filter requiring an investor link, so
    unmatched sent mail is included — confirm against the intent's "investor emails" wording."""
    limit = slots["limit"]
    pat = like_contains(slots["user"])
    sql = (
        "SELECT e.subject, e.sent_at, u.full_name AS sender, "
        "(SELECT fi.investor_name FROM email_investor_links l "
        " JOIN fundraising_investors fi ON fi.id = l.fundraising_investor_id "
        " WHERE l.email_id = e.id AND l.fundraising_investor_id IS NOT NULL LIMIT 1) AS investor "
        "FROM emails e JOIN email_account_messages eam ON eam.email_id = e.id "
        "AND eam.deleted_at IS NULL AND eam.is_sent = 1 "
        "JOIN email_accounts ea ON ea.id = eam.account_id JOIN users u ON u.id = ea.user_id "
        "WHERE (u.username LIKE ? ESCAPE '\\' OR u.full_name LIKE ? ESCAPE '\\') "
        "ORDER BY e.sent_at DESC LIMIT ?")
    # The same pattern is bound twice: once for username, once for full name.
    sent = _rows(conn.execute(sql, (pat, pat, limit)))
    summary = f"{len(sent)} recent email(s) sent by \"{slots['user']}\"."
    return {"columns": ["sent_at", "subject", "sender", "investor"], "rows": sent,
            "truncated": False,
            "summary": summary}
def run_email_counts_by_user(conn, slots):
    """Per-user counts of sent emails for this week, this month and year-to-date.

    Windows are calendar-based on today's UTC date: week = since Monday, month = since the
    1st, ytd = since Jan 1. Only live sightings flagged `is_sent` count, and users with no
    sent mail this year are omitted. An optional `slots["user"]` narrows the result to users
    whose username or full name contains it.

    NOTE(review): no investor-link filter is applied, so every sent email counts — confirm
    against the intent's "investor emails" wording."""
    today = _today()
    week_start = today - timedelta(days=today.weekday())
    month_start = today.replace(day=1)
    year_start = today.replace(month=1, day=1)
    # Bind order follows placeholder order: the three SELECT windows, then any WHERE filter.
    params = [boundary.isoformat() for boundary in (week_start, month_start, year_start)]
    where = "WHERE eam.deleted_at IS NULL AND eam.is_sent = 1"
    if slots.get("user"):
        pat = like_contains(slots["user"])
        where = where + " AND (u.username LIKE ? ESCAPE '\\' OR u.full_name LIKE ? ESCAPE '\\')"
        params += [pat, pat]
    sql = (
        "SELECT u.full_name AS user, u.username, "
        "SUM(CASE WHEN substr(e.sent_at,1,10) >= ? THEN 1 ELSE 0 END) AS this_week, "
        "SUM(CASE WHEN substr(e.sent_at,1,10) >= ? THEN 1 ELSE 0 END) AS this_month, "
        "SUM(CASE WHEN substr(e.sent_at,1,10) >= ? THEN 1 ELSE 0 END) AS ytd "
        "FROM users u JOIN email_accounts ea ON ea.user_id = u.id "
        "JOIN email_account_messages eam ON eam.account_id = ea.id "
        "JOIN emails e ON e.id = eam.email_id " + where +
        " GROUP BY u.id HAVING ytd > 0 ORDER BY ytd DESC")
    counts = _rows(conn.execute(sql, params))
    return {"columns": ["user", "this_week", "this_month", "ytd"], "rows": counts,
            "truncated": False, "summary": f"Outbound email counts for {len(counts)} user(s)."}
# ── registry ──────────────────────────────────────────────────────────────────────────────
# Intent registry: key -> {summary, slots, run, example}.
#   summary — one-line description of what the intent answers.
#   slots   — the typed blanks a question may fill in. Consumed by the runner's validator and
#             (later) surfaced to the local-model translator + the UI as the single source of
#             truth for what is queryable. SlotSpec: {type: int|enum|text, ...constraints}.
#   example — a sample plain-English question for the intent.
#   run     — the run_* function in this module that executes the intent.
INTENTS = {
    # ── investor intents ──
    "investors_cold": {
        "summary": "Investors we haven't contacted in a while (default 90 days).",
        "slots": {"days": {"type": "int", "default": 90, "min": 1, "max": 3650}},
        "example": "Which investors haven't we reached out to in the last 3 months?",
        "run": run_investors_cold,
    },
    "investor_lookup": {
        "summary": "One investor's contacts, email, committed total and per-fund breakdown.",
        "slots": {"name": {"type": "text", "required": True, "maxlen": 120}},
        "example": "What is Acme Capital's email and how much have they committed across funds?",
        "run": run_investor_lookup,
    },
    "investors_by_city": {
        "summary": "Investors with a contact located in a given city.",
        "slots": {"city": {"type": "text", "required": True, "maxlen": 80}},
        "example": "Who are all the investors located in Austin?",
        "run": run_investors_by_city,
    },
    "investors_by_lead": {
        "summary": "Investors owned by a given lead / team member.",
        "slots": {"lead": {"type": "text", "required": True, "maxlen": 80}},
        "example": "Show me the investors led by Jonathan.",
        "run": run_investors_by_lead,
    },
    # `limit` may go as high as the global row ceiling.
    "top_investors_committed": {
        "summary": "Top investors by total committed capital.",
        "slots": {"limit": {"type": "int", "default": 10, "min": 1, "max": MAX_ROWS}},
        "example": "List our top 10 investors by committed capital.",
        "run": run_top_investors_committed,
    },
    # No slots: the question carries no parameters.
    "investors_follow_up": {
        "summary": "Investors we owe a follow-up to (have an open reminder), overdue first.",
        "slots": {},
        "example": "Which investors do we owe follow-ups to?",
        "run": run_investors_follow_up,
    },
    # ── pipeline intents ──
    "pipeline_top": {
        "summary": "Top pipeline opportunities by stage, with investor, owner and last activity.",
        "slots": {"limit": {"type": "int", "default": 10, "min": 1, "max": MAX_ROWS}},
        "example": "List our top 10 pipeline investors by stage and last conversation.",
        "run": run_pipeline_top,
    },
    "pipeline_totals": {
        "summary": "Total pipeline dollars and the split across each stage.",
        "slots": {},
        "example": "What is our total pipeline in dollars, split by stage?",
        "run": run_pipeline_totals,
    },
    # ── email / communication intents ──
    "recent_emails": {
        "summary": "The most recent investor emails (optionally inbound or outbound only).",
        "slots": {"limit": {"type": "int", "default": 10, "min": 1, "max": 100},
                  "direction": {"type": "enum", "choices": ["any", "inbound", "outbound"],
                                "default": "any"}},
        "example": "What were the last 10 investor emails we sent, and who to?",
        "run": run_recent_emails,
    },
    "investor_last_contact": {
        "summary": "When we last had any activity with a given investor.",
        "slots": {"name": {"type": "text", "required": True, "maxlen": 120}},
        "example": "When did we last reach out to Acme Capital?",
        "run": run_investor_last_contact,
    },
    "comms_by_user": {
        "summary": "Recent investor emails sent by a given team member.",
        "slots": {"user": {"type": "text", "required": True, "maxlen": 80},
                  "limit": {"type": "int", "default": 10, "min": 1, "max": 100}},
        "example": "What were the last investor emails sent by Grant?",
        "run": run_comms_by_user,
    },
    # `user` is optional here: run_email_counts_by_user reads it with slots.get() and
    # reports every user when it is absent or empty.
    "email_counts_by_user": {
        "summary": "How many investor emails each user sent this week / month / year-to-date.",
        "slots": {"user": {"type": "text", "required": False, "maxlen": 80}},
        "example": "How many emails has Grant sent this week, this month, and year to date?",
        "run": run_email_counts_by_user,
    },
}