Files
ten31-database/backend/digest_builder.py
T
Keysat f181525926 Add reminders & follow-ups (W1) (v0.1.0:92)
First-class reminders tied to the fundraising grid — foundation of the agreed
reminders -> NL-search -> bot-mutations plan (keep LP data off third-party LLMs).

- reminders table (migration 0006; logical FK to fundraising_investors.id +
  denormalized name), CRUD at /api/reminders (soft-delete; open/done/snoozed/
  cancelled; assignee; source; source_row_id resolution)
- read-only derived reminder_status grid column (overdue/due_soon/open),
  filterable; orphan reconciler cancels reminders when an investor leaves the grid
- Reminders page, Dashboard "Reminders Due" card, daily-digest reminders section
- per-investor last_activity_at recency rollup (shared block for the W2 NL query)
- tests: test_reminders.py + digest reminders test (31/31 green, render-smoke green)
2026-06-18 14:45:46 -05:00

484 lines
21 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
"""Daily activity digest — content builder (Phase B).
Assembles the per-user -> per-investor email-activity digest and summarizes each
team member's day with ONE narrative paragraph from the LOCAL Spark model
(ingest/llm.py via Spark Control). NEVER Claude: the digest is deliberately
un-anonymized (real LP names + email substance), so every summarization stays on
Ten31 infra. Keeping the substance local is the whole point — this is the one
path that intentionally bypasses the scrub -> Claude -> re-hydrate boundary.
This is an internal ops email to the team's own admins, so it is exempt from the
"agents draft, humans send" rule — that rule governs outward LP/prospect contact,
not an internal digest to the fund's own inboxes. Never extend it to send to LPs.
Soft-delete: every read here filters the relevant tombstones —
`email_account_messages.deleted_at IS NULL`, `users.is_active = 1`, and the
org/contact name joins drop soft-deleted rows (falling back to the raw address).
Stdlib only; the local-LLM client is imported lazily so this module stays
importable (and testable with an injected chat fn) without Spark configured.
"""
import json
import os
import sqlite3
from datetime import datetime, timedelta, timezone
# One row per (account-sighting x investor-link) in the window. Grouped into
# per-user buckets in Python. Investor display name resolves fundraising grid ->
# organization -> contact -> the raw matched address, skipping soft-deleted
# org/contact rows (fundraising_investors has no soft-delete column — it is a
# rebuilt projection of the grid).
#
# Bound parameters (positional `?`, in order): window start (inclusive) and
# window end (exclusive), both compared against e.sent_at.
# Callers read the result columns by name (r["user_id"], r["email_id"], ...),
# so the connection must return name-addressable rows (e.g. sqlite3.Row).
_ACTIVITY_SQL = """
SELECT
ea.user_id AS user_id,
u.username AS username,
u.full_name AS full_name,
ea.email_address AS account_email,
eam.is_sent AS is_sent,
e.id AS email_id,
e.from_email AS from_email,
e.subject AS subject,
e.body_text AS body_text,
e.snippet AS snippet,
e.sent_at AS sent_at,
COALESCE(
NULLIF(TRIM(fi.investor_name), ''),
NULLIF(TRIM(o.name), ''),
NULLIF(TRIM(COALESCE(c.first_name, '') || ' ' || COALESCE(c.last_name, '')), ''),
eil.matched_address
) AS investor_name
FROM email_account_messages eam
JOIN email_accounts ea ON ea.id = eam.account_id
JOIN users u ON u.id = ea.user_id
JOIN emails e ON e.id = eam.email_id
JOIN email_investor_links eil ON eil.email_id = e.id
LEFT JOIN fundraising_investors fi ON fi.id = eil.fundraising_investor_id
LEFT JOIN organizations o ON o.id = eil.organization_id AND o.deleted_at IS NULL
LEFT JOIN contacts c ON c.id = eil.contact_id AND c.deleted_at IS NULL
WHERE eam.deleted_at IS NULL
AND u.is_active = 1
AND e.is_matched = 1
AND e.sent_at >= ? AND e.sent_at < ?
ORDER BY u.full_name, u.username, e.sent_at ASC
"""
_RULE = "" * 52
# Closing line of the digest body; _compose_body appends it after the last rule.
_FOOTER = ("— Internal Ten31 CRM digest. Narratives are generated locally (Spark), "
"never Claude. This mailbox is unmonitored.")
# System prompt passed to the chat function by summarize_user_day for each
# team member's narrative paragraph.
_SYSTEM = (
"You write a brief internal activity digest for a venture fund's partners. "
"Given one team member's emails with investors over the last day, write 2-4 "
"sentences summarizing what they did: which investors they engaged and the "
"gist of each thread. Name the investors. Past tense, plain prose, no "
"greeting, no bullet points, no sign-off."
)
# Reminders due is a current-state addendum (what needs action now), NOT bound to the
# email-activity window — a 6 PM digest should surface what's overdue / due today.
# status='open' only: a 'snoozed' reminder is an explicit mute, so it stays out of the
# digest by design (the quick-snooze UI keeps a reminder 'open' with a pushed-out date).
#
# One bound parameter: the cutoff date as 'YYYY-MM-DD'. It is compared as text
# against the first 10 characters of due_date, so a due_date carrying a time
# suffix still matches on its date part. The users join is a LEFT JOIN, so a
# reminder with no matching assignee row is still returned (assignee NULL).
_REMINDERS_SQL = """
SELECT r.title AS title,
r.due_date AS due_date,
r.investor_name AS investor_name,
COALESCE(NULLIF(TRIM(u.full_name), ''), u.username) AS assignee
FROM reminders r
LEFT JOIN users u ON u.id = r.assignee_id
WHERE r.deleted_at IS NULL
AND r.status = 'open'
AND r.due_date IS NOT NULL AND TRIM(r.due_date) != ''
AND substr(r.due_date, 1, 10) <= ?
ORDER BY r.due_date ASC
"""
# ------------------------------------------------------------------ collection
def _fetch_activity_rows(conn, since_iso, until_iso):
    """Run the activity query for [since_iso, until_iso) and return all rows.

    Each row is one (mailbox sighting x investor link) pair. When the query
    raises sqlite3.OperationalError (the email tables are not present because
    the integration is disabled) the result is an empty list.
    """
    window = (since_iso, until_iso)
    try:
        cursor = conn.execute(_ACTIVITY_SQL, window)
        rows = cursor.fetchall()
    except sqlite3.OperationalError:
        # email tables not present (integration disabled) — nothing to report
        rows = []
    return rows
