"""NL-query intents — the curated, hand-written query catalog (W2, the safe core).

Each intent is a FIXED, reviewed, parameterized SQL query with a small set of typed "slots"
(the blanks a question fills in: a number of days, a name, a limit). There is NO generic
SQL/AST compiler and NO dynamically-built identifiers: every table and column name is
hardcoded in the query text, and every value the caller (or an LLM) supplies reaches SQLite
only as a bound `?` parameter. That is the whole trust model — a malformed or hostile request
can change a bound value, never the query structure. Adding a capability means adding a
reviewed entry here, not widening a language.

Soft-delete discipline (CLAUDE.md standing rule), per table:
  - reminders / opportunities / communications carry `deleted_at` -> filter
    `deleted_at IS NULL`.
  - emails have NO `deleted_at`; "live" means a non-tombstoned per-mailbox sighting exists
    (`email_account_messages.deleted_at IS NULL`) — mirror the digest / query_email_activity.
  - fundraising_investors/_contacts/_funds/_commitments are a HARD-REBUILT projection of the
    grid blob with NO `deleted_at` column; the live/retired axis there is the `graveyard`
    flag. Do NOT add `deleted_at IS NULL` to those tables — the column does not exist and the
    clause would raise. Exclude `graveyard = 1` where the question means "live" investors.

Each run_* returns {columns, rows, summary, truncated}. `summary` is a DETERMINISTIC local
one-liner (never an LLM narrative) — results never leave the box to be summarized.
"""
import sqlite3
from datetime import datetime, timedelta, timezone

# Generous ceiling — the Matrix review room is two admins and the web app is internal, so
# dumping the full book is acceptable (per Grant); this only guards against an unbounded
# scan flooding a response. A list intent past this is reported truncated, never silently cut.
MAX_ROWS = 500

# Cap on how many investors a name "contains" match may return. The name intents fetch one
# extra row so that a larger match set is reported as truncated instead of being cut silently.
_NAME_MATCH_LIMIT = 25

# Live, non-terminal pipeline stages in funnel order (mirrors server.PIPELINE_STAGES; 'lost'
# is the terminal drop). Kept here so the pipeline intents have a stable rank without importing
# the server module (helpers take a conn; they never import server — house convention).
_STAGE_ORDER = ['lead', 'outreach', 'meeting', 'due_diligence', 'committed', 'funded']
_STAGE_RANK_SQL = (
    "CASE stage WHEN 'lead' THEN 1 WHEN 'outreach' THEN 2 WHEN 'meeting' THEN 3 "
    "WHEN 'due_diligence' THEN 4 WHEN 'committed' THEN 5 WHEN 'funded' THEN 6 ELSE 0 END")


# ── helpers ────────────────────────────────────────────────────────────────────────────

def _rows(cur):
    """Materialize a cursor as a list of plain dicts, independent of the connection's
    row_factory (works whether rows come back as tuples or sqlite3.Row)."""
    cols = [c[0] for c in cur.description]
    return [dict(zip(cols, r)) for r in cur.fetchall()]


def like_contains(value):
    """Build a safe LIKE pattern for a free-text contains match.

    Escapes the LIKE wildcards so a user/LLM value of '%' or '_' is treated literally —
    paired with `LIKE ? ESCAPE '\\'` in the SQL, this stops '%' from matching the entire
    table. The backslash itself is escaped first so it cannot swallow a later escape.
    """
    v = value.replace("\\", "\\\\").replace("%", "\\%").replace("_", "\\_")
    return f"%{v}%"


