Add daily activity digest — Phase B (v0.1.0:77)

Sends a once-a-day internal email to all active admins summarizing each team
member's email activity per investor, plus a team-wide by-investor view
(inbound + outbound, deduped). Narratives are generated on the LOCAL Spark
model, never Claude — the digest is intentionally un-anonymized, so substance
stays on Ten31 infra. This is an internal ops email, exempt from the
'agents draft, humans send' rule (which governs outward LP contact).

- backend/digest_builder.py: per-user + per-investor activity queries
  (soft-delete filtered), per-user Spark narrative with a deterministic
  fallback, two-section plain-text body, and the DB-backed policy resolver.
- backend/email_integration/digest_scheduler.py: always-on daily thread that
  re-reads the policy each cycle and sends once/day; window cursor in
  app_settings so a missed day rolls forward.
- server.py: POST /api/admin/digest/send-now and GET/PATCH
  /api/admin/digest/policy; scheduler wired into main().
- Control lives in Settings -> Admin (enable toggle + send-time dropdown),
  not StartOS actions; env vars only seed the first-boot default.
- Tests: backend/test_digest_builder.py.
This commit is contained in:
Keysat
2026-06-15 22:32:27 -05:00
parent 036226ed74
commit 323f016f64
12 changed files with 1113 additions and 19 deletions
+369
View File
@@ -0,0 +1,369 @@
"""Daily activity digest — content builder (Phase B).
Assembles the per-user -> per-investor email-activity digest and summarizes each
team member's day with ONE narrative paragraph from the LOCAL Spark model
(ingest/llm.py via Spark Control). NEVER Claude: the digest is deliberately
un-anonymized (real LP names + email substance), so every summarization stays on
Ten31 infra. Keeping the substance local is the whole point — this is the one
path that intentionally bypasses the scrub -> Claude -> re-hydrate boundary.
This is an internal ops email to the team's own admins, so it is exempt from the
"agents draft, humans send" rule — that rule governs outward LP/prospect contact,
not an internal digest to the fund's own inboxes. Never extend it to send to LPs.
Soft-delete: every read here filters the relevant tombstones —
`email_account_messages.deleted_at IS NULL`, `users.is_active = 1`, and the
org/contact name joins drop soft-deleted rows (falling back to the raw address).
Stdlib only; the local-LLM client is imported lazily so this module stays
importable (and testable with an injected chat fn) without Spark configured.
"""
import json
import os
import sqlite3
from datetime import datetime, timezone
# One row per (account-sighting x investor-link) in the window. Grouped into
# per-user buckets in Python. Investor display name resolves fundraising grid ->
# organization -> contact -> the raw matched address, skipping soft-deleted
# org/contact rows (fundraising_investors has no soft-delete column — it is a
# rebuilt projection of the grid).
_ACTIVITY_SQL = """
SELECT
ea.user_id AS user_id,
u.username AS username,
u.full_name AS full_name,
ea.email_address AS account_email,
eam.is_sent AS is_sent,
e.id AS email_id,
e.from_email AS from_email,
e.subject AS subject,
e.body_text AS body_text,
e.snippet AS snippet,
e.sent_at AS sent_at,
COALESCE(
NULLIF(TRIM(fi.investor_name), ''),
NULLIF(TRIM(o.name), ''),
NULLIF(TRIM(COALESCE(c.first_name, '') || ' ' || COALESCE(c.last_name, '')), ''),
eil.matched_address
) AS investor_name
FROM email_account_messages eam
JOIN email_accounts ea ON ea.id = eam.account_id
JOIN users u ON u.id = ea.user_id
JOIN emails e ON e.id = eam.email_id
JOIN email_investor_links eil ON eil.email_id = e.id
LEFT JOIN fundraising_investors fi ON fi.id = eil.fundraising_investor_id
LEFT JOIN organizations o ON o.id = eil.organization_id AND o.deleted_at IS NULL
LEFT JOIN contacts c ON c.id = eil.contact_id AND c.deleted_at IS NULL
WHERE eam.deleted_at IS NULL
AND u.is_active = 1
AND e.is_matched = 1
AND e.sent_at >= ? AND e.sent_at < ?
ORDER BY u.full_name, u.username, e.sent_at ASC
"""
_RULE = "" * 52
_FOOTER = ("— Internal Ten31 CRM digest. Narratives are generated locally (Spark), "
"never Claude. This mailbox is unmonitored.")
_SYSTEM = (
"You write a brief internal activity digest for a venture fund's partners. "
"Given one team member's emails with investors over the last day, write 2-4 "
"sentences summarizing what they did: which investors they engaged and the "
"gist of each thread. Name the investors. Past tense, plain prose, no "
"greeting, no bullet points, no sign-off."
)
# ------------------------------------------------------------------ collection
def _fetch_activity_rows(conn, since_iso, until_iso):
"""Raw (sighting x investor-link) rows for the window. [] if email tables absent."""
try:
return conn.execute(_ACTIVITY_SQL, (since_iso, until_iso)).fetchall()
except sqlite3.OperationalError:
return [] # email tables not present (integration disabled) — nothing to report
def _own_addresses(conn):
"""Lower-cased set of enrolled mailbox addresses — used to decide whether an
email is outbound (from us) or inbound (from the investor) at the email level."""
try:
return {(r[0] or "").lower().strip()
for r in conn.execute("SELECT email_address FROM email_accounts")}
except sqlite3.OperationalError:
return set()
def collect_user_activity(conn, since_iso, until_iso):
"""Return per-user activity buckets for emails in [since_iso, until_iso).
Each bucket: {user_id, username, full_name, account_email, emails[], investors[],
sent, received, total}. Empty list if the email tables are absent. Only users
who had activity appear. Direction here is per-mailbox (eam.is_sent): did THIS
user send the message."""
rows = _fetch_activity_rows(conn, since_iso, until_iso)
groups = {}
for r in rows:
uid = r["user_id"]
g = groups.get(uid)
if g is None:
g = {"user_id": uid, "username": r["username"], "full_name": r["full_name"],
"account_email": r["account_email"], "_emails": {}, "_inv": set()}
groups[uid] = g
eid = r["email_id"]
em = g["_emails"].get(eid)
if em is None:
em = {"email_id": eid,
"direction": "sent" if r["is_sent"] else "received",
"subject": r["subject"], "sent_at": r["sent_at"],
"text": r["body_text"] or r["snippet"] or "", "investors": []}
g["_emails"][eid] = em
inv = (r["investor_name"] or "").strip()
if inv:
if inv not in em["investors"]:
em["investors"].append(inv)
g["_inv"].add(inv)
out = []
for g in groups.values():
emails = sorted(g["_emails"].values(), key=lambda e: e["sent_at"] or "")
sent = sum(1 for e in emails if e["direction"] == "sent")
out.append({
"user_id": g["user_id"], "username": g["username"],
"full_name": g["full_name"], "account_email": g["account_email"],
"emails": emails, "investors": sorted(g["_inv"]),
"sent": sent, "received": len(emails) - sent, "total": len(emails),
})
out.sort(key=lambda x: (x["full_name"] or x["username"] or "").lower())
return out
def collect_investor_activity(conn, since_iso, until_iso):
"""Re-pivot the same window by investor (across the whole team), deduping each
email so a reply to several team members counts once. Direction is decided at
the EMAIL level: outbound if the sender is one of our mailboxes, else inbound.
Each bucket: {name, emails[{email_id, direction in/out, subject, sent_at,
members[]}], inbound, outbound, total}. Sorted most-active first."""
rows = _fetch_activity_rows(conn, since_iso, until_iso)
own = _own_addresses(conn)
groups = {}
for r in rows:
name = (r["investor_name"] or "").strip() or "(unmatched)"
g = groups.get(name)
if g is None:
g = {"name": name, "_emails": {}}
groups[name] = g
eid = r["email_id"]
em = g["_emails"].get(eid)
if em is None:
outbound = (r["from_email"] or "").lower().strip() in own
em = {"email_id": eid, "direction": "out" if outbound else "in",
"subject": r["subject"], "sent_at": r["sent_at"], "members": []}
g["_emails"][eid] = em
# Attribute the sending team member on outbound mail (the sighting with
# is_sent=1); inbound is "from them", so no member shown.
if em["direction"] == "out" and r["is_sent"]:
who = (r["full_name"] or r["username"] or "").strip()
if who and who not in em["members"]:
em["members"].append(who)
out = []
for g in groups.values():
emails = sorted(g["_emails"].values(), key=lambda e: e["sent_at"] or "")
inbound = sum(1 for e in emails if e["direction"] == "in")
out.append({"name": g["name"], "emails": emails,
"inbound": inbound, "outbound": len(emails) - inbound, "total": len(emails)})
out.sort(key=lambda x: (-x["total"], x["name"].lower()))
return out
# ------------------------------------------------------------------ policy
DIGEST_POLICY_KEY = "digest_policy"
DEFAULT_DIGEST_POLICY = {"enabled": False, "send_hour": 18}
def load_digest_policy(conn):
"""Resolve the live digest policy. Precedence: the app_settings DB row (the
admin-panel control) wins; absent that, the CRM_DIGEST_ENABLED/SEND_HOUR env
vars seed a first-boot default; absent those, DEFAULT_DIGEST_POLICY. Returns
{enabled: bool, send_hour: int 0-23}. Shared by the server (API) and the
scheduler so both read one source of truth."""
pol = dict(DEFAULT_DIGEST_POLICY)
env_enabled = os.environ.get("CRM_DIGEST_ENABLED")
if env_enabled is not None:
pol["enabled"] = env_enabled.lower() in ("1", "true", "yes", "on")
env_hour = os.environ.get("CRM_DIGEST_SEND_HOUR")
if env_hour:
try:
pol["send_hour"] = min(23, max(0, int(env_hour)))
except ValueError:
pass
try:
row = conn.execute(
"SELECT value_json FROM app_settings WHERE key = ?", (DIGEST_POLICY_KEY,)).fetchone()
except sqlite3.OperationalError:
row = None
if row:
try:
saved = json.loads(row["value_json"])
except Exception:
saved = None
if isinstance(saved, dict):
if "enabled" in saved:
pol["enabled"] = bool(saved["enabled"])
if "send_hour" in saved:
try:
pol["send_hour"] = min(23, max(0, int(saved["send_hour"])))
except (ValueError, TypeError):
pass
return pol
# ------------------------------------------------------------------ summarization
def _default_chat(prompt, system=None, max_tokens=220):
"""Lazily reach the local Qwen chat via Spark Control (ingest/llm.py)."""
import os
import sys
sys.path.insert(0, os.path.join(os.path.dirname(os.path.abspath(__file__)), "ingest"))
import llm # noqa: E402
return llm.chat(prompt, system=system, max_tokens=max_tokens)
def _user_email_block(group, max_emails=20, body_chars=500):
lines = []
for em in group["emails"][:max_emails]:
invs = ", ".join(em["investors"]) or "(unmatched)"
body = " ".join((em.get("text") or "").split())[:body_chars]
line = f"- [{em['direction']}] {invs} | subject: {em.get('subject') or '(none)'}"
if body:
line += f" | {body}"
lines.append(line)
return "\n".join(lines)
def _fallback_narrative(group):
"""Deterministic summary when the local model is unavailable — the digest
must still send (always-send) with real counts rather than fail."""
name = group.get("full_name") or group.get("username") or "Team member"
invs = ", ".join(group["investors"]) or "no matched investors"
return (f"{name} had {group['total']} email(s) "
f"({group['sent']} sent, {group['received']} received) with {invs}. "
"(Local summary unavailable.)")
def summarize_user_day(group, chat_fn=None):
"""One narrative paragraph for a user's day, from the local model. Falls back
to a deterministic count summary on any error or empty reply."""
fn = chat_fn or _default_chat
name = group.get("full_name") or group.get("username") or "The team member"
prompt = f"Team member: {name}\nEmails:\n{_user_email_block(group)}"
try:
out = fn(prompt, system=_SYSTEM, max_tokens=220)
out = " ".join((out or "").split()).strip()
if out:
return out
except Exception:
pass
return _fallback_narrative(group)
# ------------------------------------------------------------------ composition
def _parse_iso(iso):
if not iso:
return None
s = str(iso).strip()
for fmt in ("%Y-%m-%dT%H:%M:%S.%fZ", "%Y-%m-%dT%H:%M:%SZ", "%Y-%m-%dT%H:%M:%S"):
try:
return datetime.strptime(s, fmt).replace(tzinfo=timezone.utc)
except ValueError:
continue
return None
def _fmt_local(iso):
"""UTC ISO -> human local time, e.g. 'Jun 17 2:14 PM'. Manual 12h formatting
to stay portable (no platform-specific %-I)."""
dt = _parse_iso(iso)
if dt is None:
return str(iso)
dt = dt.astimezone()
hour12 = dt.hour % 12 or 12
ampm = "AM" if dt.hour < 12 else "PM"
return f"{dt.strftime('%b')} {dt.day} {hour12}:{dt.minute:02d} {ampm}"
def _compose_body(user_groups, investor_groups, narratives, since_iso, until_iso):
title_date = datetime.now().astimezone().strftime("%A, %b %d %Y")
window = f"{_fmt_local(since_iso)} {_fmt_local(until_iso)}"
L = ["Ten31 CRM — Daily Activity Digest", title_date, f"Window: {window}", ""]
if not user_groups:
L += ["No tracked email activity from any user in this window.", "", _FOOTER]
return "\n".join(L)
total_emails = sum(g["total"] for g in user_groups)
total_invs = len({i for g in user_groups for i in g["investors"]})
L.append(f"{len(user_groups)} team member(s) active · {total_emails} email(s) "
f"· {total_invs} investor(s)")
# ── Section 1: by team member (who did what; per-user Spark narrative) ──
L += ["", _RULE, "BY TEAM MEMBER", _RULE]
for g in user_groups:
invs = ", ".join(g["investors"]) or "(no matched investor)"
L += ["",
f"{g['full_name'] or g['username']} · {g['account_email']}",
f"{g['total']} email(s) ({g['sent']} sent, {g['received']} received) "
f"· {invs}", "",
narratives.get(g["user_id"], ""), ""]
for em in g["emails"]:
arrow = "→ Sent" if em["direction"] == "sent" else "← Received"
invs_e = ", ".join(em["investors"]) or "(unmatched)"
subj = em.get("subject") or "(no subject)"
L.append(f" {arrow} · {invs_e} · \"{subj}\" ({_fmt_local(em['sent_at'])})")
# ── Section 2: by investor (team-wide; both directions, structured) ──
L += ["", _RULE, "BY INVESTOR", _RULE]
for inv in investor_groups:
L += ["",
f"{inv['name']} · {inv['total']} email(s) "
f"({inv['inbound']} in, {inv['outbound']} out)"]
for em in inv["emails"]:
subj = em.get("subject") or "(no subject)"
when = _fmt_local(em["sent_at"])
if em["direction"] == "out":
who = ", ".join(em["members"]) or "team"
L.append(f" → Sent by {who} · \"{subj}\" ({when})")
else:
L.append(f" ← Received · \"{subj}\" ({when})")
L += ["", _RULE, _FOOTER]
return "\n".join(L)
def build_digest(conn, since_iso, until_iso, chat_fn=None):
"""Build the daily digest for [since_iso, until_iso). Returns
{subject, body, has_activity, user_count, email_count, investor_count}. Always
returns a body (empty windows get a 'no activity' note — the team chose
always-send). Two sections: by team member (per-user Spark narrative) and by
investor (structured, both directions)."""
user_groups = collect_user_activity(conn, since_iso, until_iso)
investor_groups = collect_investor_activity(conn, since_iso, until_iso)
narratives = {g["user_id"]: summarize_user_day(g, chat_fn) for g in user_groups}
body = _compose_body(user_groups, investor_groups, narratives, since_iso, until_iso)
stamp = datetime.now().astimezone().strftime("%b %d")
return {
"subject": f"Ten31 CRM — Daily Activity Digest · {stamp}",
"body": body,
"has_activity": bool(user_groups),
"user_count": len(user_groups),
"email_count": sum(g["total"] for g in user_groups),
"investor_count": len(investor_groups),
}
@@ -0,0 +1,185 @@
"""Daily activity-digest scheduler (Phase B).
Co-located with the Gmail sync scheduler (it shares the same conn-factory and
daemon-thread idiom). One daemon thread wakes every 60s and fires the daily
activity digest once per local day, at/after the configured send hour.
Control lives in the DB, set from Settings -> Admin (digest_builder.load_digest_policy
-> app_settings 'digest_policy'): {enabled, send_hour}. The thread always runs and
re-reads the policy each cycle, so toggling the digest on/off or changing the time
takes effect on the next loop — no restart. CRM_DIGEST_ENABLED/SEND_HOUR only seed
the first-boot default before an admin sets the policy.
The send is an internal ops email to the team's own admins — exempt from the
"agents draft, humans send" rule (which governs outward LP/prospect contact).
Digest content is summarized on Spark (local), never Claude — see digest_builder.
Window: the content covers (last successful send, now]. Tracked in app_settings
so a missed day's activity rolls into the next digest rather than being dropped;
the first-ever run covers the prior 24h. The once-per-day guard is a separate
local-date key. The transport (Gmail-DWD -> SMTP) is digest_mailer's job.
"""
import json
import logging
import sqlite3
import threading
from datetime import datetime, timedelta, timezone
from .scheduler import _conn_factory_from_env
log = logging.getLogger("email_integration.digest_scheduler")
_LAST_DATE_KEY = "digest_last_sent_date" # local YYYY-MM-DD — once-per-day guard
_LAST_AT_KEY = "digest_last_sent_at" # UTC ISO — content-window cursor
_state: dict[str, object] = {"thread": None, "stop": threading.Event()}
def _utc_iso(dt: datetime) -> str:
return dt.astimezone(timezone.utc).strftime("%Y-%m-%dT%H:%M:%SZ")
# app_settings access kept local (no server.py import — avoid the startup cycle);
# the value_json/JSON encoding matches server.get/set_app_setting exactly.
def _get_setting(conn, key):
try:
row = conn.execute("SELECT value_json FROM app_settings WHERE key = ?", (key,)).fetchone()
except sqlite3.OperationalError:
return None
if not row:
return None
try:
return json.loads(row["value_json"])
except Exception:
return None
def _set_setting(conn, key, value) -> None:
conn.execute(
"INSERT INTO app_settings (key, value_json, updated_at) VALUES (?, ?, ?) "
"ON CONFLICT(key) DO UPDATE SET value_json = excluded.value_json, "
"updated_at = excluded.updated_at",
(key, json.dumps(value), _utc_iso(datetime.now(timezone.utc))),
)
def _admin_recipients(conn) -> list[str]:
rows = conn.execute(
"SELECT email FROM users WHERE role = 'admin' AND is_active = 1 "
"AND email IS NOT NULL AND TRIM(email) != ''"
).fetchall()
return [str(r["email"]).strip() for r in rows if str(r["email"]).strip()]
def _build_and_send(conn, since_iso, until_iso, *, build_fn=None, send_fn=None):
"""Build the digest and hand it to the transport. Raises digest_mailer.NoTransport
(no transport / no recipient) — callers map that to a clear 400; the daily loop
logs it. build_fn/send_fn are injectable for tests."""
import digest_builder
import digest_mailer
bf = build_fn or digest_builder.build_digest
sf = send_fn or digest_mailer.send_digest
recipients = _admin_recipients(conn)
if not recipients:
raise digest_mailer.NoTransport(
"No active admin has an email address — give one an address to receive the digest.")
digest = bf(conn, since_iso, until_iso)
result = sf(conn, recipients, digest["subject"], digest["body"])
return {
"recipients": recipients,
"transport": (result or {}).get("transport"),
"has_activity": digest["has_activity"],
"user_count": digest["user_count"],
"email_count": digest["email_count"],
"investor_count": digest.get("investor_count"),
"window": [since_iso, until_iso],
}
def maybe_send_digest(conn_factory=None, *, force=False,
now_local=None, now_utc=None, build_fn=None, send_fn=None):
"""Send the daily digest if it is due (or unconditionally when force=True).
Daily path: skips before the send hour and if already sent today; content
window runs from the last send to now and the cursor advances on success.
force path (the admin 'send now' endpoint): ignores the policy and the guards,
uses a fixed last-24h window, and does NOT advance the daily cursor — so an
on-demand preview never suppresses the scheduled send."""
import digest_builder
factory = conn_factory or _conn_factory_from_env()
conn = factory()
try:
policy = digest_builder.load_digest_policy(conn)
if not force and not policy["enabled"]:
return {"status": "disabled"}
nl = now_local or datetime.now()
nu = now_utc or datetime.now(timezone.utc)
if not force:
today = nl.strftime("%Y-%m-%d")
if nl.hour < policy["send_hour"]:
return {"status": "before_send_hour", "send_hour": policy["send_hour"]}
if _get_setting(conn, _LAST_DATE_KEY) == today:
return {"status": "already_sent_today"}
until_iso = _utc_iso(nu)
last_at = None if force else _get_setting(conn, _LAST_AT_KEY)
since_iso = last_at or _utc_iso(nu - timedelta(hours=24))
result = _build_and_send(conn, since_iso, until_iso, build_fn=build_fn, send_fn=send_fn)
if not force:
_set_setting(conn, _LAST_DATE_KEY, nl.strftime("%Y-%m-%d"))
_set_setting(conn, _LAST_AT_KEY, until_iso)
conn.commit()
return {"status": "sent", **result}
finally:
conn.close()
def start_digest_scheduler(conn_factory=None) -> None:
"""Start the daily digest loop (idempotent). The thread always runs and reads
the DB policy each cycle (admin-panel control), so it sends only when the policy
is enabled — no env gate, no restart needed to toggle."""
if _state["thread"] is not None:
return
factory = conn_factory or _conn_factory_from_env()
stop = threading.Event()
_state["stop"] = stop
def _loop():
log.info("digest scheduler started (policy-controlled via Settings -> Admin)")
if stop.wait(15): # let server finish startup
return
while not stop.is_set():
try:
res = maybe_send_digest(factory)
if res.get("status") == "sent":
log.info("daily digest sent: %s", res)
except Exception:
log.exception("digest send failed; will retry next cycle")
if stop.wait(60):
return
t = threading.Thread(target=_loop, name="digest", daemon=True)
t.start()
_state["thread"] = t
def stop_digest_scheduler() -> None:
ev: threading.Event = _state["stop"] # type: ignore
ev.set()
t = _state.get("thread")
if t:
try:
t.join(timeout=5) # type: ignore
except Exception:
pass
_state["thread"] = None
+73
View File
@@ -1817,6 +1817,8 @@ class CRMHandler(BaseHTTPRequestHandler):
return self.handle_list_fundraising_backups(user)
if path == '/api/fundraising/backup-policy':
return self.handle_get_backup_policy(user)
if path == '/api/admin/digest/policy':
return self.handle_get_digest_policy(user)
if path == '/api/fundraising/relational-summary':
return self.handle_get_fundraising_relational_summary(user)
if path == '/api/fundraising/automations':
@@ -1921,6 +1923,8 @@ class CRMHandler(BaseHTTPRequestHandler):
return self.handle_admin_reset_all_data(user, body)
if path == '/api/admin/digest/test-email':
return self.handle_admin_send_test_email(user, body)
if path == '/api/admin/digest/send-now':
return self.handle_admin_send_digest_now(user, body)
if path == '/api/fundraising/backup':
return self.handle_backup_fundraising_state(user)
if path == '/api/fundraising/restore-preview':
@@ -2019,6 +2023,8 @@ class CRMHandler(BaseHTTPRequestHandler):
return self.handle_admin_update_user(user, target_user_id, body)
if path == '/api/fundraising/backup-policy':
return self.handle_update_backup_policy(user, body)
if path == '/api/admin/digest/policy':
return self.handle_update_digest_policy(user, body)
if re.match(r'^/api/fundraising/automations/[^/]+$', path):
rule_id = path.split('/')[-1]
return self.handle_update_fundraising_automation_rule(user, rule_id, body)
@@ -4292,6 +4298,63 @@ class CRMHandler(BaseHTTPRequestHandler):
return self.send_json({"data": {"status": "sent", **result}})
def handle_admin_send_digest_now(self, user, body):
"""Build the REAL daily activity digest (last 24h) on demand and send it to
the active-admin set now. An on-demand preview of Phase B — it does not
touch the daily schedule's cursor, so it never suppresses the scheduled send.
Content is summarized on Spark (local), never Claude."""
if not require_admin(user):
return self.send_error_json("Admin only", 403)
import digest_mailer
try:
from email_integration.digest_scheduler import maybe_send_digest
result = maybe_send_digest(force=True)
except digest_mailer.NoTransport as exc:
return self.send_error_json(str(exc), 400)
except Exception as exc:
# Never echo the exception — an auth error can carry a token/credential.
print(f"[digest] send-now failed: {type(exc).__name__}: {exc}", file=sys.stderr)
return self.send_error_json("Send failed — see server logs for details.", 502)
return self.send_json({"data": result})
def handle_get_digest_policy(self, user):
"""Return the live daily-digest policy (enabled + send hour). DB-backed
(app_settings), set from this same panel — see digest_builder.load_digest_policy."""
if not require_admin(user):
return self.send_error_json("Admin access required", 403)
import digest_builder
conn = get_db()
try:
return self.send_json({"data": digest_builder.load_digest_policy(conn)})
finally:
conn.close()
def handle_update_digest_policy(self, user, body):
"""Update the daily-digest policy. Takes effect on the scheduler's next
cycle (no restart). Recipients stay the active-admin set; sender/transport
are env/StartOS config, not toggled here."""
if not require_admin(user):
return self.send_error_json("Admin access required", 403)
import digest_builder
conn = get_db()
try:
policy = digest_builder.load_digest_policy(conn)
if 'enabled' in body:
policy['enabled'] = bool(body.get('enabled'))
if 'send_hour' in body:
try:
policy['send_hour'] = max(0, min(23, int(body.get('send_hour'))))
except (ValueError, TypeError):
return self.send_error_json("send_hour must be an integer from 0 to 23")
normalized = {"enabled": bool(policy['enabled']), "send_hour": int(policy['send_hour'])}
set_app_setting(conn, digest_builder.DIGEST_POLICY_KEY, normalized)
conn.commit()
return self.send_json({"data": normalized})
finally:
conn.close()
def handle_list_audit_log(self, user, params):
if not require_admin(user):
return self.send_error_json("Admin access required", 403)
@@ -5425,6 +5488,16 @@ def main():
except Exception as _e:
print(f"[email_integration] failed to start scheduler: {_e}")
# ─── Daily activity digest scheduler ─────────────────────────────
# Always started; it reads the digest policy (enabled + send hour) from the DB
# each cycle, so the Settings → Admin toggle controls it live (no restart).
try:
from email_integration.digest_scheduler import start_digest_scheduler
start_digest_scheduler()
print("[digest] daily activity digest scheduler started (policy-controlled)")
except Exception as _e:
print(f"[digest] failed to start digest scheduler: {_e}")
# ThreadingHTTPServer lets one slow request (or a wave of scanner probes)
# not block legit users. SQLite is opened per-request via get_db(), and
# WAL mode allows concurrent readers + a single writer, so this is safe.
+303
View File
@@ -0,0 +1,303 @@
#!/usr/bin/env python3
"""Tests for the daily activity digest (Phase B): the per-user + per-investor
activity queries (soft-delete filtered), inbound dedup, the two-section body, the
local-summary fallback, the DB-backed policy resolver, and the scheduler's
once-per-day / send-hour / policy / force guards.
The local Spark model and the mail transport are stubbed — no network. Synthetic
data only (guardrail #9).
Run: cd backend && python3 test_digest_builder.py
"""
import json
import os
import sqlite3
import sys
import tempfile
from datetime import datetime, timezone
sys.path.insert(0, os.path.dirname(os.path.abspath(__file__)))
os.environ["CRM_DB_PATH"] = os.path.join(tempfile.mkdtemp(), "crm.db")
os.environ.setdefault("CRM_DATA_DIR", os.path.dirname(os.environ["CRM_DB_PATH"]))
os.environ["CRM_DIGEST_ENABLED"] = "1" # so the non-force scheduler path is live
import digest_builder # noqa: E402
from email_integration import digest_scheduler # noqa: E402
FAILS = []
SINCE = "2026-06-17T00:00:00Z"
UNTIL = "2026-06-18T00:00:00Z"
def check(cond, msg):
print((" PASS " if cond else " FAIL ") + msg)
if not cond:
FAILS.append(msg)
def _conn():
conn = sqlite3.connect(os.environ["CRM_DB_PATH"])
conn.row_factory = sqlite3.Row
return conn
def setup():
conn = _conn()
conn.executescript("""
CREATE TABLE users (id TEXT PRIMARY KEY, username TEXT, full_name TEXT,
email TEXT, role TEXT, is_active INT DEFAULT 1);
CREATE TABLE email_accounts (id TEXT PRIMARY KEY, user_id TEXT, email_address TEXT);
CREATE TABLE emails (id TEXT PRIMARY KEY, subject TEXT, body_text TEXT, snippet TEXT,
from_email TEXT, sent_at TEXT, is_matched INT DEFAULT 1);
CREATE TABLE email_account_messages (id TEXT PRIMARY KEY, email_id TEXT, account_id TEXT,
gmail_message_id TEXT, gmail_thread_id TEXT, is_sent INT DEFAULT 0, deleted_at TEXT);
CREATE TABLE email_investor_links (id TEXT PRIMARY KEY, email_id TEXT,
fundraising_investor_id TEXT, organization_id TEXT, contact_id TEXT, matched_address TEXT);
CREATE TABLE fundraising_investors (id TEXT PRIMARY KEY, investor_name TEXT);
CREATE TABLE organizations (id TEXT PRIMARY KEY, name TEXT, deleted_at TEXT);
CREATE TABLE contacts (id TEXT PRIMARY KEY, first_name TEXT, last_name TEXT, deleted_at TEXT);
CREATE TABLE app_settings (key TEXT PRIMARY KEY, value_json TEXT, updated_at TEXT);
""")
conn.executemany("INSERT INTO users (id,username,full_name,email,role,is_active) VALUES (?,?,?,?,?,?)", [
("u1", "grant", "Grant Gilliam", "grant@ten31.xyz", "admin", 1),
("u2", "jk", "Jonathan K", "jk@ten31.xyz", "member", 1),
("u3", "retired", "Old Admin", "old@ten31.xyz", "admin", 0), # inactive -> excluded
])
conn.executemany("INSERT INTO email_accounts (id,user_id,email_address) VALUES (?,?,?)", [
("a1", "u1", "grant@ten31.xyz"), ("a2", "u2", "jk@ten31.xyz"), ("a3", "u3", "old@ten31.xyz"),
])
conn.executemany("INSERT INTO fundraising_investors (id,investor_name) VALUES (?,?)", [
("inv1", "Harbor & Vine"), ("inv2", "Brightwater Capital"), ("inv3", "Vela Partners"),
])
conn.executemany("INSERT INTO organizations (id,name,deleted_at) VALUES (?,?,?)", [
("org1", "Summit Fund", None), ("org2", "Deleted Org", "2026-06-01T00:00:00Z"),
])
conn.executemany("INSERT INTO contacts (id,first_name,last_name,deleted_at) VALUES (?,?,?,?)", [
("c1", "Jane", "Roe", None),
])
# emails: id, subject, body, from_email, sent_at, is_matched. Outbound = from one
# of our own mailboxes (grant@/jk@/old@ ten31.xyz); inbound = from outside.
conn.executemany("INSERT INTO emails (id,subject,body_text,from_email,sent_at,is_matched) VALUES (?,?,?,?,?,?)", [
("e1", "Fund III terms", "Discussing Fund III terms", "grant@ten31.xyz", "2026-06-17T14:00:00Z", 1),
("e2", "Re: allocation", "Question about allocation", "lp@brightwater.example", "2026-06-17T09:00:00Z", 1),
("e3", "Intro", "Summit intro thread", "jk@ten31.xyz", "2026-06-17T11:00:00Z", 1),
("e4", "Coffee", "Catch up note", "jane@roe.example", "2026-06-17T16:00:00Z", 1),
("e5", "Wire", "Wire instructions", "ir@summitcap.example", "2026-06-17T17:00:00Z", 1),
("e6", "Old", "ancient", "grant@ten31.xyz", "2026-06-10T10:00:00Z", 1), # out of window
("e7", "Tombstoned", "deleted sighting", "lp@harborvine.example", "2026-06-17T08:00:00Z", 1),
("e8", "Inactive", "from retired user", "old@ten31.xyz", "2026-06-17T12:00:00Z", 1),
("e9", "Unmatched", "not matched", "lp@harborvine.example", "2026-06-17T13:00:00Z", 0), # is_matched=0
("e10", "Group update", "inbound to two of us", "lp@vela.example", "2026-06-17T15:00:00Z", 1),
])
# sightings: id, email_id, account_id, is_sent, deleted_at
conn.executemany(
"INSERT INTO email_account_messages (id,email_id,account_id,gmail_message_id,gmail_thread_id,is_sent,deleted_at) "
"VALUES (?,?,?,?,?,?,?)", [
("s1", "e1", "a1", "g1", "t1", 1, None), # grant SENT
("s2", "e2", "a1", "g2", "t2", 0, None), # grant RECEIVED
("s3", "e3", "a2", "g3", "t3", 1, None), # jk SENT
("s4", "e4", "a1", "g4", "t4", 0, None), # grant RECEIVED (contact)
("s5", "e5", "a1", "g5", "t5", 0, None), # grant RECEIVED (deleted org)
("s6", "e6", "a1", "g6", "t6", 0, None), # out of window
("s7", "e7", "a1", "g7", "t7", 0, "2026-06-17T09:00:00Z"), # tombstoned
("s8", "e8", "a3", "g8", "t8", 1, None), # inactive user
("s9", "e9", "a1", "g9", "t9", 0, None), # unmatched email
("s10a", "e10", "a1", "g10a", "t10", 0, None), # e10 received by grant ...
("s10b", "e10", "a2", "g10b", "t10", 0, None), # ... and by jk (dedup target)
])
# investor links: id, email_id, fr_investor, org, contact, matched_address
conn.executemany(
"INSERT INTO email_investor_links (id,email_id,fundraising_investor_id,organization_id,contact_id,matched_address) "
"VALUES (?,?,?,?,?,?)", [
("l1", "e1", "inv1", None, None, "lp@harborvine.example"),
("l2", "e2", "inv2", None, None, "lp@brightwater.example"),
("l3", "e3", None, "org1", None, "ir@summitfund.example"), # org name
("l4", "e4", None, None, "c1", "jane@roe.example"), # contact name
("l5", "e5", None, "org2", None, "ir@summitcap.example"), # deleted org -> address
("l6", "e6", "inv1", None, None, "lp@harborvine.example"),
("l7", "e7", "inv1", None, None, "lp@harborvine.example"),
("l8", "e8", "inv1", None, None, "lp@harborvine.example"),
("l9", "e9", "inv1", None, None, "lp@harborvine.example"),
("l10", "e10", "inv3", None, None, "lp@vela.example"),
])
conn.commit()
conn.close()
def test_collect():
conn = _conn()
groups = digest_builder.collect_user_activity(conn, SINCE, UNTIL)
conn.close()
check(len(groups) == 2, f"two active users with activity (grant, jk), got {len(groups)}")
by_user = {g["user_id"]: g for g in groups}
check("u3" not in by_user, "inactive user (u3) excluded")
grant = by_user.get("u1")
if not grant:
FAILS.append("grant group missing"); return
ids = set(e["email_id"] for e in grant["emails"])
check(ids == {"e1", "e2", "e4", "e5", "e10"},
f"grant has e1,e2,e4,e5,e10 (e6 out-of-window, e7 tombstoned, e9 unmatched excluded), got {sorted(ids)}")
check(grant["sent"] == 1 and grant["received"] == 4, f"grant 1 sent / 4 received, got {grant['sent']}/{grant['received']}")
e1 = next(e for e in grant["emails"] if e["email_id"] == "e1")
check(e1["direction"] == "sent", "e1 direction sent")
check(e1["investors"] == ["Harbor & Vine"], f"e1 investor = grid name, got {e1['investors']}")
e4 = next(e for e in grant["emails"] if e["email_id"] == "e4")
check(e4["investors"] == ["Jane Roe"], f"e4 investor = contact fallback name, got {e4['investors']}")
e5 = next(e for e in grant["emails"] if e["email_id"] == "e5")
check(e5["investors"] == ["ir@summitcap.example"], f"e5 investor = address (deleted org skipped), got {e5['investors']}")
jk = by_user.get("u2")
check(jk and jk["emails"][0]["investors"] == ["Summit Fund"], "jk e3 investor = org name")
def test_investor():
conn = _conn()
inv = digest_builder.collect_investor_activity(conn, SINCE, UNTIL)
conn.close()
by_name = {g["name"]: g for g in inv}
# Harbor & Vine, Brightwater, Vela Partners, Summit Fund, Jane Roe, ir@summitcap.example
check(len(inv) == 6, f"six investors with activity, got {len(inv)}: {sorted(by_name)}")
hv = by_name.get("Harbor & Vine")
check(hv and hv["outbound"] == 1 and hv["inbound"] == 0, f"Harbor & Vine = 1 out / 0 in, got {hv}")
check(hv and hv["emails"][0]["members"] == ["Grant Gilliam"], f"outbound attributed to sender, got {hv and hv['emails'][0]['members']}")
bw = by_name.get("Brightwater Capital")
check(bw and bw["inbound"] == 1 and bw["outbound"] == 0, f"Brightwater = 1 in / 0 out, got {bw}")
# e10 was received by TWO mailboxes (grant + jk) -> dedup to one inbound email
vela = by_name.get("Vela Partners")
check(vela and vela["total"] == 1 and vela["inbound"] == 1,
f"Vela inbound deduped across 2 sightings -> 1, got {vela}")
def test_build_and_empty():
conn = _conn()
stub = lambda prompt, system=None, max_tokens=220: "Grant worked with Harbor & Vine on Fund III."
d = digest_builder.build_digest(conn, SINCE, UNTIL, chat_fn=stub)
check(d["has_activity"] is True, "build_digest has_activity True when there is activity")
check(d["user_count"] == 2 and d["email_count"] == 7 and d["investor_count"] == 6,
f"counts: 2 users / 7 emails / 6 investors, got {d['user_count']}/{d['email_count']}/{d['investor_count']}")
body = d["body"]
check("Daily Activity Digest" in body, "body has title")
check("BY TEAM MEMBER" in body and "BY INVESTOR" in body, "body has both sections")
check("Grant Gilliam" in body and "Jonathan K" in body, "body names both active users")
check("Harbor & Vine" in body and "Brightwater Capital" in body and "Vela Partners" in body,
"investor section lists investors")
check("Grant worked with Harbor & Vine on Fund III." in body, "body includes the local narrative")
empty = digest_builder.build_digest(conn, "2030-01-01T00:00:00Z", "2030-01-02T00:00:00Z", chat_fn=stub)
check(empty["has_activity"] is False, "empty window -> has_activity False")
check("No tracked email activity" in empty["body"], "empty window -> 'no activity' note (always-send)")
check("BY INVESTOR" not in empty["body"], "empty window -> no section headers")
conn.close()
def test_policy():
conn = _conn()
# No DB row yet: CRM_DIGEST_ENABLED=1 (set at import) seeds enabled; hour defaults 18.
pol = digest_builder.load_digest_policy(conn)
check(pol["enabled"] is True and pol["send_hour"] == 18, f"env seed -> enabled, hour 18, got {pol}")
# A DB row wins over the env seed (the admin-panel control).
conn.execute("INSERT OR REPLACE INTO app_settings (key,value_json,updated_at) VALUES (?,?,?)",
(digest_builder.DIGEST_POLICY_KEY, json.dumps({"enabled": False, "send_hour": 9}), "x"))
conn.commit()
pol2 = digest_builder.load_digest_policy(conn)
check(pol2["enabled"] is False and pol2["send_hour"] == 9, f"DB policy wins over env, got {pol2}")
conn.execute("DELETE FROM app_settings WHERE key = ?", (digest_builder.DIGEST_POLICY_KEY,))
conn.commit()
conn.close()
def test_summary_fallback():
grp = {"user_id": "u1", "full_name": "Grant Gilliam", "username": "grant",
"emails": [{"direction": "sent", "subject": "x", "investors": ["Harbor & Vine"], "text": "hi"}],
"investors": ["Harbor & Vine"], "sent": 1, "received": 0, "total": 1}
def boom(*a, **k):
raise RuntimeError("spark down")
out = digest_builder.summarize_user_day(grp, chat_fn=boom)
check("Grant Gilliam" in out and "1 sent" in out and "unavailable" in out.lower(),
f"fallback narrative on chat error, got: {out}")
def test_scheduler_guards():
sent_calls = []
build_fn = lambda conn, since, until: {"subject": "S", "body": "B",
"has_activity": True, "user_count": 1, "email_count": 2}
def send_fn(conn, to_addrs, subject, body, sender_email=None):
sent_calls.append(list(to_addrs))
return {"transport": "stub"}
factory = _conn
utc = datetime(2026, 6, 18, 1, 0, tzinfo=timezone.utc)
# Before the send hour (09:00 local < 18:00) -> no send
r = digest_scheduler.maybe_send_digest(factory, now_local=datetime(2026, 6, 18, 9, 0),
now_utc=utc, build_fn=build_fn, send_fn=send_fn)
check(r["status"] == "before_send_hour" and not sent_calls, f"before send hour -> skip, got {r}")
# At/after the send hour -> sends once, only to the active admin (grant)
r = digest_scheduler.maybe_send_digest(factory, now_local=datetime(2026, 6, 18, 19, 0),
now_utc=utc, build_fn=build_fn, send_fn=send_fn)
check(r["status"] == "sent" and len(sent_calls) == 1, f"after send hour -> sends, got {r}")
check(sent_calls and sent_calls[-1] == ["grant@ten31.xyz"], f"recipients = active admins only, got {sent_calls[-1]}")
# The window cursor must advance to the send time so a missed day rolls forward
# (since, now] — not be left unset/stale.
conn = _conn()
cursor_at = digest_scheduler._get_setting(conn, digest_scheduler._LAST_AT_KEY)
conn.close()
check(cursor_at == digest_scheduler._utc_iso(utc),
f"window cursor advanced to send time, got {cursor_at}")
# Same local day again -> suppressed (once-per-day guard)
r = digest_scheduler.maybe_send_digest(factory, now_local=datetime(2026, 6, 18, 20, 0),
now_utc=utc, build_fn=build_fn, send_fn=send_fn)
check(r["status"] == "already_sent_today" and len(sent_calls) == 1, f"second send same day -> skip, got {r}")
# force=True ignores the hour + once-per-day guard, and does NOT touch the cursor
conn = _conn()
before = digest_scheduler._get_setting(conn, digest_scheduler._LAST_DATE_KEY)
conn.close()
r = digest_scheduler.maybe_send_digest(factory, force=True, now_local=datetime(2026, 6, 18, 3, 0),
now_utc=utc, build_fn=build_fn, send_fn=send_fn)
check(r["status"] == "sent" and len(sent_calls) == 2, f"force sends regardless of guards, got {r}")
conn = _conn()
after = digest_scheduler._get_setting(conn, digest_scheduler._LAST_DATE_KEY)
conn.close()
check(before == after, "force send does not advance the daily cursor")
# DB policy disabled -> daily path skips even past the hour; force still sends.
conn = _conn()
digest_scheduler._set_setting(conn, digest_builder.DIGEST_POLICY_KEY, {"enabled": False, "send_hour": 18})
conn.commit()
conn.close()
r = digest_scheduler.maybe_send_digest(factory, now_local=datetime(2026, 6, 19, 19, 0),
now_utc=utc, build_fn=build_fn, send_fn=send_fn)
check(r["status"] == "disabled" and len(sent_calls) == 2, f"DB-disabled policy skips daily send, got {r}")
r = digest_scheduler.maybe_send_digest(factory, force=True, now_local=datetime(2026, 6, 19, 2, 0),
now_utc=utc, build_fn=build_fn, send_fn=send_fn)
check(r["status"] == "sent" and len(sent_calls) == 3, f"force overrides disabled policy, got {r}")
conn = _conn()
conn.execute("DELETE FROM app_settings WHERE key = ?", (digest_builder.DIGEST_POLICY_KEY,))
conn.commit()
conn.close()
def main():
setup()
print("collect_user_activity:"); test_collect()
print("collect_investor_activity:"); test_investor()
print("build_digest + empty:"); test_build_and_empty()
print("summary fallback:"); test_summary_fallback()
print("digest policy:"); test_policy()
print("scheduler guards:"); test_scheduler_guards()
if FAILS:
print(f"\nFAILED ({len(FAILS)})")
for f in FAILS:
print(" - " + f)
sys.exit(1)
print("\nALL PASS (digest builder + scheduler)")
if __name__ == "__main__":
main()