def _own_addresses(conn):
"""Lower-cased set of enrolled mailbox addresses — used to decide whether an
email is outbound (from us) or inbound (from the investor) at the email level."""
try:
return {(r[0] or "").lower().strip()
for r in conn.execute("SELECT email_address FROM email_accounts")}
except sqlite3.OperationalError:
return set()
def collect_user_activity(conn, since_iso, until_iso):
    """Bucket the window's email activity by team member.

    Covers emails in [since_iso, until_iso). Returns a list of dicts, one per
    user who had activity, each shaped as
    {user_id, username, full_name, account_email, emails[], investors[],
    sent, received, total}. The list is empty when the email tables are absent.

    Direction is per-mailbox (eam.is_sent): it answers "did THIS user send the
    message". An email is counted once per user even when it links to several
    investors; the first row seen for an email fixes its direction and text.
    """
    buckets = {}
    for row in _fetch_activity_rows(conn, since_iso, until_iso):
        user_id = row["user_id"]
        if user_id not in buckets:
            buckets[user_id] = {
                "user_id": user_id,
                "username": row["username"],
                "full_name": row["full_name"],
                "account_email": row["account_email"],
                "_emails": {},
                "_inv": set(),
            }
        bucket = buckets[user_id]

        email_id = row["email_id"]
        if email_id not in bucket["_emails"]:
            bucket["_emails"][email_id] = {
                "email_id": email_id,
                "direction": "sent" if row["is_sent"] else "received",
                "subject": row["subject"],
                "sent_at": row["sent_at"],
                "text": row["body_text"] or row["snippet"] or "",
                "investors": [],
            }
        email = bucket["_emails"][email_id]

        investor = (row["investor_name"] or "").strip()
        if not investor:
            continue
        if investor not in email["investors"]:
            email["investors"].append(investor)
        bucket["_inv"].add(investor)

    results = []
    for bucket in buckets.values():
        emails = list(bucket["_emails"].values())
        emails.sort(key=lambda item: item["sent_at"] or "")
        sent_count = len([item for item in emails if item["direction"] == "sent"])
        results.append({
            "user_id": bucket["user_id"],
            "username": bucket["username"],
            "full_name": bucket["full_name"],
            "account_email": bucket["account_email"],
            "emails": emails,
            "investors": sorted(bucket["_inv"]),
            "sent": sent_count,
            "received": len(emails) - sent_count,
            "total": len(emails),
        })
    # Alphabetical by display name (full name, else username), case-insensitive.
    return sorted(results,
                  key=lambda item: (item["full_name"] or item["username"] or "").lower())
