#!/usr/bin/env python3
"""Tests for the daily activity digest (Phase B).

Covers the per-user + per-investor activity queries (soft-delete filtered),
inbound dedup, the two-section body, the local-summary fallback, the
DB-backed policy resolver, and the scheduler's once-per-day / send-hour /
policy / force guards.

The local Spark model and the mail transport are stubbed — no network.
Synthetic data only (guardrail #9).

Run:  cd backend && python3 test_digest_builder.py
"""
import atexit
import json
import os
import shutil
import sqlite3
import sys
import tempfile
import traceback
from datetime import datetime, timedelta, timezone

sys.path.insert(0, os.path.dirname(os.path.abspath(__file__)))

# Point the app at a throwaway SQLite file BEFORE importing the modules under
# test (presumably they read these variables at import time — hence the E402
# suppressions below). The temp dir is removed when the interpreter exits.
_TMP_DIR = tempfile.mkdtemp()
atexit.register(shutil.rmtree, _TMP_DIR, ignore_errors=True)
os.environ["CRM_DB_PATH"] = os.path.join(_TMP_DIR, "crm.db")
os.environ.setdefault("CRM_DATA_DIR", os.path.dirname(os.environ["CRM_DB_PATH"]))
os.environ["CRM_DIGEST_ENABLED"] = "1"  # so the non-force scheduler path is live

import digest_builder  # noqa: E402
from email_integration import digest_scheduler  # noqa: E402

FAILS = []
SINCE = "2026-06-17T00:00:00Z"
UNTIL = "2026-06-18T00:00:00Z"


def check(cond, msg):
    """Print PASS/FAIL for ``msg`` and record a failure when ``cond`` is falsy."""
    print((" PASS " if cond else " FAIL ") + msg)
    if not cond:
        FAILS.append(msg)


def _conn():
    """Open a new connection to the test DB with ``sqlite3.Row`` rows."""
    conn = sqlite3.connect(os.environ["CRM_DB_PATH"])
    conn.row_factory = sqlite3.Row
    return conn


def _find_email(emails, email_id):
    """Return the entry of ``emails`` whose ``email_id`` matches, or None.

    Used instead of positional indexing / a bare ``next()`` so a missing row is
    reported as a FAIL rather than crashing the run with IndexError/StopIteration.
    """
    return next((e for e in emails if e["email_id"] == email_id), None)


