#!/usr/bin/env python3
"""Endpoint tests for the W2 NL-query HTTP surface (POST /api/query/nl, GET /api/query/catalog).

Boots the REAL server against a temp DB and exercises the wiring end-to-end: auth gating
(bot/admin only), the direct {intent, slots} mode, the soft-error shape, and the status mapping.
The local model is forced UNAVAILABLE by pointing SPARK_CONTROL_URL at a dead local port, so the
{question} path exercises the 503 path deterministically without any Spark. Synthetic data only.

Run:  cd backend && python3 nl_query/test_nl_query_endpoint.py
"""
import http.client
import json
import os
import sqlite3
import sys
import tempfile
import threading
from contextlib import closing
from http.server import ThreadingHTTPServer

# All environment overrides must be in place BEFORE `server` is imported below.
_DATA = tempfile.mkdtemp()
os.environ["CRM_DATA_DIR"] = _DATA
os.environ["CRM_DB_PATH"] = os.path.join(_DATA, "crm.db")
os.environ["CRM_GMAIL_INTEGRATION_ENABLED"] = "1"
# Dead port -> the local-model leg fails fast, so the {question} path returns 503 deterministically
# (set before server/config import; load_env uses setdefault so this wins over any repo .env).
os.environ["SPARK_CONTROL_URL"] = "http://127.0.0.1:1"

sys.path.insert(0, os.path.dirname(os.path.dirname(os.path.abspath(__file__))))  # backend/
import server  # noqa: E402
import nl_query  # noqa: E402

# Messages of every failed check, in the order they failed; drives the exit code in main().
FAILS = []


def check(cond, msg):
    """Print a PASS/FAIL line for *msg* and record it in FAILS when *cond* is falsy."""
    print((" PASS " if cond else " FAIL ") + msg)
    if not cond:
        FAILS.append(msg)


class _Quiet(server.CRMHandler):
    """The real CRM request handler with per-request logging silenced."""

    def log_message(self, *a):
        pass


def _req(port, method, path, token=None, body=None):
    """Send one HTTP request to the local test server and return ``(status, parsed_json)``.

    Args:
        port: TCP port of the server on 127.0.0.1.
        method: HTTP method, e.g. ``"GET"`` or ``"POST"``.
        path: Request path, e.g. ``"/api/query/nl"``.
        token: Optional bearer token; sent as an ``Authorization`` header when truthy.
        body: Optional JSON-serialisable payload; sent with a JSON content type when not None.

    Returns:
        A ``(status, data)`` tuple. ``data`` is the decoded JSON body, or ``None`` when the
        body is empty or is not valid JSON.
    """
    headers = {}
    if token:
        headers["Authorization"] = "Bearer " + token
    payload = json.dumps(body) if body is not None else None
    if payload is not None:
        headers["Content-Type"] = "application/json"

    conn = http.client.HTTPConnection("127.0.0.1", port, timeout=10)
    try:
        conn.request(method, path, body=payload, headers=headers)
        resp = conn.getresponse()
        status = resp.status
        raw = resp.read().decode("utf-8", "replace")
    finally:
        # Always release the socket, even when the request or the read raises.
        conn.close()

    try:
        data = json.loads(raw) if raw else None
    except ValueError:
        # A non-JSON body (e.g. an HTML error page) should surface as a failed status
        # check in the caller, not abort the whole run with a decode traceback.
        data = None
    return status, data


def _data(d):
    """Return the ``data`` object of a response envelope, or ``{}`` when absent or not a dict."""
    payload = d.get("data") if isinstance(d, dict) else None
    return payload if isinstance(payload, dict) else {}


