Files
ten31-database/backend/nl_query/translate.py
T
Keysat 6c29c22601 Add NL-query backend (W2): local translator + safe named-query runner
Read-only "ask the database in plain English" backend. Translation runs on
the local Qwen via Spark Control (question -> {intent, slots}); nothing leaves
the box, no Claude and no redaction boundary (the simplification chosen after
pressure-testing). The safe surface is a curated catalog of ~12 hand-written
parameterized queries; a slot validator is the trust boundary (no generic SQL,
no dynamic identifiers). POST /api/query/nl + GET /api/query/catalog, gated
require_bot_or_admin, read-only, audited. Soft-delete-correct per table.
Local Qwen translated 12/12 real example questions correctly against the live
Spark. Web "Ask" box and Matrix bot still to come (steps 4-5).
2026-06-18 18:35:41 -05:00

109 lines
5.3 KiB
Python

"""NL-query translator — plain-English question -> {intent, slots} on the LOCAL model.
The model's ONLY job is to pick one curated intent and fill its typed slots; it never
touches the database, never sees a row, and never writes SQL. Its output is untrusted and
is handed straight to the runner's validator (runner.validate), which is the trust boundary.
LOCAL-ONLY BY CONSTRUCTION. Translation runs on the local Qwen via Spark Control
(SPARK_CONTROL_URL), the same sanctioned local leg as intake/digest — so the question never
leaves the box and there is NO Claude path and NO redaction boundary to manage here (that
was the whole point of the W2 simplification: the answer is sensitive and never leaves; the
question is generic English and is translated locally). If the local model ever proves too
weak, a Claude-behind-redaction translator could be slotted in as an alternative `chat_fn`
WITHOUT changing the validator/executor — but it is deliberately not built.
`chat_fn(prompt, system) -> dict|None` is injectable so the whole translation leg is testable
offline without Spark. The default calls the ingest Spark client (lazy import — it ships in
the Docker image, not the bare CRM).
"""
from .intents import INTENTS
from .runner import run_query
def _default_chat_json(prompt, system):
    """Default ``chat_fn``: ask the local model through the ingest Spark client.

    The client module (``llm``) is imported lazily from the sibling ``ingest``
    directory, so merely importing this module never requires the client to be
    present; only an actual call does.

    Args:
        prompt: The user question, passed to the client as the prompt.
        system: The system prompt, passed as the ``system`` keyword.

    Returns:
        Whatever ``llm.chat_json`` returns (``translate`` expects a dict or None).

    Raises:
        Any exception raised by importing ``llm`` or by the client call itself
        (the original note on the import says it raises if Spark is unreachable).
    """
    import os
    import sys

    ingest_dir = os.path.join(
        os.path.dirname(os.path.dirname(os.path.abspath(__file__))), "ingest"
    )
    # Register the directory only once. Prepending it unconditionally added a
    # duplicate sys.path entry on every call, so a long-running process grew
    # sys.path (and slowed every failed import) with each translated question.
    if ingest_dir not in sys.path:
        sys.path.insert(0, ingest_dir)
    import llm  # noqa: E402  (ingest Spark client; raises if Spark is unreachable)

    # Small completion budget: the reply is a single short JSON object.
    return llm.chat_json(prompt, system=system, max_tokens=400)
def _render_slot(name, spec):
    """Render one slot as ``name (description)`` for the system prompt.

    ``int`` slots read ``integer`` and ``enum`` slots list their ``choices``
    joined with ``|``; both append ``, default <value>`` when the spec has a
    ``default`` key. Any other type is described as ``text`` together with
    whether the slot is required or optional.
    """
    kind = spec["type"]

    # Anything that is not int/enum is treated as free text.
    if kind not in ("int", "enum"):
        requirement = "required" if spec.get("required") else "optional"
        return f"{name} (text, {requirement})"

    # int and enum share the same optional default suffix.
    default_note = ""
    if "default" in spec:
        default_note = f", default {spec['default']}"

    if kind == "int":
        return f"{name} (integer{default_note})"

    options = "|".join(spec["choices"])
    return f"{name} (one of {options}{default_note})"