def setup():
    """Create the minimal schema the digest queries touch and seed synthetic rows."""
    conn = _conn()
    conn.executescript("""
        CREATE TABLE users (id TEXT PRIMARY KEY, username TEXT, full_name TEXT, email TEXT,
                            role TEXT, is_active INT DEFAULT 1);
        CREATE TABLE email_accounts (id TEXT PRIMARY KEY, user_id TEXT, email_address TEXT);
        CREATE TABLE emails (id TEXT PRIMARY KEY, subject TEXT, body_text TEXT, snippet TEXT,
                             from_email TEXT, sent_at TEXT, is_matched INT DEFAULT 1);
        CREATE TABLE email_account_messages (id TEXT PRIMARY KEY, email_id TEXT, account_id TEXT,
                                             gmail_message_id TEXT, gmail_thread_id TEXT,
                                             is_sent INT DEFAULT 0, deleted_at TEXT);
        CREATE TABLE email_investor_links (id TEXT PRIMARY KEY, email_id TEXT,
                                           fundraising_investor_id TEXT, organization_id TEXT,
                                           contact_id TEXT, matched_address TEXT);
        CREATE TABLE fundraising_investors (id TEXT PRIMARY KEY, investor_name TEXT);
        CREATE TABLE organizations (id TEXT PRIMARY KEY, name TEXT, deleted_at TEXT);
        CREATE TABLE contacts (id TEXT PRIMARY KEY, first_name TEXT, last_name TEXT,
                               deleted_at TEXT);
        CREATE TABLE app_settings (key TEXT PRIMARY KEY, value_json TEXT, updated_at TEXT);
    """)
    conn.executemany(
        "INSERT INTO users (id,username,full_name,email,role,is_active) VALUES (?,?,?,?,?,?)",
        [
            ("u1", "grant", "Grant Gilliam", "grant@ten31.xyz", "admin", 1),
            ("u2", "jk", "Jonathan K", "jk@ten31.xyz", "member", 1),
            ("u3", "retired", "Old Admin", "old@ten31.xyz", "admin", 0),  # inactive -> excluded
        ],
    )
    conn.executemany(
        "INSERT INTO email_accounts (id,user_id,email_address) VALUES (?,?,?)",
        [
            ("a1", "u1", "grant@ten31.xyz"),
            ("a2", "u2", "jk@ten31.xyz"),
            ("a3", "u3", "old@ten31.xyz"),
        ],
    )
    conn.executemany(
        "INSERT INTO fundraising_investors (id,investor_name) VALUES (?,?)",
        [
            ("inv1", "Harbor & Vine"),
            ("inv2", "Brightwater Capital"),
            ("inv3", "Vela Partners"),
        ],
    )
    conn.executemany(
        "INSERT INTO organizations (id,name,deleted_at) VALUES (?,?,?)",
        [
            ("org1", "Summit Fund", None),
            ("org2", "Deleted Org", "2026-06-01T00:00:00Z"),
        ],
    )
    conn.executemany(
        "INSERT INTO contacts (id,first_name,last_name,deleted_at) VALUES (?,?,?,?)",
        [
            ("c1", "Jane", "Roe", None),
        ],
    )
    # emails: id, subject, body, from_email, sent_at, is_matched. Outbound = from one
    # of our own mailboxes (grant@/jk@/old@ ten31.xyz); inbound = from outside.
    conn.executemany(
        "INSERT INTO emails (id,subject,body_text,from_email,sent_at,is_matched) VALUES (?,?,?,?,?,?)",
        [
            ("e1", "Fund III terms", "Discussing Fund III terms", "grant@ten31.xyz", "2026-06-17T14:00:00Z", 1),
            ("e2", "Re: allocation", "Question about allocation", "lp@brightwater.example", "2026-06-17T09:00:00Z", 1),
            ("e3", "Intro", "Summit intro thread", "jk@ten31.xyz", "2026-06-17T11:00:00Z", 1),
            ("e4", "Coffee", "Catch up note", "jane@roe.example", "2026-06-17T16:00:00Z", 1),
            ("e5", "Wire", "Wire instructions", "ir@summitcap.example", "2026-06-17T17:00:00Z", 1),
            ("e6", "Old", "ancient", "grant@ten31.xyz", "2026-06-10T10:00:00Z", 1),  # out of window
            ("e7", "Tombstoned", "deleted sighting", "lp@harborvine.example", "2026-06-17T08:00:00Z", 1),
            ("e8", "Inactive", "from retired user", "old@ten31.xyz", "2026-06-17T12:00:00Z", 1),
            ("e9", "Unmatched", "not matched", "lp@harborvine.example", "2026-06-17T13:00:00Z", 0),  # is_matched=0
            ("e10", "Group update", "inbound to two of us", "lp@vela.example", "2026-06-17T15:00:00Z", 1),
        ],
    )
    # sightings: id, email_id, account_id, gmail_message_id, gmail_thread_id, is_sent, deleted_at
    conn.executemany(
        "INSERT INTO email_account_messages (id,email_id,account_id,gmail_message_id,gmail_thread_id,is_sent,deleted_at) "
        "VALUES (?,?,?,?,?,?,?)",
        [
            ("s1", "e1", "a1", "g1", "t1", 1, None),  # grant SENT
            ("s2", "e2", "a1", "g2", "t2", 0, None),  # grant RECEIVED
            ("s3", "e3", "a2", "g3", "t3", 1, None),  # jk SENT
            ("s4", "e4", "a1", "g4", "t4", 0, None),  # grant RECEIVED (contact)
            ("s5", "e5", "a1", "g5", "t5", 0, None),  # grant RECEIVED (deleted org)
            ("s6", "e6", "a1", "g6", "t6", 0, None),  # out of window
            ("s7", "e7", "a1", "g7", "t7", 0, "2026-06-17T09:00:00Z"),  # tombstoned
            ("s8", "e8", "a3", "g8", "t8", 1, None),  # inactive user
            ("s9", "e9", "a1", "g9", "t9", 0, None),  # unmatched email
            ("s10a", "e10", "a1", "g10a", "t10", 0, None),  # e10 received by grant ...
            ("s10b", "e10", "a2", "g10b", "t10", 0, None),  # ... and by jk (dedup target)
        ],
    )
    # investor links: id, email_id, fr_investor, org, contact, matched_address
    conn.executemany(
        "INSERT INTO email_investor_links (id,email_id,fundraising_investor_id,organization_id,contact_id,matched_address) "
        "VALUES (?,?,?,?,?,?)",
        [
            ("l1", "e1", "inv1", None, None, "lp@harborvine.example"),
            ("l2", "e2", "inv2", None, None, "lp@brightwater.example"),
            ("l3", "e3", None, "org1", None, "ir@summitfund.example"),  # org name
            ("l4", "e4", None, None, "c1", "jane@roe.example"),  # contact name
            ("l5", "e5", None, "org2", None, "ir@summitcap.example"),  # deleted org -> address
            ("l6", "e6", "inv1", None, None, "lp@harborvine.example"),
            ("l7", "e7", "inv1", None, None, "lp@harborvine.example"),
            ("l8", "e8", "inv1", None, None, "lp@harborvine.example"),
            ("l9", "e9", "inv1", None, None, "lp@harborvine.example"),
            ("l10", "e10", "inv3", None, None, "lp@vela.example"),
        ],
    )
    conn.commit()
    conn.close()