def collect_investor_activity(conn, since_iso, until_iso):
    """Pivot the same window by investor, across the whole team.

    Each email is deduplicated per investor, so a reply that landed in several
    team mailboxes counts once. Direction is decided at the EMAIL level: "out"
    when the sender address is one of our enrolled mailboxes, otherwise "in".

    Returns a list of {name, emails[{email_id, direction, subject, sent_at,
    members[]}], inbound, outbound, total}, most-active investor first (ties
    broken by name, case-insensitive). Rows without an investor name are
    grouped under "(unmatched)".
    """
    rows = _fetch_activity_rows(conn, since_iso, until_iso)
    own_addresses = _own_addresses(conn)

    by_investor = {}
    for row in rows:
        investor = (row["investor_name"] or "").strip() or "(unmatched)"
        if investor not in by_investor:
            by_investor[investor] = {"name": investor, "_emails": {}}
        seen = by_investor[investor]["_emails"]

        email_id = row["email_id"]
        if email_id not in seen:
            sender = (row["from_email"] or "").lower().strip()
            seen[email_id] = {
                "email_id": email_id,
                "direction": "out" if sender in own_addresses else "in",
                "subject": row["subject"],
                "sent_at": row["sent_at"],
                "members": [],
            }
        email = seen[email_id]

        # Only outbound mail names a team member, and only from the sighting
        # that actually sent it (is_sent=1); inbound is "from them".
        if email["direction"] != "out" or not row["is_sent"]:
            continue
        member = (row["full_name"] or row["username"] or "").strip()
        if member and member not in email["members"]:
            email["members"].append(member)

    results = []
    for entry in by_investor.values():
        emails = list(entry["_emails"].values())
        emails.sort(key=lambda item: item["sent_at"] or "")
        inbound_count = len([item for item in emails if item["direction"] == "in"])
        results.append({
            "name": entry["name"],
            "emails": emails,
            "inbound": inbound_count,
            "outbound": len(emails) - inbound_count,
            "total": len(emails),
        })
    return sorted(results, key=lambda item: (-item["total"], item["name"].lower()))
def collect_due_reminders(conn, today_iso):
    """List open reminders due on or before `today_iso` (overdue + due today).

    The query itself applies the soft-delete and status filters and orders the
    rows soonest first. Returns [{title, due_date, investor_name, assignee,
    overdue}], where due_date is trimmed to its first 10 characters and
    `overdue` is True when that date sorts strictly before `today_iso`.
    Returns [] when the reminders table is absent (feature not migrated).
    """
    try:
        cursor = conn.execute(_REMINDERS_SQL, (today_iso,))
        rows = cursor.fetchall()
    except sqlite3.OperationalError:
        return []

    def _as_item(row):
        due_day = str(row["due_date"] or "")[:10]
        return {
            "title": (row["title"] or "").strip(),
            "due_date": due_day,
            "investor_name": (row["investor_name"] or "").strip(),
            "assignee": (row["assignee"] or "").strip(),
            "overdue": bool(due_day and due_day < today_iso),
        }

    return [_as_item(row) for row in rows]
