6c29c22601
Read-only "ask the database in plain English" backend. Translation runs on
the local Qwen via Spark Control (question -> {intent, slots}); nothing leaves
the box, no Claude and no redaction boundary (the simplification chosen after
pressure-testing). The safe surface is a curated catalog of ~12 hand-written
parameterized queries; a slot validator is the trust boundary (no generic SQL,
no dynamic identifiers). POST /api/query/nl + GET /api/query/catalog, gated
require_bot_or_admin, read-only, audited. Soft-delete-correct per table.
Local Qwen translated 12/12 real example questions correctly against the live
Spark. Web "Ask" box and Matrix bot still to come (steps 4-5).
140 lines
5.6 KiB
Python
140 lines
5.6 KiB
Python
#!/usr/bin/env python3
|
|
"""Endpoint tests for the W2 NL-query HTTP surface (POST /api/query/nl, GET /api/query/catalog).
|
|
|
|
Boots the REAL server against a temp DB and exercises the wiring end-to-end: auth gating
|
|
(bot/admin only), the direct {intent, slots} mode, the soft-error shape, and the status
|
|
mapping. The local model is forced UNAVAILABLE by pointing SPARK_CONTROL_URL at a dead local
|
|
port, so the {question} path exercises the 503 path deterministically without any Spark.
|
|
Synthetic data only.
|
|
|
|
Run: cd backend && python3 nl_query/test_nl_query_endpoint.py
|
|
"""
|
|
import http.client
|
|
import json
|
|
import os
|
|
import sqlite3
|
|
import sys
|
|
import tempfile
|
|
import threading
|
|
from http.server import ThreadingHTTPServer
|
|
|
|
# Everything the server reads from the environment is pinned here, before the server
# and config modules are imported further down.
_DATA = tempfile.mkdtemp()
os.environ.update(
    {
        "CRM_DATA_DIR": _DATA,
        "CRM_DB_PATH": os.path.join(_DATA, "crm.db"),
        "CRM_GMAIL_INTEGRATION_ENABLED": "1",
        # Dead port -> the local-model leg fails fast, so the {question} path returns
        # 503 deterministically (load_env uses setdefault, so a value set here wins
        # over any repo .env).
        "SPARK_CONTROL_URL": "http://127.0.0.1:1",
    }
)

# Make the parent of this file's directory (backend/) importable.
sys.path.insert(
    0,
    os.path.dirname(os.path.dirname(os.path.abspath(__file__))),
)
|
|
import server # noqa: E402
|
|
import nl_query # noqa: E402
|
|
|
|
# Messages of the checks that failed, in the order check() recorded them; main() reads
# this list to print the summary and pick the exit status.
FAILS = []
|
|
|
|
|
|
def check(cond, msg):
    """Report one assertion as a PASS/FAIL line and record it if it failed.

    Args:
        cond: Any value; its truthiness decides the outcome.
        msg: Description printed after the PASS/FAIL tag and, on failure,
            appended to the module-level ``FAILS`` list.
    """
    if cond:
        print(" PASS " + msg)
        return
    print(" FAIL " + msg)
    FAILS.append(msg)
|
|
|
|
|
|
class _Quiet(server.CRMHandler):
    """The real CRM request handler with ``log_message`` turned into a no-op.

    Used as the handler class for the in-process test server so that the only
    output of a run is what the test itself prints.
    """

    def log_message(self, *_args):
        """Accept and discard any log call."""
        return None
|
|
|
|
|
|
def _req(port, method, path, token=None, body=None):
    """Send one HTTP request to 127.0.0.1 and return the status with the decoded JSON body.

    Args:
        port: TCP port of the server on 127.0.0.1.
        method: HTTP method, e.g. ``"GET"`` or ``"POST"``.
        path: Request path, e.g. ``"/api/query/nl"``.
        token: Optional bearer token; when truthy it is sent as
            ``Authorization: Bearer <token>``.
        body: Optional JSON-serialisable object; when not ``None`` it is sent as
            the request body with ``Content-Type: application/json``.

    Returns:
        A ``(status, data)`` tuple where ``data`` is the parsed JSON response, or
        ``None`` when the response body is empty.

    Raises:
        OSError: If the connection cannot be made or times out (10 s).
        json.JSONDecodeError: If a non-empty response body is not valid JSON.
    """
    headers = {}
    if token:
        headers["Authorization"] = "Bearer " + token
    payload = None
    if body is not None:
        payload = json.dumps(body)
        headers["Content-Type"] = "application/json"

    conn = http.client.HTTPConnection("127.0.0.1", port, timeout=10)
    try:
        conn.request(method, path, body=payload, headers=headers)
        resp = conn.getresponse()
        status = resp.status
        # Undecodable bytes are replaced rather than raising, so a bad body shows up
        # as a JSON error below instead of a UnicodeDecodeError.
        raw = resp.read().decode("utf-8", "replace")
    finally:
        # Always release the socket, including when the request or read fails.
        conn.close()

    data = json.loads(raw) if raw else None
    return status, data
|
|
|
|
|
|
def _data(d):
    """Return the ``"data"`` member of a response envelope, falling back to ``{}``.

    An empty dict is returned when ``d`` itself is falsy (for example ``None``) or
    when its ``"data"`` entry is missing or falsy, so callers can chain ``.get``
    without checking for ``None`` first.
    """
    if not d:
        return {}
    payload = d.get("data")
    if payload:
        return payload
    return {}