def test_collect():
    """Per-user grouping: active users only, window/tombstone/match filters, name fallbacks."""
    conn = _conn()
    groups = digest_builder.collect_user_activity(conn, SINCE, UNTIL)
    conn.close()
    check(len(groups) == 2, f"two active users with activity (grant, jk), got {len(groups)}")
    by_user = {g["user_id"]: g for g in groups}
    check("u3" not in by_user, "inactive user (u3) excluded")

    grant = by_user.get("u1")
    if not grant:
        check(False, "grant group missing")
        return
    ids = {e["email_id"] for e in grant["emails"]}
    check(ids == {"e1", "e2", "e4", "e5", "e10"},
          f"grant has e1,e2,e4,e5,e10 (e6 out-of-window, e7 tombstoned, e9 unmatched excluded), got {sorted(ids)}")
    check(grant["sent"] == 1 and grant["received"] == 4,
          f"grant 1 sent / 4 received, got {grant['sent']}/{grant['received']}")

    e1 = _find_email(grant["emails"], "e1")
    check(e1 is not None and e1["direction"] == "sent", "e1 direction sent")
    check(e1 is not None and e1["investors"] == ["Harbor & Vine"],
          f"e1 investor = grid name, got {e1 and e1['investors']}")
    e4 = _find_email(grant["emails"], "e4")
    check(e4 is not None and e4["investors"] == ["Jane Roe"],
          f"e4 investor = contact fallback name, got {e4 and e4['investors']}")
    e5 = _find_email(grant["emails"], "e5")
    check(e5 is not None and e5["investors"] == ["ir@summitcap.example"],
          f"e5 investor = address (deleted org skipped), got {e5 and e5['investors']}")

    # Look e3 up by id: jk also has e10, so position [0] depends on sort order.
    jk = by_user.get("u2")
    e3 = _find_email(jk["emails"], "e3") if jk else None
    check(e3 is not None and e3["investors"] == ["Summit Fund"], "jk e3 investor = org name")


def test_investor():
    """Per-investor grouping: direction counts, sender attribution, inbound dedup."""
    conn = _conn()
    inv = digest_builder.collect_investor_activity(conn, SINCE, UNTIL)
    conn.close()
    by_name = {g["name"]: g for g in inv}
    # Harbor & Vine, Brightwater, Vela Partners, Summit Fund, Jane Roe, ir@summitcap.example
    check(len(inv) == 6, f"six investors with activity, got {len(inv)}: {sorted(by_name)}")

    hv = by_name.get("Harbor & Vine")
    check(hv and hv["outbound"] == 1 and hv["inbound"] == 0, f"Harbor & Vine = 1 out / 0 in, got {hv}")
    # Guard the index so an empty email list is a FAIL, not an IndexError.
    hv_members = hv["emails"][0]["members"] if hv and hv["emails"] else None
    check(hv_members == ["Grant Gilliam"], f"outbound attributed to sender, got {hv_members}")

    bw = by_name.get("Brightwater Capital")
    check(bw and bw["inbound"] == 1 and bw["outbound"] == 0, f"Brightwater = 1 in / 0 out, got {bw}")

    # e10 was received by TWO mailboxes (grant + jk) -> dedup to one inbound email
    vela = by_name.get("Vela Partners")
    check(vela and vela["total"] == 1 and vela["inbound"] == 1,
          f"Vela inbound deduped across 2 sightings -> 1, got {vela}")