def _last_activity_by_investor(conn):
    """{fundraising_investors.id: latest activity ISO ts} across logged communications and
    captured grid-linked emails — the per-investor recency signal behind the "gone quiet"
    and "last contact" intents.

    NB: this MIRRORS server.last_activity_by_investor() and its soft-delete joins (comms via
    cm.deleted_at IS NULL; email via a live email_account_messages sighting). It is
    duplicated rather than imported only to keep this module free of a server import (the
    main module runs as __main__, so `import server` would re-execute it). Keep the two in
    sync; the soft-delete test guards this copy. If a third caller appears, extract both to
    a shared module.

    Rows are read through `_rows`, so this works on any connection the other intents work
    on (tuple rows or sqlite3.Row) instead of requiring name-indexable rows.
    """
    out = {}

    def _bump(inv_id, ts):
        # Keep the lexicographically greatest timestamp per investor (ISO strings sort
        # chronologically); rows with a missing id or timestamp are ignored.
        if inv_id and ts and (out.get(inv_id) is None or str(ts) > str(out[inv_id])):
            out[inv_id] = ts

    # Each leg is guarded: the comms/email tables can be absent on a minimal DB. This is a
    # narrow, intentional tolerance for optional tables — NOT the broad error-swallowing the
    # runner forbids (a failure in an intent's main query surfaces as query_failed).
    try:
        for r in _rows(conn.execute(
                "SELECT fc.investor_id AS inv, MAX(cm.communication_date) AS last_ts "
                "FROM communications cm JOIN fundraising_contacts fc "
                "ON fc.contact_id = cm.contact_id "
                "WHERE cm.deleted_at IS NULL AND fc.contact_id IS NOT NULL "
                "GROUP BY fc.investor_id")):
            _bump(r["inv"], r["last_ts"])
    except sqlite3.OperationalError:
        pass
    try:
        for r in _rows(conn.execute(
                "SELECT eil.fundraising_investor_id AS inv, MAX(e.sent_at) AS last_ts "
                "FROM email_investor_links eil JOIN emails e ON e.id = eil.email_id "
                "WHERE eil.fundraising_investor_id IS NOT NULL AND EXISTS "
                "(SELECT 1 FROM email_account_messages eam WHERE eam.email_id = e.id "
                "AND eam.deleted_at IS NULL) GROUP BY eil.fundraising_investor_id")):
            _bump(r["inv"], r["last_ts"])
    except sqlite3.OperationalError:
        pass
    return out


def _today():
    """Today's date in UTC (timezone-aware clock; `datetime.utcnow()` is deprecated)."""
    return datetime.now(timezone.utc).date()


def _days_since(ts):
    """Whole days between an ISO date/datetime string and today (UTC). None if the value is
    empty or its first ten characters do not parse as an ISO date."""
    if not ts:
        return None
    try:
        d = datetime.fromisoformat(str(ts)[:10].replace("Z", "")).date()
    except ValueError:
        return None
    return (_today() - d).days


def _own_addresses(conn):
    """Lower-cased, stripped set of our own mailbox addresses from `email_accounts`; an
    empty set when that table is absent (or holds only blank addresses)."""
    try:
        return {(r[0] or "").lower().strip()
                for r in conn.execute("SELECT email_address FROM email_accounts")} - {""}
    except sqlite3.OperationalError:
        return set()


def _truncate(rows):
    """Apply the global ceiling, returning (rows, truncated)."""
    if len(rows) > MAX_ROWS:
        return rows[:MAX_ROWS], True
    return rows, False


# ── investor intents ─────────────────────────────────────────────────────────────────────