|
|
|
|
|
|
def main():
    """Run the NL-query endpoint checks against a real server on a temp database.

    Initialises the schema with ``server.init_db()``, seeds two users (one admin,
    one member) and three investors, mints a token for each user, then serves
    ``_Quiet`` on an ephemeral 127.0.0.1 port from a daemon thread and exercises
    ``POST /api/query/nl`` and ``GET /api/query/catalog``. Every assertion goes
    through ``check`` so a failed expectation is recorded instead of aborting.

    Exits the process with status 1 via ``sys.exit`` when any check failed;
    otherwise prints ``ALL PASS`` and returns ``None``.
    """
    server.init_db()
    seed = sqlite3.connect(os.environ["CRM_DB_PATH"])
    try:
        seed.execute(
            "INSERT INTO users (id,username,email,password_hash,full_name,role,is_active) VALUES "
            "('u_admin','grant','g@t.x','x','Grant','admin',1),"
            "('u_mem','mem','m@t.x','x','Mem','member',1)"
        )
        # 'Ghost' has the largest total_invested but graveyard=1; the expected
        # top-investors result below does not contain it.
        seed.execute(
            "INSERT INTO fundraising_investors (id,investor_name,lead,graveyard,source_row_id,"
            "total_invested) VALUES ('a','Acme Capital','Jon',0,'a',5000000),"
            "('b','Beta Partners','Grant',0,'b',2000000),('g','Ghost','Grant',1,'g',9000000)"
        )
        seed.commit()
    finally:
        # Close the seeding connection even if an INSERT fails.
        seed.close()
    admin = server.create_token("u_admin", "grant", "admin")
    member = server.create_token("u_mem", "mem", "member")

    # Port 0 lets the OS pick a free port; read the real one back from the server.
    httpd = ThreadingHTTPServer(("127.0.0.1", 0), _Quiet)
    port = httpd.server_address[1]
    threading.Thread(target=httpd.serve_forever, daemon=True).start()
    try:
        print("direct {intent, slots} mode")
        st, d = _req(port, "POST", "/api/query/nl", admin,
                     {"intent": "top_investors_committed", "slots": {"limit": 2}})
        rows = _data(d).get("rows", [])
        # .get keeps a row without the column a FAIL rather than a KeyError.
        check(st == 200
              and [r.get("investor_name") for r in rows] == ["Acme Capital", "Beta Partners"],
              f"admin direct query -> 200 + rows (got {st})")
        check(_data(d).get("intent") == "top_investors_committed",
              "response echoes interpreted intent")

        print("soft errors + validation")
        st, d = _req(port, "POST", "/api/query/nl", admin, {"intent": "made_up"})
        check(st == 200 and _data(d).get("error") == "unknown_intent",
              f"bad intent -> 200 with data.error=unknown_intent (got {st}, {_data(d).get('error')})")
        st, d = _req(port, "POST", "/api/query/nl", admin, {})
        check(st == 400, f"neither question nor intent -> 400 (got {st})")

        print("auth gating")
        st, _ = _req(port, "POST", "/api/query/nl", member,
                     {"intent": "top_investors_committed"})
        check(st == 403, f"member -> 403 (got {st})")
        st, _ = _req(port, "POST", "/api/query/nl", None, {"intent": "top_investors_committed"})
        check(st == 401, f"unauthenticated -> 401 (got {st})")

        print("catalog")
        st, d = _req(port, "GET", "/api/query/catalog", admin)
        # _req returns None for an empty body; guard so that case is a FAIL, not a crash.
        catalog = (d or {}).get("data")
        check(st == 200 and isinstance(catalog, list) and len(catalog) == len(nl_query.INTENTS),
              f"catalog -> 200 with every intent (got {st})")
        st, _ = _req(port, "GET", "/api/query/catalog", member)
        check(st == 403, f"catalog member -> 403 (got {st})")

        print("question path with the local model down")
        st, d = _req(port, "POST", "/api/query/nl", admin,
                     {"question": "who are our top investors by committed capital?"})
        check(st == 503 and _data(d).get("error") == "model_unavailable",
              f"question + dead model -> 503 model_unavailable (got {st}, {_data(d).get('error')})")
        check(_data(d).get("question"), "question echoed back even on outage")

        print("audit trail")
        audit = sqlite3.connect(os.environ["CRM_DB_PATH"])
        try:
            n = audit.execute(
                "SELECT COUNT(*) FROM audit_log WHERE entity_type='nl_query'"
            ).fetchone()[0]
        finally:
            audit.close()
        check(n >= 2, f"executed queries are audited (entity_type=nl_query rows: {n})")
    finally:
        # shutdown() only stops serve_forever(); server_close() releases the socket.
        httpd.shutdown()
        httpd.server_close()

    print()
    if FAILS:
        print(f"{len(FAILS)} FAILED")
        for f in FAILS:
            print(" - " + f)
        sys.exit(1)
    print("ALL PASS")
|
|
|
|
|
|
# Run the checks only when this file is executed as a script, not when it is imported.
if __name__ == "__main__":
    main()
|