def test_build_and_empty():
    """Full digest body (both sections + narrative) and the always-send empty-window note."""
    def stub(prompt, system=None, max_tokens=220):
        return "Grant worked with Harbor & Vine on Fund III."

    conn = _conn()
    try:
        d = digest_builder.build_digest(conn, SINCE, UNTIL, chat_fn=stub)
        check(d["has_activity"] is True, "build_digest has_activity True when there is activity")
        check(d["user_count"] == 2 and d["email_count"] == 7 and d["investor_count"] == 6,
              f"counts: 2 users / 7 emails / 6 investors, got "
              f"{d['user_count']}/{d['email_count']}/{d['investor_count']}")
        body = d["body"]
        check("Daily Activity Digest" in body, "body has title")
        check("BY TEAM MEMBER" in body and "BY INVESTOR" in body, "body has both sections")
        check("Grant Gilliam" in body and "Jonathan K" in body, "body names both active users")
        check("Harbor & Vine" in body and "Brightwater Capital" in body and "Vela Partners" in body,
              "investor section lists investors")
        check("Grant worked with Harbor & Vine on Fund III." in body, "body includes the local narrative")

        empty = digest_builder.build_digest(conn, "2030-01-01T00:00:00Z", "2030-01-02T00:00:00Z", chat_fn=stub)
        check(empty["has_activity"] is False, "empty window -> has_activity False")
        check("No tracked email activity" in empty["body"], "empty window -> 'no activity' note (always-send)")
        check("BY INVESTOR" not in empty["body"], "empty window -> no section headers")
    finally:
        conn.close()


def test_policy():
    """Env-seeded default policy, and a DB row overriding it."""
    conn = _conn()
    try:
        # No DB row yet: CRM_DIGEST_ENABLED=1 (set at import) seeds enabled; hour defaults 18.
        pol = digest_builder.load_digest_policy(conn)
        check(pol["enabled"] is True and pol["send_hour"] == 18, f"env seed -> enabled, hour 18, got {pol}")
        # A DB row wins over the env seed (the admin-panel control).
        conn.execute(
            "INSERT OR REPLACE INTO app_settings (key,value_json,updated_at) VALUES (?,?,?)",
            (digest_builder.DIGEST_POLICY_KEY, json.dumps({"enabled": False, "send_hour": 9}), "x"),
        )
        conn.commit()
        pol2 = digest_builder.load_digest_policy(conn)
        check(pol2["enabled"] is False and pol2["send_hour"] == 9, f"DB policy wins over env, got {pol2}")
    finally:
        # Always drop the override so later suites see the env-seeded policy.
        conn.execute("DELETE FROM app_settings WHERE key = ?", (digest_builder.DIGEST_POLICY_KEY,))
        conn.commit()
        conn.close()


def test_summary_fallback():
    """A failing chat model yields a deterministic local narrative instead of raising."""
    grp = {
        "user_id": "u1",
        "full_name": "Grant Gilliam",
        "username": "grant",
        "emails": [{"direction": "sent", "subject": "x", "investors": ["Harbor & Vine"], "text": "hi"}],
        "investors": ["Harbor & Vine"],
        "sent": 1,
        "received": 0,
        "total": 1,
    }

    def boom(*a, **k):
        raise RuntimeError("spark down")

    out = digest_builder.summarize_user_day(grp, chat_fn=boom)
    check("Grant Gilliam" in out and "1 sent" in out and "unavailable" in out.lower(),
          f"fallback narrative on chat error, got: {out}")