# ------------------------------------------------------------------ policy
# app_settings.key under which the admin panel stores the digest policy JSON.
DIGEST_POLICY_KEY = "digest_policy"
# Built-in policy used when neither the DB row nor the env vars say otherwise.
DEFAULT_DIGEST_POLICY = {"enabled": False, "send_hour": 18}


def _coerce_send_hour(value):
    """Clamp `value` to an hour in 0-23, or return None if it is not integer-like."""
    try:
        return min(23, max(0, int(value)))
    except (ValueError, TypeError, OverflowError):
        # OverflowError covers int(float("inf")), which a saved JSON value such
        # as 1e999 produces; it must be ignored like any other bad value.
        return None


def load_digest_policy(conn):
    """Resolve the live digest policy.

    Precedence: the app_settings DB row (the admin-panel control) wins; absent
    that, the CRM_DIGEST_ENABLED / CRM_DIGEST_SEND_HOUR env vars seed a
    first-boot default; absent those, DEFAULT_DIGEST_POLICY. Shared by the
    server (API) and the scheduler so both read one source of truth.

    Args:
        conn: DB connection whose rows support lookup by column name.

    Returns:
        {"enabled": bool, "send_hour": int in 0-23}. Malformed or missing
        values at any layer are skipped rather than raised, so the caller
        always gets a usable policy.
    """
    pol = dict(DEFAULT_DIGEST_POLICY)

    env_enabled = os.environ.get("CRM_DIGEST_ENABLED")
    if env_enabled is not None:
        pol["enabled"] = env_enabled.lower() in ("1", "true", "yes", "on")
    env_hour = os.environ.get("CRM_DIGEST_SEND_HOUR")
    if env_hour:
        hour = _coerce_send_hour(env_hour)
        if hour is not None:
            pol["send_hour"] = hour

    try:
        row = conn.execute(
            "SELECT value_json FROM app_settings WHERE key = ?", (DIGEST_POLICY_KEY,)).fetchone()
    except sqlite3.OperationalError:
        row = None  # app_settings table not present
    if not row:
        return pol

    try:
        saved = json.loads(row["value_json"])
    except Exception:
        # Deliberately broad: an unreadable settings row must never stop the
        # scheduler; fall back to the env/default policy resolved above.
        saved = None
    if isinstance(saved, dict):
        if "enabled" in saved:
            pol["enabled"] = bool(saved["enabled"])
        if "send_hour" in saved:
            hour = _coerce_send_hour(saved["send_hour"])
            if hour is not None:
                pol["send_hour"] = hour
    return pol
# ------------------------------------------------------------------ window
# Cap a manual/preview window so an admin can't accidentally fire a build over
# years of history — each active user in the window costs one Spark call. ~3
# months covers any realistic "since last quarter" preview.
MAX_WINDOW_DAYS = 92


