#!/usr/bin/env python3
"""Endpoint tests for POST /api/admin/digest/test-email (handle_admin_send_test_email).

Boots the REAL server in-process against a throwaway DB, monkeypatches the SMTP
sender (no network), and proves the security-relevant contract:

  * admin-gated: 401 without a token, 403 for a non-admin.
  * recipients are restricted to the active-admin set — an arbitrary `to` is
    rejected (the endpoint is NOT an open relay), an admin `to` is allowed, and
    the default audience is every active admin.
  * a send failure does not echo the exception (which could carry the SMTP
    credential) back to the caller.

Every check prints a PASS/FAIL line; the process exits with status 1 if any
check failed.

Run: cd backend && python3 test_smtp_endpoint.py
"""
import atexit
import http.client
import json
import os
import shutil
import sys
import tempfile
import threading
from http.server import ThreadingHTTPServer

# Throwaway sandbox (frontend dir + data dir) for this run. It is removed at
# interpreter exit so repeated runs do not pile up temp directories.
_BASE = tempfile.mkdtemp()
atexit.register(shutil.rmtree, _BASE, ignore_errors=True)
_FRONTEND = os.path.join(_BASE, "frontend")
os.makedirs(os.path.join(_FRONTEND, "assets"))
_DATA = os.path.join(_BASE, "data")
os.makedirs(_DATA)
with open(os.path.join(_FRONTEND, "index.html"), "w", encoding="utf-8") as f:
    f.write("crm")

# These are set before `server` is imported below.
# NOTE(review): presumably `server` reads them at import time — confirm.
os.environ["CRM_FRONTEND_DIR"] = _FRONTEND
os.environ["CRM_DATA_DIR"] = _DATA
os.environ["CRM_DB_PATH"] = os.path.join(_DATA, "crm.db")

sys.path.insert(0, os.path.dirname(os.path.abspath(__file__)))
import server  # noqa: E402
import smtp_send  # noqa: E402

# Messages of every failed check, in the order they failed.
FAILS = []

# ── fake sender: record calls, optionally raise (no network) ──
SENT = []
RAISE = {"exc": None}


def fake_send(to_addrs, subject, body, **kw):
    """Stand-in for ``smtp_send.send_email`` that never touches the network.

    Raises ``RAISE["exc"]`` when one is set; otherwise records the recipients
    and subject in ``SENT`` and returns a success-shaped dict.
    """
    if RAISE["exc"] is not None:
        raise RAISE["exc"]
    SENT.append({"to": list(to_addrs), "subject": subject})
    return {"sent_to": list(to_addrs), "from": "digest@ten31.test"}


# Replace the real sender with the fake and make smtp_configured() report True.
smtp_send.send_email = fake_send
smtp_send.smtp_configured = lambda: True


def check(cond, msg):
    """Print a PASS/FAIL line for *msg* and record it in ``FAILS`` when *cond* is falsy."""
    print((" PASS " if cond else " FAIL ") + msg)
    if not cond:
        FAILS.append(msg)


class _Quiet(server.CRMHandler):
    """The real request handler with ``log_message`` overridden as a no-op."""

    def log_message(self, *a):
        pass


def _post(port, path, token=None, body=None):
    """POST *body* as JSON to ``127.0.0.1:port`` at *path*.

    Args:
        port: TCP port of the in-process server.
        path: Request path.
        token: Optional bearer token; sent as an ``Authorization`` header when truthy.
        body: JSON-serializable payload; a falsy value is sent as ``{}``.

    Returns:
        ``(status, raw, parsed)`` where *raw* is the decoded response text and
        *parsed* is the JSON-decoded body, or ``None`` if it is not valid JSON.
    """
    conn = http.client.HTTPConnection("127.0.0.1", port, timeout=10)
    headers = {"Content-Type": "application/json"}
    if token:
        headers["Authorization"] = "Bearer " + token
    try:
        conn.request("POST", path, body=json.dumps(body or {}), headers=headers)
        resp = conn.getresponse()
        raw = resp.read().decode("utf-8", "replace")
    finally:
        # Close even when the request itself fails, so the socket is not leaked.
        conn.close()
    try:
        parsed = json.loads(raw)
    except ValueError:  # json.JSONDecodeError is a ValueError subclass
        parsed = None
    return resp.status, raw, parsed