def test_scheduler_guards():
    """Send-hour, once-per-day, cursor, force and policy guards of ``maybe_send_digest``."""
    sent_calls = []

    def build_fn(conn, since, until):
        return {"subject": "S", "body": "B", "has_activity": True, "user_count": 1, "email_count": 2}

    def send_fn(conn, to_addrs, subject, body, sender_email=None):
        sent_calls.append(list(to_addrs))
        return {"transport": "stub"}

    def cursors():
        """Snapshot (daily-date cursor, window cursor) from app_settings."""
        conn = _conn()
        try:
            return (
                digest_scheduler._get_setting(conn, digest_scheduler._LAST_DATE_KEY),
                digest_scheduler._get_setting(conn, digest_scheduler._LAST_AT_KEY),
            )
        finally:
            conn.close()

    factory = _conn
    utc = datetime(2026, 6, 18, 1, 0, tzinfo=timezone.utc)

    # Before the send hour (09:00 local < 18:00) -> no send
    r = digest_scheduler.maybe_send_digest(factory, now_local=datetime(2026, 6, 18, 9, 0), now_utc=utc,
                                           build_fn=build_fn, send_fn=send_fn)
    check(r["status"] == "before_send_hour" and not sent_calls, f"before send hour -> skip, got {r}")

    # At/after the send hour -> sends once, only to the active admin (grant)
    r = digest_scheduler.maybe_send_digest(factory, now_local=datetime(2026, 6, 18, 19, 0), now_utc=utc,
                                           build_fn=build_fn, send_fn=send_fn)
    check(r["status"] == "sent" and len(sent_calls) == 1, f"after send hour -> sends, got {r}")
    # Read the last call lazily: an eager sent_calls[-1] would IndexError when nothing was sent.
    last_recipients = sent_calls[-1] if sent_calls else None
    check(last_recipients == ["grant@ten31.xyz"], f"recipients = active admins only, got {last_recipients}")

    # The window cursor must advance to the send time so a missed day rolls forward
    # (since, now] — not be left unset/stale.
    conn = _conn()
    cursor_at = digest_scheduler._get_setting(conn, digest_scheduler._LAST_AT_KEY)
    conn.close()
    check(cursor_at == digest_scheduler._utc_iso(utc), f"window cursor advanced to send time, got {cursor_at}")

    # Same local day again -> suppressed (once-per-day guard)
    r = digest_scheduler.maybe_send_digest(factory, now_local=datetime(2026, 6, 18, 20, 0), now_utc=utc,
                                           build_fn=build_fn, send_fn=send_fn)
    check(r["status"] == "already_sent_today" and len(sent_calls) == 1, f"second send same day -> skip, got {r}")

    # force=True ignores the hour + once-per-day guard, and does NOT touch either cursor.
    # Run it on a LATER day/instant than the daily send: on the same date and with the
    # same now_utc, a force path that wrongly wrote the cursors would rewrite identical
    # values and the before/after comparison could never fail.
    before = cursors()
    r = digest_scheduler.maybe_send_digest(factory, force=True, now_local=datetime(2026, 6, 19, 3, 0),
                                           now_utc=datetime(2026, 6, 19, 10, 0, tzinfo=timezone.utc),
                                           build_fn=build_fn, send_fn=send_fn)
    check(r["status"] == "sent" and len(sent_calls) == 2, f"force sends regardless of guards, got {r}")
    after = cursors()
    check(before == after, "force send does not advance the daily cursor")

    # DB policy disabled -> daily path skips even past the hour; force still sends.
    conn = _conn()
    digest_scheduler._set_setting(conn, digest_builder.DIGEST_POLICY_KEY, {"enabled": False, "send_hour": 18})
    conn.commit()
    conn.close()
    try:
        r = digest_scheduler.maybe_send_digest(factory, now_local=datetime(2026, 6, 19, 19, 0), now_utc=utc,
                                               build_fn=build_fn, send_fn=send_fn)
        check(r["status"] == "disabled" and len(sent_calls) == 2, f"DB-disabled policy skips daily send, got {r}")
        r = digest_scheduler.maybe_send_digest(factory, force=True, now_local=datetime(2026, 6, 19, 2, 0),
                                               now_utc=utc, build_fn=build_fn, send_fn=send_fn)
        check(r["status"] == "sent" and len(sent_calls) == 3, f"force overrides disabled policy, got {r}")
    finally:
        # Remove the override even if a call above raised.
        conn = _conn()
        conn.execute("DELETE FROM app_settings WHERE key = ?", (digest_builder.DIGEST_POLICY_KEY,))
        conn.commit()
        conn.close()