def resolve_digest_window(*, hours=None, since=None, now_local=None, now_utc=None):
    """Resolve a digest content window to (since_iso, until_iso) as UTC ISO-8601.

    `until` is always now. The start is driven by exactly one of:
      - since: a local calendar date 'YYYY-MM-DD' -> that day's local midnight
      - hours: a positive integer lookback (the default path; 24 when nothing given)
    `since` wins if both are supplied. The span is clamped to MAX_WINDOW_DAYS and
    the start must be strictly before now.

    Used by the admin-panel preview and manual-send — neither advances the daily
    cursor, so a wide window here never suppresses the scheduled digest.

    Args:
        hours: lookback in hours; anything int() accepts. Oversized values are
            clamped to the maximum span instead of overflowing.
        since: local date string; only its first 10 characters are parsed.
        now_local: injectable "now" whose tzinfo defines the local zone (tests).
            When omitted, the system zone is used with the UTC offset that
            applied on the `since` date itself, so a date on the other side of
            a DST change still resolves to its true local midnight.
        now_utc: injectable "now" used as the window end (tests).

    Returns:
        (since_iso, until_iso), both formatted 'YYYY-MM-DDTHH:MM:SSZ'.

    Raises:
        ValueError: malformed `since`/`hours`, non-positive hours, or a start
            that is not before now — so the caller can return a clean 400.
    """
    nu = (now_utc or datetime.now(timezone.utc)).astimezone(timezone.utc)
    floor = nu - timedelta(days=MAX_WINDOW_DAYS)

    if since not in (None, ""):
        try:
            day = datetime.strptime(str(since).strip()[:10], "%Y-%m-%d")
        except ValueError:
            raise ValueError("since must be a date in YYYY-MM-DD form") from None
        try:
            if now_local is None:
                # A naive datetime is interpreted in the system zone for that
                # date, which picks the right DST offset; now()'s tzinfo is a
                # fixed offset that is only valid for today.
                start = day.astimezone(timezone.utc)
            else:
                start = day.replace(
                    tzinfo=now_local.tzinfo or timezone.utc).astimezone(timezone.utc)
        except (OverflowError, OSError):
            # Dates at the edge of the representable range cannot be converted;
            # resolve them the way the checks below would have.
            if day.replace(tzinfo=timezone.utc) >= nu:
                raise ValueError("window start must be before now") from None
            start = floor
    else:
        requested = 24 if hours in (None, "") else hours
        try:
            lookback = int(requested)
        except (ValueError, TypeError, OverflowError):
            raise ValueError("hours must be an integer") from None
        if lookback < 1:
            raise ValueError("hours must be a positive integer")
        # Clamp before the subtraction: an oversized lookback would otherwise
        # raise OverflowError in the date arithmetic before the span clamp runs.
        lookback = min(lookback, MAX_WINDOW_DAYS * 24)
        start = nu - timedelta(hours=lookback)

    if start >= nu:
        raise ValueError("window start must be before now")
    if start < floor:
        start = floor  # clamp to the max span (the response echoes the real window)
    fmt = "%Y-%m-%dT%H:%M:%SZ"
    return start.strftime(fmt), nu.strftime(fmt)
# ------------------------------------------------------------------ summarization
def _default_chat(prompt, system=None, max_tokens=220):
"""Lazily reach the local Qwen chat via Spark Control (ingest/llm.py)."""
import os
import sys
sys.path.insert(0, os.path.join(os.path.dirname(os.path.abspath(__file__)), "ingest"))
import llm # noqa: E402
return llm.chat(prompt, system=system, max_tokens=max_tokens)
def _user_email_block(group, max_emails=20, body_chars=500):
lines = []
for em in group["emails"][:max_emails]:
invs = ", ".join(em["investors"]) or "(unmatched)"
body = " ".join((em.get("text") or "").split())[:body_chars]
line = f"- [{em['direction']}] {invs} | subject: {em.get('subject') or '(none)'}"
if body:
line += f" | {body}"
lines.append(line)
return "\n".join(lines)
def _fallback_narrative(group):
"""Deterministic summary when the local model is unavailable — the digest
must still send (always-send) with real counts rather than fail."""
name = group.get("full_name") or group.get("username") or "Team member"
invs = ", ".join(group["investors"]) or "no matched investors"
return (f"{name} had {group['total']} email(s) "
f"({group['sent']} sent, {group['received']} received) with {invs}. "
"(Local summary unavailable.)")
def summarize_user_day(group, chat_fn=None):
    """Produce one narrative paragraph for a user's day from the local model.

    `chat_fn` (called as chat_fn(prompt, system=..., max_tokens=...)) is
    injectable for tests; by default the lazily-imported local client is used.
    The reply has its whitespace collapsed. Any exception from the model call,
    or an empty reply, yields the deterministic count summary instead.
    """
    chat = chat_fn if chat_fn else _default_chat
    who = group.get("full_name") or group.get("username") or "The team member"
    prompt = f"Team member: {who}\nEmails:\n{_user_email_block(group)}"
    try:
        reply = chat(prompt, system=_SYSTEM, max_tokens=220)
        narrative = " ".join((reply or "").split()).strip()
    except Exception:
        # Best-effort by design: the digest must still send without the model.
        narrative = ""
    if narrative:
        return narrative
    return _fallback_narrative(group)
