Files
ten31-database/backend/nl_query/test_nl_query_endpoint.py
T
Keysat 6c29c22601 Add NL-query backend (W2): local translator + safe named-query runner
Read-only "ask the database in plain English" backend. Translation runs on
the local Qwen via Spark Control (question -> {intent, slots}); nothing leaves
the box, no Claude and no redaction boundary (the simplification chosen after
pressure-testing). The safe surface is a curated catalog of ~12 hand-written
parameterized queries; a slot validator is the trust boundary (no generic SQL,
no dynamic identifiers). POST /api/query/nl + GET /api/query/catalog, gated
require_bot_or_admin, read-only, audited. Soft-delete-correct per table.
Local Qwen translated 12/12 real example questions correctly against the live
Spark. Web "Ask" box and Matrix bot still to come (steps 4-5).
2026-06-18 18:35:41 -05:00

140 lines
5.6 KiB
Python

#!/usr/bin/env python3
"""Endpoint tests for the W2 NL-query HTTP surface (POST /api/query/nl, GET /api/query/catalog).
Boots the REAL server against a temp DB and exercises the wiring end-to-end: auth gating
(bot/admin only), the direct {intent, slots} mode, the soft-error shape, and the status
mapping. The local model is forced UNAVAILABLE by pointing SPARK_CONTROL_URL at a dead local
port, so the {question} path exercises the 503 path deterministically without any Spark.
Synthetic data only.
Run: cd backend && python3 nl_query/test_nl_query_endpoint.py
"""
import http.client
import json
import os
import sqlite3
import sys
import tempfile
import threading
from http.server import ThreadingHTTPServer
# --- Test environment bootstrap -------------------------------------------------------------
# Everything below must run BEFORE `import server`: the environment is configured first and
# the project modules are imported afterwards (hence the E402 suppressions on those imports).
# A fresh temp directory holds the database for this run; nothing in the visible code removes
# it afterwards.
_DATA = tempfile.mkdtemp()
os.environ["CRM_DATA_DIR"] = _DATA
os.environ["CRM_DB_PATH"] = os.path.join(_DATA, "crm.db")
# NOTE(review): the reason the Gmail integration flag is switched on for these tests is not
# evident from this file -- confirm it is still required.
os.environ["CRM_GMAIL_INTEGRATION_ENABLED"] = "1"
# Dead port -> the local-model leg fails fast, so the {question} path returns 503 deterministically
# (set before server/config import; load_env uses setdefault so this wins over any repo .env).
os.environ["SPARK_CONTROL_URL"] = "http://127.0.0.1:1"
# Put the parent of this file's directory (backend/) first on sys.path so that `server` and
# the `nl_query` package resolve when the script is run directly.
sys.path.insert(0, os.path.dirname(os.path.dirname(os.path.abspath(__file__)))) # backend/
import server # noqa: E402
import nl_query # noqa: E402
# Messages of every failed check, in the order they were recorded.
FAILS = []


def check(cond, msg):
    """Print a PASS/FAIL line for *msg* and remember the message when *cond* is falsy.

    *cond* is evaluated for truthiness only; it does not have to be a bool.
    Nothing is raised on failure -- failures accumulate in ``FAILS`` so the whole
    run can be reported at the end.
    """
    if cond:
        print(" PASS " + msg)
        return
    print(" FAIL " + msg)
    FAILS.append(msg)
class _Quiet(server.CRMHandler):
    """The project's CRM request handler with ``log_message`` turned into a no-op.

    Used as the handler class for the in-process test server so that the only
    output of a run is the PASS/FAIL lines.
    """

    def log_message(self, *_ignored):
        """Accept any positional arguments and emit nothing."""
        return None
def _req(port, method, path, token=None, body=None):
    """Send one HTTP request to the test server on 127.0.0.1 and return ``(status, json_body)``.

    Args:
        port: TCP port the server is listening on.
        method: HTTP method, e.g. ``"GET"`` or ``"POST"``.
        path: Request path, e.g. ``"/api/query/nl"``.
        token: Optional bearer token; when truthy it is sent as
            ``Authorization: Bearer <token>``.
        body: Optional JSON-serialisable payload. When it is not ``None`` it is
            sent as JSON with ``Content-Type: application/json`` (so ``{}`` is
            still sent as a body).

    Returns:
        A ``(status, data)`` tuple where ``data`` is the decoded JSON response,
        or ``None`` when the response body is empty.

    Raises:
        json.JSONDecodeError: if the response body is non-empty but not JSON.
        OSError: on connection failure or timeout (10 s).
    """
    headers = {}
    if token:
        headers["Authorization"] = "Bearer " + token
    payload = json.dumps(body) if body is not None else None
    if payload is not None:
        headers["Content-Type"] = "application/json"

    conn = http.client.HTTPConnection("127.0.0.1", port, timeout=10)
    try:
        conn.request(method, path, body=payload, headers=headers)
        resp = conn.getresponse()
        status = resp.status
        # Decode leniently: a malformed byte must not mask the HTTP status under test.
        raw = resp.read().decode("utf-8", "replace")
    finally:
        # Close even when the request/response raised (timeout, refused connection),
        # so a failing check never leaks the socket.
        conn.close()

    data = json.loads(raw) if raw else None
    return status, data