def test_window_resolver():
    """``resolve_digest_window``: default 24h, hours/since precedence, local-midnight, cap, bad input."""
    nu = datetime(2026, 6, 16, 15, 0, tzinfo=timezone.utc)
    nl = datetime(2026, 6, 16, 8, 0, tzinfo=timezone(timedelta(hours=-7)))  # PDT

    s, u = digest_builder.resolve_digest_window(now_utc=nu, now_local=nl)
    check((s, u) == ("2026-06-15T15:00:00Z", "2026-06-16T15:00:00Z"), f"default = last 24h, got {(s,u)}")

    s, u = digest_builder.resolve_digest_window(hours=48, now_utc=nu, now_local=nl)
    check(s == "2026-06-14T15:00:00Z", f"hours=48 lookback, got {s}")

    # since = a local calendar date -> that day's LOCAL midnight, expressed in UTC
    s, u = digest_builder.resolve_digest_window(since="2026-06-01", now_utc=nu, now_local=nl)
    check(s == "2026-06-01T07:00:00Z", f"since-date -> local midnight in UTC, got {s}")

    # a since older than the cap clamps to MAX_WINDOW_DAYS (response echoes real window)
    s, u = digest_builder.resolve_digest_window(since="2025-01-01", now_utc=nu, now_local=nl)
    check(s == (nu - timedelta(days=digest_builder.MAX_WINDOW_DAYS)).strftime("%Y-%m-%dT%H:%M:%SZ"),
          f"over-cap since clamps to {digest_builder.MAX_WINDOW_DAYS}d, got {s}")

    # since wins over hours when both supplied
    s, u = digest_builder.resolve_digest_window(hours=1, since="2026-06-10", now_utc=nu, now_local=nl)
    check(s.startswith("2026-06-10"), f"since wins over hours, got {s}")

    # same-day boundary: since = today's local date, now later in the day -> valid
    # window (local midnight is strictly before now), not a "start must be before now" raise
    s, u = digest_builder.resolve_digest_window(since="2026-06-16", now_utc=nu, now_local=nl)
    check(s == "2026-06-16T07:00:00Z" and u == "2026-06-16T15:00:00Z",
          f"since=today -> [local midnight, now], got {(s, u)}")

    for bad in [dict(hours=0), dict(hours="abc"), dict(since="nope"), dict(since="2027-01-01")]:
        try:
            digest_builder.resolve_digest_window(now_utc=nu, now_local=nl, **bad)
            check(False, f"bad input {bad} should raise")
        except ValueError:
            check(True, f"bad input rejected: {bad}")


def test_send_digest_window():
    """Manual windowed send: echoes its window, admins only, leaves the daily cursor alone."""
    sent = []

    def build_fn(conn, since, until):
        return {"subject": "S", "body": f"{since}|{until}", "has_activity": True,
                "user_count": 1, "email_count": 2, "investor_count": 1}

    def send_fn(conn, to_addrs, subject, body, sender_email=None):
        sent.append((list(to_addrs), body))
        return {"transport": "stub"}

    conn = _conn()
    before = digest_scheduler._get_setting(conn, digest_scheduler._LAST_AT_KEY)
    conn.close()
    r = digest_scheduler.send_digest_window(_conn, since_iso="2026-05-01T00:00:00Z",
                                            until_iso="2026-06-16T00:00:00Z",
                                            build_fn=build_fn, send_fn=send_fn)
    check(r["status"] == "sent" and r["window"] == ["2026-05-01T00:00:00Z", "2026-06-16T00:00:00Z"],
          f"windowed send returns its window, got {r}")
    check(sent and sent[-1][0] == ["grant@ten31.xyz"], f"windowed send -> active admins only, got {sent}")
    conn = _conn()
    after = digest_scheduler._get_setting(conn, digest_scheduler._LAST_AT_KEY)
    conn.close()
    check(before == after, "windowed manual send does not advance the daily cursor")


def main():
    """Seed the DB, run every suite in order, print a summary, exit 1 on any failure.

    A suite that raises is recorded as a failure (with its traceback) and the
    remaining suites still run, so one crash cannot hide the other results.
    """
    setup()
    suites = [
        ("collect_user_activity:", test_collect),
        ("collect_investor_activity:", test_investor),
        ("build_digest + empty:", test_build_and_empty),
        ("summary fallback:", test_summary_fallback),
        ("digest policy:", test_policy),
        ("window resolver:", test_window_resolver),
        ("windowed manual send:", test_send_digest_window),
        ("scheduler guards:", test_scheduler_guards),
    ]
    for label, suite in suites:
        print(label)
        try:
            suite()
        except Exception as exc:  # test-harness boundary: record and continue
            traceback.print_exc()
            FAILS.append(f"{suite.__name__} raised {exc!r}")
    if FAILS:
        print(f"\nFAILED ({len(FAILS)})")
        for f in FAILS:
            print(" - " + f)
        sys.exit(1)
    print("\nALL PASS (digest builder + scheduler)")


if __name__ == "__main__":
    main()