# ------------------------------------------------------------------ composition
def _parse_iso(iso):
if not iso:
return None
s = str(iso).strip()
for fmt in ("%Y-%m-%dT%H:%M:%S.%fZ", "%Y-%m-%dT%H:%M:%SZ", "%Y-%m-%dT%H:%M:%S"):
try:
return datetime.strptime(s, fmt).replace(tzinfo=timezone.utc)
except ValueError:
continue
return None
def _fmt_local(iso):
    """Format a UTC ISO timestamp as human local time, e.g. 'Jun 17 2:14 PM'.

    The 12-hour clock is assembled by hand to stay portable (no
    platform-specific %-I). A value that cannot be parsed is returned as
    str(iso) unchanged.
    """
    parsed = _parse_iso(iso)
    if parsed is None:
        return str(iso)
    local = parsed.astimezone()
    meridiem = "PM" if local.hour >= 12 else "AM"
    clock_hour = local.hour % 12
    if clock_hour == 0:
        clock_hour = 12  # midnight and noon both show as 12
    return f"{local.strftime('%b')} {local.day} {clock_hour}:{local.minute:02d} {meridiem}"
def _reminders_section(due_reminders):
    """Render the 'reminders due' block (overdue + due today) as a list of lines.

    An empty list renders nothing, so a clear deck adds no noise to the digest.

    Args:
        due_reminders: dicts with title, due_date, investor_name, assignee and
            overdue keys (the shape collect_due_reminders returns).

    Returns:
        Lines to append to the digest body: a ruled header, then an
        "Overdue (n):" group and/or a "Due today (n):" group. Entries are
        indented two spaces like the per-email lines of the other sections.
    """
    if not due_reminders:
        return []
    overdue = [r for r in due_reminders if r["overdue"]]
    due_today = [r for r in due_reminders if not r["overdue"]]

    def _line(r):
        # The investor needs its own separator; without one it runs straight
        # into the title ("AcmeSend deck").
        inv = f"{r['investor_name']} — " if r["investor_name"] else ""
        who = f" [{r['assignee']}]" if r["assignee"] else ""
        return f"  {inv}{r['title']} (due {r['due_date']}){who}"

    lines = ["", _RULE, f"REMINDERS DUE ({len(due_reminders)})", _RULE]
    if overdue:
        lines += ["", f"Overdue ({len(overdue)}):"] + [_line(r) for r in overdue]
    if due_today:
        lines += ["", f"Due today ({len(due_today)}):"] + [_line(r) for r in due_today]
    return lines