def build_system():
    """Assemble the system prompt that presents the intent catalog to the model.

    The prompt is a fixed preamble (output format plus rules) followed by one
    entry per item in ``INTENTS``: its key and summary, its rendered slots
    (or ``(none)``), and its example question when the spec provides one.
    Lines are joined with newlines.
    """
    preamble = [
        "You translate a question about a venture fund's investor database into ONE "
        "structured query. Respond with ONLY a JSON object and nothing else:",
        ' {"intent": "<one key below, or null>", "slots": {<slot>: <value>}}',
        "",
        "Rules:",
        "- Choose the single best-fitting intent. If none fits, return {\"intent\": null}.",
        "- Use ONLY the slot names listed for the chosen intent; omit a slot to accept its default.",
        "- Convert natural durations to the integer a slot wants: '3 months'->90, 'a quarter'->90, "
        "'6 weeks'->42, 'a year'/'year to date'->365.",
        "- Copy names, cities and people verbatim from the question into text slots.",
        "- No commentary, no markdown, JSON only.",
        "",
        "Intents:",
    ]

    catalog = []
    for intent_key, intent_spec in INTENTS.items():
        rendered = [
            _render_slot(slot_name, slot_spec)
            for slot_name, slot_spec in intent_spec["slots"].items()
        ]
        # An intent without slots still gets a slots line so entries stay uniform.
        slot_text = "; ".join(rendered) or "(none)"
        catalog.append(f"- {intent_key}: {intent_spec['summary']}")
        catalog.append(f" slots: {slot_text}")
        example = intent_spec.get("example")
        if example:
            catalog.append(f" e.g. \"{example}\"")

    return "\n".join(preamble + catalog)
def translate(question, *, chat_fn=None):
    """Map a plain-English question to ``{intent, slots}`` using the local model.

    Args:
        question: The user's question. ``None``, empty and whitespace-only
            values are reported as ``no_match``; so is any non-string value.
        chat_fn: Optional ``chat_fn(prompt, system) -> dict | None`` override,
            mainly for offline tests. Defaults to ``_default_chat_json``.

    Returns:
        ``{"intent": <str>, "slots": <dict>}`` on success, otherwise an error
        dict ``{"error": ..., "detail": ...}`` where ``error`` is
        ``"model_unavailable"`` (the chat call raised; per the original
        contract the endpoint turns this into a 503) or ``"no_match"`` (the
        question is unusable or the model named no intent).

    The model's reply is untrusted. This function only normalizes its shape;
    value validation is left to the runner, as the comment below notes.
    """
    chat_fn = chat_fn or _default_chat_json

    # A truthy non-string (number, list, ...) has no .strip(); report it as an
    # unusable question instead of letting an AttributeError escape.
    if question is not None and not isinstance(question, str):
        return {"error": "no_match", "detail": "question must be a string"}
    q = (question or "").strip()
    if not q:
        return {"error": "no_match", "detail": "empty question"}

    try:
        data = chat_fn(q, build_system())
    except Exception as exc:  # connection/runtime failure on the LOCAL model
        return {"error": "model_unavailable", "detail": str(exc)}

    if not isinstance(data, dict):
        return {"error": "no_match", "detail": "model returned no parseable JSON"}

    intent = data.get("intent")
    if intent is None:
        return {"error": "no_match", "detail": "no intent fit the question"}
    # A list or dict here would make the `intent in INTENTS` lookup below raise
    # TypeError (unhashable), so anything that is not a string is rejected now.
    if not isinstance(intent, str):
        return {"error": "no_match", "detail": "model returned a non-string intent"}
    # Models spell "no intent" in several ways ("null", "None", " none ").
    if intent.strip().lower() in ("", "null", "none"):
        return {"error": "no_match", "detail": "no intent fit the question"}

    slots = data.get("slots")
    slots = slots if isinstance(slots, dict) else {}
    # Drop slot KEYS the chosen intent doesn't declare — model noise, not a safety concern
    # (every surviving VALUE still goes through full type validation in the runner). Unknown
    # intents are left as-is so the runner rejects them as unknown_intent.
    if intent in INTENTS:
        allowed = INTENTS[intent]["slots"]
        slots = {k: v for k, v in slots.items() if k in allowed}
    return {"intent": intent, "slots": slots}
def answer(conn, question, *, chat_fn=None, audit_fn=None, actor=None, source="api"):
    """Translate a question locally, then execute it through ``run_query``.

    On a translation error the error dict is returned with the original
    ``question`` added. Otherwise the translated intent and slots are passed
    to ``run_query`` together with ``conn`` and the audit arguments, and its
    result is returned with the original ``question`` attached.
    """
    interpreted = translate(question, chat_fn=chat_fn)

    if not interpreted.get("error"):
        outcome = run_query(
            conn,
            interpreted["intent"],
            interpreted["slots"],
            audit_fn=audit_fn,
            actor=actor,
            source=source,
        )
        # Echo the question back so the caller can pair it with the result.
        outcome["question"] = question
        return outcome

    # Translation failed: return a copy of the error dict plus the question.
    failure = dict(interpreted)
    failure["question"] = question
    return failure