def run_investors_cold(conn, slots):
    """Live investors not contacted in `days` days — never-contacted first, then oldest."""
    days = slots["days"]
    cutoff = (_today() - timedelta(days=days)).isoformat()
    last = _last_activity_by_investor(conn)
    invs = _rows(conn.execute(
        "SELECT id, investor_name, lead, total_invested FROM fundraising_investors "
        "WHERE graveyard = 0 ORDER BY investor_name"))
    cold = []
    for inv in invs:
        ts = last.get(inv["id"])
        # Compare on the date prefix only, so datetimes and plain dates rank alike.
        if ts is None or str(ts)[:10] < cutoff:
            cold.append({"investor_name": inv["investor_name"], "lead": inv["lead"],
                         "total_invested": inv["total_invested"],
                         "last_activity_at": ts, "days_since": _days_since(ts)})
    # never-contacted (days_since None) first, then most-stale first
    cold.sort(key=lambda r: (r["days_since"] is not None, -(r["days_since"] or 0)))
    rows, trunc = _truncate(cold)
    return {"columns": ["investor_name", "lead", "total_invested", "last_activity_at",
                        "days_since"],
            "rows": rows, "truncated": trunc,
            "summary": f"{len(cold)} live investor(s) not contacted in {days}+ days."}


def run_investor_lookup(conn, slots):
    """One investor's profile: contacts (name/email/title/city), committed total, per-fund
    commitments, lead. Name matched as a contains (an LLM/user may pass a partial).

    At most `_NAME_MATCH_LIMIT` investors are returned; `truncated` is True when more
    matched, so a broad partial name is never cut silently.
    """
    pat = like_contains(slots["name"])
    invs = _rows(conn.execute(
        "SELECT id, investor_name, lead, lead_source, total_invested, follow_up, graveyard "
        "FROM fundraising_investors WHERE investor_name LIKE ? ESCAPE '\\' "
        "ORDER BY graveyard, investor_name LIMIT ?", (pat, _NAME_MATCH_LIMIT + 1)))
    # One extra row was fetched purely to detect overflow; drop it before enrichment.
    trunc = len(invs) > _NAME_MATCH_LIMIT
    invs = invs[:_NAME_MATCH_LIMIT]
    for inv in invs:
        inv["contacts"] = _rows(conn.execute(
            "SELECT full_name, email, title, city, state, country FROM fundraising_contacts "
            "WHERE investor_id = ? ORDER BY sort_order, full_name", (inv["id"],)))
        inv["commitments"] = _rows(conn.execute(
            "SELECT f.fund_name, c.amount FROM fundraising_commitments c "
            "JOIN fundraising_funds f ON f.id = c.fund_id "
            "WHERE c.investor_id = ? AND c.amount <> 0 "
            "ORDER BY f.display_order", (inv["id"],)))
        # The internal id is only needed for the two sub-queries above.
        inv.pop("id", None)
    return {"columns": ["investor_name", "lead", "lead_source", "total_invested",
                        "follow_up", "graveyard", "contacts", "commitments"],
            "rows": invs, "truncated": trunc,
            "summary": f"{len(invs)} investor(s) matching \"{slots['name']}\"."}


def run_investors_by_city(conn, slots):
    """Investors with a contact located in `city` (contains match on the contact's city)."""
    pat = like_contains(slots["city"])
    rows = _rows(conn.execute(
        "SELECT i.investor_name, c.full_name AS contact, c.city, c.state, c.country, i.lead "
        "FROM fundraising_contacts c JOIN fundraising_investors i ON i.id = c.investor_id "
        "WHERE i.graveyard = 0 AND c.city LIKE ? ESCAPE '\\' "
        "ORDER BY i.investor_name, c.full_name LIMIT ?", (pat, MAX_ROWS + 1)))
    rows, trunc = _truncate(rows)
    return {"columns": ["investor_name", "contact", "city", "state", "country", "lead"],
            "rows": rows, "truncated": trunc,
            "summary": f"{len(rows)} investor contact(s) in \"{slots['city']}\"."}


def run_investors_by_lead(conn, slots):
    """Live investors owned by a given lead/team member (contains match on `lead`)."""
    pat = like_contains(slots["lead"])
    rows = _rows(conn.execute(
        "SELECT investor_name, lead, total_invested, follow_up FROM fundraising_investors "
        "WHERE graveyard = 0 AND lead LIKE ? ESCAPE '\\' "
        "ORDER BY total_invested DESC, investor_name LIMIT ?", (pat, MAX_ROWS + 1)))
    rows, trunc = _truncate(rows)
    return {"columns": ["investor_name", "lead", "total_invested", "follow_up"],
            "rows": rows, "truncated": trunc,
            "summary": f"{len(rows)} live investor(s) led by \"{slots['lead']}\"."}