def _compose_body(user_groups, investor_groups, narratives, since_iso, until_iso, due_reminders=None):
    """Render the plain-text digest body.

    Layout: title, today's local date and the activity window; then either a
    'no activity' note or a summary line followed by the BY TEAM MEMBER and
    BY INVESTOR sections; then the reminders block (current state, independent
    of the activity window); then the footer.

    Args:
        user_groups: per-user buckets (shape of collect_user_activity).
        investor_groups: per-investor buckets (shape of collect_investor_activity).
        narratives: {user_id: narrative paragraph}; a missing id renders "".
        since_iso, until_iso: window bounds, shown in local time.
        due_reminders: reminders to list, or None for no reminders block.

    Returns:
        The body as one newline-joined string.
    """
    title_date = datetime.now().astimezone().strftime("%A, %b %d %Y")
    # The two bounds need an explicit separator; joined by a bare space they
    # read as one run-on timestamp.
    window = f"{_fmt_local(since_iso)} → {_fmt_local(until_iso)}"
    lines = ["Ten31 CRM — Daily Activity Digest", title_date, f"Window: {window}", ""]
    if not user_groups:
        lines.append("No tracked email activity from any user in this window.")
    else:
        total_emails = sum(g["total"] for g in user_groups)
        total_invs = len({i for g in user_groups for i in g["investors"]})
        lines.append(f"{len(user_groups)} team member(s) active · {total_emails} email(s) "
                     f"· {total_invs} investor(s)")
        # ── Section 1: by team member (who did what; per-user Spark narrative) ──
        lines += ["", _RULE, "BY TEAM MEMBER", _RULE]
        for g in user_groups:
            invs = ", ".join(g["investors"]) or "(no matched investor)"
            lines += ["",
                      f"{g['full_name'] or g['username']} · {g['account_email']}",
                      f"{g['total']} email(s) ({g['sent']} sent, {g['received']} received) "
                      f"· {invs}", "",
                      narratives.get(g["user_id"], ""), ""]
            for em in g["emails"]:
                arrow = "→ Sent" if em["direction"] == "sent" else "← Received"
                invs_e = ", ".join(em["investors"]) or "(unmatched)"
                subj = em.get("subject") or "(no subject)"
                lines.append(f"  {arrow} · {invs_e} · \"{subj}\" ({_fmt_local(em['sent_at'])})")
        # ── Section 2: by investor (team-wide; both directions, structured) ──
        lines += ["", _RULE, "BY INVESTOR", _RULE]
        for inv in investor_groups:
            lines += ["",
                      f"{inv['name']} · {inv['total']} email(s) "
                      f"({inv['inbound']} in, {inv['outbound']} out)"]
            for em in inv["emails"]:
                subj = em.get("subject") or "(no subject)"
                when = _fmt_local(em["sent_at"])
                if em["direction"] == "out":
                    who = ", ".join(em["members"]) or "team"
                    lines.append(f"  → Sent by {who} · \"{subj}\" ({when})")
                else:
                    lines.append(f"  ← Received · \"{subj}\" ({when})")
    # ── Reminders due (current state — independent of the activity window) ──
    lines += _reminders_section(due_reminders or [])
    lines += ["", _RULE, _FOOTER]
    return "\n".join(lines)
def build_digest(conn, since_iso, until_iso, chat_fn=None):
    """Build the daily digest for [since_iso, until_iso).

    Always produces a body: an empty window gets a 'no activity' note, because
    the team chose always-send. Sections are by team member (one narrative per
    user, via `chat_fn` or the local model), by investor (structured), and
    reminders due (overdue + due today as of the local date, current-state).

    Returns:
        {subject, body, has_activity, user_count, email_count, investor_count,
        reminder_count}.
    """
    user_groups = collect_user_activity(conn, since_iso, until_iso)
    investor_groups = collect_investor_activity(conn, since_iso, until_iso)

    # One model call per active user.
    narratives = {}
    for group in user_groups:
        narratives[group["user_id"]] = summarize_user_day(group, chat_fn)

    today_iso = format(datetime.now().astimezone(), "%Y-%m-%d")
    due_reminders = collect_due_reminders(conn, today_iso)
    body = _compose_body(
        user_groups, investor_groups, narratives, since_iso, until_iso, due_reminders)

    stamp = format(datetime.now().astimezone(), "%b %d")
    digest = {
        "subject": f"Ten31 CRM — Daily Activity Digest · {stamp}",
        "body": body,
        "has_activity": bool(user_groups),
        "user_count": len(user_groups),
        "email_count": sum(group["total"] for group in user_groups),
        "investor_count": len(investor_groups),
        "reminder_count": len(due_reminders),
    }
    return digest