def _run_scenarios(port):
    """Run every endpoint check against the server listening on *port*.

    Results are reported through ``check``. Returns early (with the failure
    already recorded) if the admin account cannot be registered, because every
    later step needs the admin token.
    """
    st, raw, j = _post(port, "/api/auth/register", body={
        "username": "admin", "email": "admin@ten31.test",
        "password": "password123", "full_name": "Admin User"})
    registered = st == 201 and isinstance(j, dict) and bool(j.get("token"))
    check(registered, f"register first user as admin (got {st})")
    if not registered:
        # Without a token the remaining steps cannot run; bail out instead of
        # crashing on j["token"] with an unrelated traceback.
        return
    admin_token = j["token"]
    admin_email = j["user"]["email"]

    conn = server.get_db()
    try:
        conn.execute(
            "INSERT INTO users (id, username, email, password_hash, full_name, role, is_active) "
            "VALUES (?,?,?,?,?,?,1)",
            ("member-1", "member1", "member1@ten31.test",
             server.hash_password("password123"), "Member One", "member"))
        conn.commit()
    finally:
        conn.close()
    member_token = server.create_token("member-1", "member1", "member")

    path = "/api/admin/digest/test-email"

    # 1. unauthenticated -> 401, sender untouched
    SENT.clear()
    st, raw, j = _post(port, path)
    check(st == 401, f"no token -> 401 (got {st})")
    check(not SENT, "no token: sender not called")

    # 2. non-admin -> 403
    SENT.clear()
    st, raw, j = _post(port, path, token=member_token)
    check(st == 403, f"member -> 403 (got {st})")
    check(not SENT, "member: sender not called")

    # 3. admin, no `to` -> 200, default audience = the admin set
    SENT.clear()
    st, raw, j = _post(port, path, token=admin_token)
    check(st == 200, f"admin default -> 200 (got {st})")
    check(len(SENT) == 1 and SENT[0]["to"] == [admin_email],
          f"default recipients = active admins ({admin_email}); got {SENT}")

    # 4. admin, arbitrary outside `to` -> 400, NOT an open relay
    SENT.clear()
    st, raw, j = _post(port, path, token=admin_token, body={"to": "attacker@evil.com"})
    check(st == 400, f"outside `to` -> 400 (got {st})")
    check(not SENT, "outside `to`: sender not called (no relay)")

    # 5. admin, `to` an admin address (case-insensitive) -> 200
    SENT.clear()
    st, raw, j = _post(port, path, token=admin_token, body={"to": admin_email.upper()})
    check(st == 200, f"admin `to` -> 200 (got {st})")
    check(len(SENT) == 1 and SENT[0]["to"] == [admin_email.upper()],
          "admin `to` is delivered as given")

    # 6. mixed list with one outside address -> 400, all rejected
    SENT.clear()
    st, raw, j = _post(port, path, token=admin_token,
                       body={"to": [admin_email, "outsider@evil.com"]})
    check(st == 400, f"mixed list with outsider -> 400 (got {st})")
    check(not SENT, "mixed list: sender not called")

    # 7. send failure must NOT leak the exception text (could carry the credential)
    SENT.clear()
    secret = "PWLEAK_SsHh99"
    RAISE["exc"] = Exception(f"535 auth failed for user with password {secret}")
    try:
        st, raw, j = _post(port, path, token=admin_token)
    finally:
        # Always disarm the fake sender, even if the request itself blows up.
        RAISE["exc"] = None
    check(st == 502, f"send failure -> 502 (got {st})")
    check(secret not in raw,
          "send-failure response does not leak the exception/credential text")


def main():
    """Start the server on an ephemeral port, run the checks, and report.

    Exits with status 1 when any check failed.
    """
    server.init_db()
    # Admin = first registered user. A member is inserted directly (self-register
    # is disabled after the first user) so get_user resolves a real active row.
    httpd = ThreadingHTTPServer(("127.0.0.1", 0), _Quiet)
    port = httpd.server_address[1]
    threading.Thread(target=httpd.serve_forever, daemon=True).start()
    try:
        _run_scenarios(port)
    finally:
        httpd.shutdown()
        # shutdown() only stops serve_forever(); server_close() releases the
        # listening socket.
        httpd.server_close()

    print()
    if FAILS:
        print(f"FAILED ({len(FAILS)}):")
        for failure in FAILS:
            print(f" - {failure}")
        sys.exit(1)
    print("ALL PASS (digest test-email endpoint)")


if __name__ == "__main__":
    main()