#!/usr/bin/env python3 """Regression test for the Grid-detail communications timeline filter (Phase 8c, G6). The mobile Grid detail's notes timeline pulls an investor-level communication stream via GET /api/communications?source_row_id=. That filter (added to handle_list_communications) maps the grid JSON row id → fundraising_investors.source_row_id → fundraising_contacts.contact_id → communications, so it must: - return every communication across ALL the investor's contacts, - stay isolated (one investor's row id never returns another's comms), - respect soft-delete (cm.deleted_at IS NULL) through the join. Boots the REAL server, seeds investors by driving the one-row log path (which creates the grid row + contact + communication AND syncs the relational mirror the filter joins on), then drives the live read path with a real token. Synthetic only (guardrail #9). Run: cd backend && python3 test_grid_comm_timeline.py """ import http.client import json import os import sqlite3 import sys import tempfile import threading from http.server import ThreadingHTTPServer _DATA = tempfile.mkdtemp() os.environ["CRM_DATA_DIR"] = _DATA os.environ["CRM_DB_PATH"] = os.path.join(_DATA, "crm.db") sys.path.insert(0, os.path.dirname(os.path.abspath(__file__))) import server # noqa: E402 FAILS = [] DEL = "2026-06-01T00:00:00" def check(cond, msg): print((" PASS " if cond else " FAIL ") + msg) if not cond: FAILS.append(msg) class _Quiet(server.CRMHandler): def log_message(self, *a): pass def _req(method, port, path, token, body=None): conn = http.client.HTTPConnection("127.0.0.1", port, timeout=10) headers = {"Authorization": "Bearer " + token} payload = None if body is not None: payload = json.dumps(body) headers["Content-Type"] = "application/json" conn.request(method, path, body=payload, headers=headers) resp = conn.getresponse() raw = resp.read().decode("utf-8", "replace") conn.close() data = None if raw: try: data = json.loads(raw) except ValueError: pass return resp.status, data def _log_comm(port, token, investor_name, contact, subject, create=False): """Drive the one-row log path; returns (status, grid_row_id).""" st, data = _req("POST", port, "/api/fundraising/log-communication", token, { "investor_name": investor_name, "create_investor_if_missing": create, "contact": contact, "type": "note", "subject": subject, "body": subject, "append_note": True, }) row_id = ((data or {}).get("data", {}).get("row") or {}).get("id") return st, row_id def main(): server.init_db() conn = sqlite3.connect(os.environ["CRM_DB_PATH"]) # password_hash is intentionally a non-bcrypt placeholder — we mint the token directly via # create_token(), so the password-verify path is never exercised. conn.execute("INSERT INTO users (id,username,email,password_hash,full_name,role,is_active) " "VALUES ('u1','grant','grant@ten31.example','x','Grant','admin',1)") conn.commit() conn.close() token = server.create_token("u1", "grant", "admin") httpd = ThreadingHTTPServer(("127.0.0.1", 0), _Quiet) port = httpd.server_address[1] threading.Thread(target=httpd.serve_forever, daemon=True).start() try: # Investor A: two contacts, one communication per contact. Create seeds the row with # Jane + logs "Intro call"; update-row adds John as a second pill (so the relational mirror # links BOTH contacts to A's row); then a comm is logged against John. The timeline must # aggregate across both contacts — the point of the source_row_id join over a single contact. st, rowA = _log_comm(port, token, "Acme Capital", {"name": "Jane Doe", "email": "jane@acme.example"}, "Intro call", create=True) check(st == 201 and bool(rowA), f"create investor A via log path -> 201 + row id (got {st}, {rowA})") st, _ = _req("POST", port, "/api/fundraising/update-row", token, { "row_id": rowA, "investor_name": "Acme Capital", "contacts": [{"name": "Jane Doe", "email": "jane@acme.example"}, {"name": "John Roe", "email": "john@acme.example"}], }) check(st == 200, f"add John as a second contact on A via update-row (got {st})") st, _ = _log_comm(port, token, "Acme Capital", {"name": "John Roe", "email": "john@acme.example"}, "Follow-up email") check(st == 201, f"second contact's comm logged on A (got {st})") # Investor B: a separate investor, one communication (isolation control). st, rowB = _log_comm(port, token, "Beacon Ventures", {"name": "Sam Poe", "email": "sam@beacon.example"}, "Beacon note", create=True) check(st == 201 and bool(rowB), f"create investor B via log path -> 201 + row id (got {st}, {rowB})") # ── source_row_id returns the whole investor (across contacts) ── print("\n[source_row_id timeline]") st, data = _req("GET", port, f"/api/communications?source_row_id={rowA}", token) subsA = {c.get("subject") for c in (data or {}).get("data", [])} check(st == 200, f"GET timeline for A -> 200 (got {st})") check(subsA == {"Intro call", "Follow-up email"}, f"A's timeline spans both contacts' comms (got {subsA})") # ── isolation: A's row id never returns B's comms ── print("\n[isolation]") check("Beacon note" not in subsA, "A's timeline excludes investor B's comm") st, dataB = _req("GET", port, f"/api/communications?source_row_id={rowB}", token) subsB = {c.get("subject") for c in (dataB or {}).get("data", [])} check(subsB == {"Beacon note"}, f"B's timeline is its own comm only (got {subsB})") # ── soft-delete respected through the join ── print("\n[soft-delete]") c = sqlite3.connect(os.environ["CRM_DB_PATH"]) c.execute("UPDATE communications SET deleted_at=? WHERE subject='Intro call'", (DEL,)) c.commit() c.close() st, data2 = _req("GET", port, f"/api/communications?source_row_id={rowA}", token) subsA2 = {c.get("subject") for c in (data2 or {}).get("data", [])} check(subsA2 == {"Follow-up email"}, f"soft-deleted comm filtered from A's timeline (got {subsA2})") # ── unknown row id returns empty, not an error ── st, data3 = _req("GET", port, "/api/communications?source_row_id=does-not-exist", token) check(st == 200 and (data3 or {}).get("data") == [], f"unknown source_row_id -> 200 + empty (got {st}, {(data3 or {}).get('data')})") finally: httpd.shutdown() print() if FAILS: print(f"FAILED ({len(FAILS)}):") for f in FAILS: print(f" - {f}") sys.exit(1) print("ALL PASS (grid comm timeline source_row_id filter)") if __name__ == "__main__": main()