def _data(d):
    """Return the ``"data"`` member of a response envelope, or ``{}`` when unavailable.

    A falsy envelope (``None``, ``{}``) and a missing or falsy ``"data"`` value all
    yield an empty dict, so callers can chain ``.get(...)`` without checking first.
    """
    if not d:
        return {}
    inner = d.get("data")
    if inner:
        return inner
    return {}
def main():
    """Seed a temp database, boot the real server on an ephemeral port, and run every check.

    Steps:
      1. Initialise the schema and insert synthetic users and investors.
      2. Mint one admin and one member token.
      3. Serve ``_Quiet`` on 127.0.0.1 in a daemon thread.
      4. Exercise POST /api/query/nl and GET /api/query/catalog, recording each
         result through ``check``.
      5. Stop the server and release its listening socket.
      6. Print a summary; exit with status 1 if any check failed.
    """
    server.init_db()

    # Seed synthetic rows. 'Ghost' has graveyard=1 and the largest total_invested, yet the
    # expected top-2 result below omits it.
    db = sqlite3.connect(os.environ["CRM_DB_PATH"])
    try:
        db.execute("INSERT INTO users (id,username,email,password_hash,full_name,role,is_active) VALUES "
                   "('u_admin','grant','g@t.x','x','Grant','admin',1),"
                   "('u_mem','mem','m@t.x','x','Mem','member',1)")
        db.execute("INSERT INTO fundraising_investors (id,investor_name,lead,graveyard,source_row_id,"
                   "total_invested) VALUES ('a','Acme Capital','Jon',0,'a',5000000),"
                   "('b','Beta Partners','Grant',0,'b',2000000),('g','Ghost','Grant',1,'g',9000000)")
        db.commit()
    finally:
        # Close even if an INSERT fails, so the temp DB file is not left held open.
        db.close()

    admin = server.create_token("u_admin", "grant", "admin")
    member = server.create_token("u_mem", "mem", "member")

    # Port 0 lets the OS pick a free port; read the real one back from the bound socket.
    httpd = ThreadingHTTPServer(("127.0.0.1", 0), _Quiet)
    port = httpd.server_address[1]
    threading.Thread(target=httpd.serve_forever, daemon=True).start()
    try:
        print("direct {intent, slots} mode")
        st, d = _req(port, "POST", "/api/query/nl", admin,
                     {"intent": "top_investors_committed", "slots": {"limit": 2}})
        rows = _data(d).get("rows", [])
        check(st == 200 and [r["investor_name"] for r in rows] == ["Acme Capital", "Beta Partners"],
              f"admin direct query -> 200 + rows (got {st})")
        check(_data(d).get("intent") == "top_investors_committed", "response echoes interpreted intent")

        print("soft errors + validation")
        st, d = _req(port, "POST", "/api/query/nl", admin, {"intent": "made_up"})
        check(st == 200 and _data(d).get("error") == "unknown_intent",
              f"bad intent -> 200 with data.error=unknown_intent (got {st}, {_data(d).get('error')})")
        st, d = _req(port, "POST", "/api/query/nl", admin, {})
        check(st == 400, f"neither question nor intent -> 400 (got {st})")

        print("auth gating")
        st, _ = _req(port, "POST", "/api/query/nl", member,
                     {"intent": "top_investors_committed"})
        check(st == 403, f"member -> 403 (got {st})")
        st, _ = _req(port, "POST", "/api/query/nl", None, {"intent": "top_investors_committed"})
        check(st == 401, f"unauthenticated -> 401 (got {st})")

        print("catalog")
        st, d = _req(port, "GET", "/api/query/catalog", admin)
        # Guard against an empty or non-object body so a broken endpoint is reported as a
        # FAIL instead of aborting the run with AttributeError.
        catalog = d.get("data") if isinstance(d, dict) else None
        check(st == 200 and isinstance(catalog, list) and len(catalog) == len(nl_query.INTENTS),
              f"catalog -> 200 with every intent (got {st})")
        st, _ = _req(port, "GET", "/api/query/catalog", member)
        check(st == 403, f"catalog member -> 403 (got {st})")

        print("question path with the local model down")
        st, d = _req(port, "POST", "/api/query/nl", admin,
                     {"question": "who are our top investors by committed capital?"})
        check(st == 503 and _data(d).get("error") == "model_unavailable",
              f"question + dead model -> 503 model_unavailable (got {st}, {_data(d).get('error')})")
        check(_data(d).get("question"), "question echoed back even on outage")

        print("audit trail")
        db = sqlite3.connect(os.environ["CRM_DB_PATH"])
        try:
            n = db.execute("SELECT COUNT(*) FROM audit_log WHERE entity_type='nl_query'").fetchone()[0]
        finally:
            db.close()
        check(n >= 2, f"executed queries are audited (entity_type=nl_query rows: {n})")
    finally:
        # shutdown() only stops the serve_forever loop; server_close() releases the
        # listening socket.
        httpd.shutdown()
        httpd.server_close()

    print()
    if FAILS:
        print(f"{len(FAILS)} FAILED")
        for f in FAILS:
            print(" - " + f)
        sys.exit(1)
    print("ALL PASS")
# Script entry point: run the endpoint checks only when this file is executed directly
# (see the "Run:" line in the module docstring); main() exits with status 1 on any failure.
if __name__ == "__main__":
    main()