def run_top_investors_committed(conn, slots):
    """Top `limit` live investors by total committed capital across all funds."""
    n = slots["limit"]
    rows = _rows(conn.execute(
        "SELECT investor_name, total_invested, lead FROM fundraising_investors "
        "WHERE graveyard = 0 AND total_invested > 0 "
        "ORDER BY total_invested DESC, investor_name LIMIT ?", (n,)))
    return {"columns": ["investor_name", "total_invested", "lead"],
            "rows": rows, "truncated": False,
            "summary": f"Top {len(rows)} investor(s) by committed capital."}


def run_investors_follow_up(conn, slots):
    """Investors we owe a follow-up to: those with an OPEN reminder, overdue first.

    Uses the W1 reminders table (the richer follow-up layer) joined to the current grid
    name. Rows also carry `assignee_id`, which is selected but not listed in `columns`.
    """
    today = _today().isoformat()
    rows = _rows(conn.execute(
        "SELECT COALESCE(i.investor_name, r.investor_name) AS investor_name, r.title, "
        "r.due_date, r.status, r.assignee_id, "
        "CASE WHEN r.due_date IS NOT NULL AND substr(r.due_date,1,10) < ? "
        "THEN 1 ELSE 0 END AS overdue "
        "FROM reminders r LEFT JOIN fundraising_investors i ON i.id = r.investor_id "
        "WHERE r.deleted_at IS NULL AND r.status = 'open' AND r.investor_id IS NOT NULL "
        "ORDER BY (r.due_date IS NULL), r.due_date ASC LIMIT ?", (today, MAX_ROWS + 1)))
    rows, trunc = _truncate(rows)
    return {"columns": ["investor_name", "title", "due_date", "status", "overdue"],
            "rows": rows, "truncated": trunc,
            "summary": f"{len(rows)} investor(s) with an open follow-up reminder."}


# ── pipeline intents ──────────────────────────────────────────────────────────────────────

def run_pipeline_top(conn, slots):
    """Top `limit` live pipeline opportunities by stage (furthest along first), with the
    investor, owner, and most-recent activity."""
    n = slots["limit"]
    last = _last_activity_by_investor(conn)
    # NOTE(review): _STAGE_RANK_SQL references `stage` unqualified; this relies on
    # `opportunities` being the only joined table with a `stage` column — confirm.
    rows = _rows(conn.execute(
        "SELECT o.fundraising_investor_id AS inv_id, "
        "COALESCE(i.investor_name, o.name) AS investor_name, o.stage, o.expected_amount, "
        "o.probability, u.full_name AS owner FROM opportunities o "
        "LEFT JOIN fundraising_investors i ON i.id = o.fundraising_investor_id "
        "LEFT JOIN users u ON u.id = o.owner_id "
        "WHERE o.deleted_at IS NULL AND o.stage != 'lost' "
        f"ORDER BY {_STAGE_RANK_SQL} DESC, o.expected_amount DESC LIMIT ?", (n,)))
    for r in rows:
        # Swap the internal investor id for its recency timestamp (None when unknown).
        r["last_activity_at"] = last.get(r.pop("inv_id"))
    return {"columns": ["investor_name", "stage", "expected_amount", "probability", "owner",
                        "last_activity_at"],
            "rows": rows, "truncated": False,
            "summary": f"Top {len(rows)} live pipeline opportunit(ies) by stage."}