def main():
    """Seed a temp DB, boot the server on an ephemeral port, run every check, report.

    Exits with status 1 when at least one check failed; prints ``ALL PASS`` otherwise.
    """
    # init_db is expected to create the tables used below (users, fundraising_investors,
    # audit_log) in the DB that CRM_DB_PATH points at.
    server.init_db()
    with closing(sqlite3.connect(os.environ["CRM_DB_PATH"])) as db:
        db.execute("INSERT INTO users (id,username,email,password_hash,full_name,role,is_active) VALUES "
                   "('u_admin','grant','g@t.x','x','Grant','admin',1),"
                   "('u_mem','mem','m@t.x','x','Mem','member',1)")
        # 'Ghost' has the largest total_invested but graveyard=1; the expected top-2 result
        # asserted below does not include it.
        db.execute("INSERT INTO fundraising_investors (id,investor_name,lead,graveyard,source_row_id,"
                   "total_invested) VALUES ('a','Acme Capital','Jon',0,'a',5000000),"
                   "('b','Beta Partners','Grant',0,'b',2000000),('g','Ghost','Grant',1,'g',9000000)")
        db.commit()

    admin = server.create_token("u_admin", "grant", "admin")
    member = server.create_token("u_mem", "mem", "member")

    # Port 0 lets the OS pick a free port; read the real one back from the bound socket.
    httpd = ThreadingHTTPServer(("127.0.0.1", 0), _Quiet)
    port = httpd.server_address[1]
    threading.Thread(target=httpd.serve_forever, daemon=True).start()

    try:
        print("direct {intent, slots} mode")
        st, d = _req(port, "POST", "/api/query/nl", admin,
                     {"intent": "top_investors_committed", "slots": {"limit": 2}})
        rows = _data(d).get("rows", [])
        check(st == 200 and [r["investor_name"] for r in rows] == ["Acme Capital", "Beta Partners"],
              f"admin direct query -> 200 + rows (got {st})")
        check(_data(d).get("intent") == "top_investors_committed",
              "response echoes interpreted intent")

        print("soft errors + validation")
        st, d = _req(port, "POST", "/api/query/nl", admin, {"intent": "made_up"})
        check(st == 200 and _data(d).get("error") == "unknown_intent",
              f"bad intent -> 200 with data.error=unknown_intent (got {st}, {_data(d).get('error')})")
        st, d = _req(port, "POST", "/api/query/nl", admin, {})
        check(st == 400, f"neither question nor intent -> 400 (got {st})")

        print("auth gating")
        st, _ = _req(port, "POST", "/api/query/nl", member, {"intent": "top_investors_committed"})
        check(st == 403, f"member -> 403 (got {st})")
        st, _ = _req(port, "POST", "/api/query/nl", None, {"intent": "top_investors_committed"})
        check(st == 401, f"unauthenticated -> 401 (got {st})")

        print("catalog")
        st, d = _req(port, "GET", "/api/query/catalog", admin)
        # _req may return None (empty or non-JSON body); guard so that case is a FAIL, not a crash.
        catalog = d.get("data") if isinstance(d, dict) else None
        check(st == 200 and isinstance(catalog, list) and len(catalog) == len(nl_query.INTENTS),
              f"catalog -> 200 with every intent (got {st})")
        st, _ = _req(port, "GET", "/api/query/catalog", member)
        check(st == 403, f"catalog member -> 403 (got {st})")

        print("question path with the local model down")
        st, d = _req(port, "POST", "/api/query/nl", admin,
                     {"question": "who are our top investors by committed capital?"})
        check(st == 503 and _data(d).get("error") == "model_unavailable",
              f"question + dead model -> 503 model_unavailable (got {st}, {_data(d).get('error')})")
        check(_data(d).get("question"), "question echoed back even on outage")

        print("audit trail")
        with closing(sqlite3.connect(os.environ["CRM_DB_PATH"])) as db:
            n = db.execute("SELECT COUNT(*) FROM audit_log WHERE entity_type='nl_query'").fetchone()[0]
        check(n >= 2, f"executed queries are audited (entity_type=nl_query rows: {n})")
    finally:
        # shutdown() only stops serve_forever(); server_close() releases the listening socket.
        httpd.shutdown()
        httpd.server_close()

    print()
    if FAILS:
        print(f"{len(FAILS)} FAILED")
        for f in FAILS:
            print(" - " + f)
        sys.exit(1)
    print("ALL PASS")


if __name__ == "__main__":
    main()