def run_pipeline_totals(conn, slots):
    """Total pipeline dollars and the split across each stage (excludes lost)."""
    rows = _rows(conn.execute(
        "SELECT stage, COUNT(*) AS count, COALESCE(SUM(expected_amount),0) AS expected_total, "
        "COALESCE(SUM(commitment_amount),0) AS committed_total FROM opportunities "
        f"WHERE deleted_at IS NULL AND stage != 'lost' GROUP BY stage "
        f"ORDER BY {_STAGE_RANK_SQL}"))
    total = sum(r["expected_total"] for r in rows)
    count = sum(r["count"] for r in rows)
    return {"columns": ["stage", "count", "expected_total", "committed_total"],
            "rows": rows, "truncated": False,
            "summary": f"${total:,.0f} expected across {count} live opportunit(ies) in "
                       f"{len(rows)} stage(s)."}


# ── email / communication intents ─────────────────────────────────────────────────────────

def run_recent_emails(conn, slots):
    """The most recent `limit` matched investor emails, optionally one direction.

    Matched-only + soft-delete-correct (a live email_account_messages sighting), mirroring
    the Communications panel's query_email_activity. Direction is decided by whether the
    sender is one of our own mailbox addresses. When no own address is known, nothing can
    be classed as outbound, so an outbound request returns no rows (rather than every email
    labelled "sent"); an inbound request is then correctly unfiltered.
    """
    n, direction = slots["limit"], slots["direction"]
    where = ["EXISTS (SELECT 1 FROM email_account_messages eam WHERE eam.email_id = e.id "
             "AND eam.deleted_at IS NULL)",
             "EXISTS (SELECT 1 FROM email_investor_links l WHERE l.email_id = e.id)"]
    params = []
    own = _own_addresses(conn)
    if direction == "outbound" and not own:
        rows = []
    else:
        if direction in ("inbound", "outbound") and own:
            op = "IN" if direction == "outbound" else "NOT IN"
            # Only the NUMBER of `?` placeholders is dynamic; the addresses stay bound values.
            placeholders = ",".join("?" for _ in own)
            where.append(f"LOWER(e.from_email) {op} ({placeholders})")
            params.extend(sorted(own))
        sql = ("SELECT e.subject, e.from_name, e.from_email, e.sent_at, "
               "(SELECT fi.investor_name FROM email_investor_links l "
               " JOIN fundraising_investors fi ON fi.id = l.fundraising_investor_id "
               " WHERE l.email_id = e.id AND l.fundraising_investor_id IS NOT NULL LIMIT 1) "
               "AS investor "
               "FROM emails e WHERE " + " AND ".join(where) +
               " ORDER BY e.sent_at DESC LIMIT ?")
        rows = _rows(conn.execute(sql, params + [n]))
    label = {"inbound": "received", "outbound": "sent"}.get(direction, "")
    return {"columns": ["sent_at", "subject", "from_name", "from_email", "investor"],
            "rows": rows, "truncated": False,
            "summary": f"{len(rows)} most-recent {label + ' ' if label else ''}"
                       f"investor email(s)."}


def run_investor_last_contact(conn, slots):
    """When we last had any activity with investor X (matched by name, contains).

    At most `_NAME_MATCH_LIMIT` investors are returned; `truncated` is True when more
    matched.
    """
    pat = like_contains(slots["name"])
    last = _last_activity_by_investor(conn)
    invs = _rows(conn.execute(
        "SELECT id, investor_name FROM fundraising_investors "
        "WHERE investor_name LIKE ? ESCAPE '\\' ORDER BY graveyard, investor_name LIMIT ?",
        (pat, _NAME_MATCH_LIMIT + 1)))
    # One extra row was fetched purely to detect overflow.
    trunc = len(invs) > _NAME_MATCH_LIMIT
    rows = []
    for inv in invs[:_NAME_MATCH_LIMIT]:
        ts = last.get(inv["id"])
        rows.append({"investor_name": inv["investor_name"], "last_activity_at": ts,
                     "days_since": _days_since(ts)})
    return {"columns": ["investor_name", "last_activity_at", "days_since"],
            "rows": rows, "truncated": trunc,
            "summary": f"Last contact for {len(rows)} investor(s) "
                       f"matching \"{slots['name']}\"."}


def run_comms_by_user(conn, slots):
    """The most recent `limit` outbound investor emails sent by a given user (matched by
    username or full name). Soft-delete-correct (live sighting, is_sent)."""
    n, pat = slots["limit"], like_contains(slots["user"])
    # NOTE(review): unlike run_recent_emails, this does not require an email_investor_links
    # row, so sent mail with no investor match is included with investor = NULL — confirm
    # that is intended for an "investor emails" intent.
    rows = _rows(conn.execute(
        "SELECT e.subject, e.sent_at, u.full_name AS sender, "
        "(SELECT fi.investor_name FROM email_investor_links l "
        " JOIN fundraising_investors fi ON fi.id = l.fundraising_investor_id "
        " WHERE l.email_id = e.id AND l.fundraising_investor_id IS NOT NULL LIMIT 1) "
        "AS investor "
        "FROM emails e JOIN email_account_messages eam ON eam.email_id = e.id "
        "AND eam.deleted_at IS NULL AND eam.is_sent = 1 "
        "JOIN email_accounts ea ON ea.id = eam.account_id JOIN users u ON u.id = ea.user_id "
        "WHERE (u.username LIKE ? ESCAPE '\\' OR u.full_name LIKE ? ESCAPE '\\') "
        "ORDER BY e.sent_at DESC LIMIT ?", (pat, pat, n)))
    return {"columns": ["sent_at", "subject", "sender", "investor"],
            "rows": rows, "truncated": False,
            "summary": f"{len(rows)} recent email(s) sent by \"{slots['user']}\"."}


def run_email_counts_by_user(conn, slots):
    """Per-user counts of outbound investor emails over this week / month / year-to-date.

    Windows are calendar-based: week = since Monday, month = since the 1st, ytd = since
    Jan 1. The optional `user` slot narrows to users whose username or full name contains
    it. Users with no sent mail this year are omitted (HAVING ytd > 0).
    """
    today = _today()
    wk = (today - timedelta(days=today.weekday())).isoformat()
    mo = today.replace(day=1).isoformat()
    yr = today.replace(month=1, day=1).isoformat()
    where = "WHERE eam.deleted_at IS NULL AND eam.is_sent = 1"
    # Bound-parameter order must follow placeholder order: the three window starts in the
    # SELECT list come first, then the optional user patterns in the WHERE clause.
    params = [wk, mo, yr]
    if slots.get("user"):
        pat = like_contains(slots["user"])
        where += " AND (u.username LIKE ? ESCAPE '\\' OR u.full_name LIKE ? ESCAPE '\\')"
        params.extend([pat, pat])
    # NOTE(review): counts every live sent sighting, with no email_investor_links
    # requirement — confirm this matches the "investor emails" wording.
    rows = _rows(conn.execute(
        "SELECT u.full_name AS user, u.username, "
        "SUM(CASE WHEN substr(e.sent_at,1,10) >= ? THEN 1 ELSE 0 END) AS this_week, "
        "SUM(CASE WHEN substr(e.sent_at,1,10) >= ? THEN 1 ELSE 0 END) AS this_month, "
        "SUM(CASE WHEN substr(e.sent_at,1,10) >= ? THEN 1 ELSE 0 END) AS ytd "
        "FROM users u JOIN email_accounts ea ON ea.user_id = u.id "
        "JOIN email_account_messages eam ON eam.account_id = ea.id "
        "JOIN emails e ON e.id = eam.email_id " + where +
        " GROUP BY u.id HAVING ytd > 0 ORDER BY ytd DESC", params))
    return {"columns": ["user", "this_week", "this_month", "ytd"],
            "rows": rows, "truncated": False,
            "summary": f"Outbound email counts for {len(rows)} user(s)."}


# ── registry ──────────────────────────────────────────────────────────────────────────────
# key -> {summary, slots, run, example}. `slots` is consumed by the runner's validator and
# (later) surfaced to the local-model translator + the UI as the single source of truth for
# what is queryable. SlotSpec: {type: int|enum|text, ...constraints}.
INTENTS = {
    "investors_cold": {
        "summary": "Investors we haven't contacted in a while (default 90 days).",
        "slots": {"days": {"type": "int", "default": 90, "min": 1, "max": 3650}},
        "example": "Which investors haven't we reached out to in the last 3 months?",
        "run": run_investors_cold,
    },
    "investor_lookup": {
        "summary": "One investor's contacts, email, committed total and per-fund breakdown.",
        "slots": {"name": {"type": "text", "required": True, "maxlen": 120}},
        "example": "What is Acme Capital's email and how much have they committed across funds?",
        "run": run_investor_lookup,
    },
    "investors_by_city": {
        "summary": "Investors with a contact located in a given city.",
        "slots": {"city": {"type": "text", "required": True, "maxlen": 80}},
        "example": "Who are all the investors located in Austin?",
        "run": run_investors_by_city,
    },
    "investors_by_lead": {
        "summary": "Investors owned by a given lead / team member.",
        "slots": {"lead": {"type": "text", "required": True, "maxlen": 80}},
        "example": "Show me the investors led by Jonathan.",
        "run": run_investors_by_lead,
    },
    "top_investors_committed": {
        "summary": "Top investors by total committed capital.",
        "slots": {"limit": {"type": "int", "default": 10, "min": 1, "max": MAX_ROWS}},
        "example": "List our top 10 investors by committed capital.",
        "run": run_top_investors_committed,
    },
    "investors_follow_up": {
        "summary": "Investors we owe a follow-up to (have an open reminder), overdue first.",
        "slots": {},
        "example": "Which investors do we owe follow-ups to?",
        "run": run_investors_follow_up,
    },
    "pipeline_top": {
        "summary": "Top pipeline opportunities by stage, with investor, owner and last activity.",
        "slots": {"limit": {"type": "int", "default": 10, "min": 1, "max": MAX_ROWS}},
        "example": "List our top 10 pipeline investors by stage and last conversation.",
        "run": run_pipeline_top,
    },
    "pipeline_totals": {
        "summary": "Total pipeline dollars and the split across each stage.",
        "slots": {},
        "example": "What is our total pipeline in dollars, split by stage?",
        "run": run_pipeline_totals,
    },
    "recent_emails": {
        "summary": "The most recent investor emails (optionally inbound or outbound only).",
        "slots": {"limit": {"type": "int", "default": 10, "min": 1, "max": 100},
                  "direction": {"type": "enum", "choices": ["any", "inbound", "outbound"],
                                "default": "any"}},
        "example": "What were the last 10 investor emails we sent, and who to?",
        "run": run_recent_emails,
    },
    "investor_last_contact": {
        "summary": "When we last had any activity with a given investor.",
        "slots": {"name": {"type": "text", "required": True, "maxlen": 120}},
        "example": "When did we last reach out to Acme Capital?",
        "run": run_investor_last_contact,
    },
    "comms_by_user": {
        "summary": "Recent investor emails sent by a given team member.",
        "slots": {"user": {"type": "text", "required": True, "maxlen": 80},
                  "limit": {"type": "int", "default": 10, "min": 1, "max": 100}},
        "example": "What were the last investor emails sent by Grant?",
        "run": run_comms_by_user,
    },
    "email_counts_by_user": {
        "summary": "How many investor emails each user sent this week / month / year-to-date.",
        "slots": {"user": {"type": "text", "required": False, "maxlen": 80}},
        "example": "How many emails has Grant sent this week, this month, and year to date?",
        "run": run_email_counts_by_user,
    },
}