Files
Keysat c7b74a2704 Email search/query + windowed digest preview (v0.1.0:83)
Communications tab (search/query roadmap items 1 & 2):
- Fix the investor dropdown: the facet only listed grid investors, so it
  came back empty whenever email matched a classic contact or org domain
  (no grid id — the common case). It now mirrors the email list, resolving
  each link to a typed identity (fund:/org:/contact:/addr:) with precedence
  grid -> org -> contact -> address; investor_id accepts the typed key
  (bare id = fund: for back-compat) and an unknown prefix matches nothing.
- Add a date-range filter and a click-to-expand full-body view
  (GET /api/email/detail, admin, soft-delete-gated; body_text only, never
  raw remote HTML).
- Add a "Search content" mode: GET /api/email/search wraps the ingest
  hybrid_search over the Qdrant email index (doc_type=email), hydrated and
  soft-delete-filtered against SQLite (canonical), 503 if Spark/Qdrant down.

Daily digest:
- Settings -> Admin builds a digest over a chosen window (last 24h or since
  a date) as an in-app preview before sending (POST /api/admin/digest/preview),
  so the local-Spark summarizer can be verified on demand even on a quiet day.
  Manual send uses the same window; neither advances the daily cursor, so a
  preview never suppresses the scheduled digest.

Code-only, migrations no-op. 22/22 backend tests, render-smoke pass.
2026-06-16 20:46:15 -05:00

419 lines
18 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
"""Daily activity digest — content builder (Phase B).
Assembles the per-user -> per-investor email-activity digest and summarizes each
team member's day with ONE narrative paragraph from the LOCAL Spark model
(ingest/llm.py via Spark Control). NEVER Claude: the digest is deliberately
un-anonymized (real LP names + email substance), so every summarization stays on
Ten31 infra. Keeping the substance local is the whole point — this is the one
path that intentionally bypasses the scrub -> Claude -> re-hydrate boundary.
This is an internal ops email to the team's own admins, so it is exempt from the
"agents draft, humans send" rule — that rule governs outward LP/prospect contact,
not an internal digest to the fund's own inboxes. Never extend it to send to LPs.
Soft-delete: every read here filters the relevant tombstones —
`email_account_messages.deleted_at IS NULL`, `users.is_active = 1`, and the
org/contact name joins drop soft-deleted rows (falling back to the raw address).
Stdlib only; the local-LLM client is imported lazily so this module stays
importable (and testable with an injected chat fn) without Spark configured.
"""
import json
import os
import sqlite3
from datetime import datetime, timedelta, timezone
# One row per (account-sighting x investor-link) in the window. Grouped into
# per-user buckets in Python. Investor display name resolves fundraising grid ->
# organization -> contact -> the raw matched address, skipping soft-deleted
# org/contact rows (fundraising_investors has no soft-delete column — it is a
# rebuilt projection of the grid).
#
# Bound parameters (positional, in order): window start, window end. The window
# is half-open (`sent_at >= start AND sent_at < end`). Only matched emails
# (`e.is_matched = 1`) seen through a non-deleted sighting of an active user
# qualify. Rows come back ordered by full name, username, then ascending sent_at.
# NOTE(review): sent_at is compared against the bounds as stored, so it
# presumably has to share the bounds' textual format — confirm against the
# code that writes emails.sent_at.
_ACTIVITY_SQL = """
SELECT
ea.user_id AS user_id,
u.username AS username,
u.full_name AS full_name,
ea.email_address AS account_email,
eam.is_sent AS is_sent,
e.id AS email_id,
e.from_email AS from_email,
e.subject AS subject,
e.body_text AS body_text,
e.snippet AS snippet,
e.sent_at AS sent_at,
COALESCE(
NULLIF(TRIM(fi.investor_name), ''),
NULLIF(TRIM(o.name), ''),
NULLIF(TRIM(COALESCE(c.first_name, '') || ' ' || COALESCE(c.last_name, '')), ''),
eil.matched_address
) AS investor_name
FROM email_account_messages eam
JOIN email_accounts ea ON ea.id = eam.account_id
JOIN users u ON u.id = ea.user_id
JOIN emails e ON e.id = eam.email_id
JOIN email_investor_links eil ON eil.email_id = e.id
LEFT JOIN fundraising_investors fi ON fi.id = eil.fundraising_investor_id
LEFT JOIN organizations o ON o.id = eil.organization_id AND o.deleted_at IS NULL
LEFT JOIN contacts c ON c.id = eil.contact_id AND c.deleted_at IS NULL
WHERE eam.deleted_at IS NULL
AND u.is_active = 1
AND e.is_matched = 1
AND e.sent_at >= ? AND e.sent_at < ?
ORDER BY u.full_name, u.username, e.sent_at ASC
"""
# Horizontal rule that frames the digest's section headings. Multiplying an
# empty string yields an empty string, which rendered the rule as a blank line;
# a plain ASCII hyphen keeps the rule visible and avoids confusable Unicode.
_RULE = "-" * 52
# Closing line appended to every digest body.
_FOOTER = ("— Internal Ten31 CRM digest. Narratives are generated locally (Spark), "
           "never Claude. This mailbox is unmonitored.")
# System prompt for the per-user narrative requested from the local model.
_SYSTEM = (
    "You write a brief internal activity digest for a venture fund's partners. "
    "Given one team member's emails with investors over the last day, write 2-4 "
    "sentences summarizing what they did: which investors they engaged and the "
    "gist of each thread. Name the investors. Past tense, plain prose, no "
    "greeting, no bullet points, no sign-off."
)
# ------------------------------------------------------------------ collection
def _fetch_activity_rows(conn, since_iso, until_iso):
    """Run the activity query for the window and return all of its rows.

    The two bounds are bound positionally to the query's placeholders. An
    sqlite3.OperationalError (e.g. the email tables do not exist because the
    integration is disabled) is treated as "nothing to report" and yields [].
    """
    window = (since_iso, until_iso)
    try:
        cursor = conn.execute(_ACTIVITY_SQL, window)
        rows = cursor.fetchall()
    except sqlite3.OperationalError:
        rows = []
    return rows
def _own_addresses(conn):
    """Return the set of enrolled mailbox addresses, trimmed and lower-cased.

    Used to decide whether an email is outbound (from us) or inbound (from the
    investor) at the email level. NULL and blank addresses are left out: the
    caller normalises a missing sender to "" before testing membership, so a
    blank entry here would make every sender-less email look outbound.

    Returns an empty set when the email_accounts table is absent
    (sqlite3.OperationalError).
    """
    try:
        rows = conn.execute("SELECT email_address FROM email_accounts").fetchall()
    except sqlite3.OperationalError:
        return set()
    addresses = set()
    for row in rows:
        address = (row[0] or "").strip().lower()
        if address:
            addresses.add(address)
    return addresses
def collect_user_activity(conn, since_iso, until_iso):
    """Bucket the window's activity rows by user.

    Returns a list of dicts, one per user that had activity in
    [since_iso, until_iso), each shaped as
    {user_id, username, full_name, account_email, emails[], investors[],
    sent, received, total}. The list is empty when there are no rows (including
    when the email tables are absent). Direction is per mailbox: an email is
    "sent" when the first sighting seen for it in this user's bucket has
    is_sent set, otherwise "received". Buckets are ordered by full name
    (falling back to username), case-insensitively.
    """
    per_user = {}
    for row in _fetch_activity_rows(conn, since_iso, until_iso):
        user_id = row["user_id"]
        if user_id not in per_user:
            per_user[user_id] = {
                "user_id": user_id,
                "username": row["username"],
                "full_name": row["full_name"],
                "account_email": row["account_email"],
                "_emails": {},
                "_inv": set(),
            }
        bucket = per_user[user_id]

        # One entry per email per user; later rows for the same email only
        # contribute additional investor names.
        email_id = row["email_id"]
        if email_id not in bucket["_emails"]:
            bucket["_emails"][email_id] = {
                "email_id": email_id,
                "direction": "sent" if row["is_sent"] else "received",
                "subject": row["subject"],
                "sent_at": row["sent_at"],
                "text": row["body_text"] or row["snippet"] or "",
                "investors": [],
            }
        email = bucket["_emails"][email_id]

        investor = (row["investor_name"] or "").strip()
        if not investor:
            continue
        if investor not in email["investors"]:
            email["investors"].append(investor)
        bucket["_inv"].add(investor)

    buckets = []
    for bucket in per_user.values():
        emails = list(bucket["_emails"].values())
        emails.sort(key=lambda e: e["sent_at"] or "")
        sent_count = len([e for e in emails if e["direction"] == "sent"])
        buckets.append({
            "user_id": bucket["user_id"],
            "username": bucket["username"],
            "full_name": bucket["full_name"],
            "account_email": bucket["account_email"],
            "emails": emails,
            "investors": sorted(bucket["_inv"]),
            "sent": sent_count,
            "received": len(emails) - sent_count,
            "total": len(emails),
        })
    return sorted(buckets, key=lambda b: (b["full_name"] or b["username"] or "").lower())
def collect_investor_activity(conn, since_iso, until_iso):
    """Bucket the window's activity rows by investor, across the whole team.

    Each email appears once per investor no matter how many mailboxes saw it.
    Direction is decided at the email level: "out" when the sender address is
    one of the enrolled mailboxes, otherwise "in". Rows with no investor name
    fall into an "(unmatched)" bucket.

    Returns a list of {name, emails[{email_id, direction, subject, sent_at,
    members[]}], inbound, outbound, total}, most active first and then by
    name, case-insensitively.
    """
    rows = _fetch_activity_rows(conn, since_iso, until_iso)
    our_addresses = _own_addresses(conn)

    per_investor = {}
    for row in rows:
        investor = (row["investor_name"] or "").strip() or "(unmatched)"
        if investor not in per_investor:
            per_investor[investor] = {"name": investor, "_emails": {}}
        seen = per_investor[investor]["_emails"]

        email_id = row["email_id"]
        if email_id not in seen:
            sender = (row["from_email"] or "").lower().strip()
            seen[email_id] = {
                "email_id": email_id,
                "direction": "out" if sender in our_addresses else "in",
                "subject": row["subject"],
                "sent_at": row["sent_at"],
                "members": [],
            }
        email = seen[email_id]

        # Only outbound mail names a team member, and only from the sighting
        # that actually sent it (is_sent); inbound mail is "from them".
        if email["direction"] != "out" or not row["is_sent"]:
            continue
        member = (row["full_name"] or row["username"] or "").strip()
        if member and member not in email["members"]:
            email["members"].append(member)

    buckets = []
    for entry in per_investor.values():
        emails = list(entry["_emails"].values())
        emails.sort(key=lambda e: e["sent_at"] or "")
        inbound_count = len([e for e in emails if e["direction"] == "in"])
        buckets.append({
            "name": entry["name"],
            "emails": emails,
            "inbound": inbound_count,
            "outbound": len(emails) - inbound_count,
            "total": len(emails),
        })
    return sorted(buckets, key=lambda b: (-b["total"], b["name"].lower()))
# ------------------------------------------------------------------ policy
# app_settings key under which the admin-panel digest policy is stored.
DIGEST_POLICY_KEY = "digest_policy"
# Fallback policy when neither the DB row nor the env vars say otherwise.
DEFAULT_DIGEST_POLICY = {"enabled": False, "send_hour": 18}


def _coerce_send_hour(value):
    """Return `value` as an int clamped to 0-23, or None if it is not usable.

    OverflowError is caught alongside ValueError/TypeError because a JSON
    number such as 1e999 parses to infinity, and int(inf) raises it.
    """
    try:
        return min(23, max(0, int(value)))
    except (ValueError, TypeError, OverflowError):
        return None


def load_digest_policy(conn):
    """Resolve the live digest policy as {enabled: bool, send_hour: int 0-23}.

    Precedence, highest first:
      1. the app_settings row keyed by DIGEST_POLICY_KEY (the admin-panel control);
      2. the CRM_DIGEST_ENABLED / CRM_DIGEST_SEND_HOUR environment variables,
         which seed a first-boot default;
      3. DEFAULT_DIGEST_POLICY.

    A missing app_settings table, an unparsable row, or an unusable send_hour
    is ignored field by field, so the lower-precedence value stays in effect
    instead of raising. Shared by the server (API) and the scheduler so both
    read one source of truth.
    """
    policy = dict(DEFAULT_DIGEST_POLICY)

    env_enabled = os.environ.get("CRM_DIGEST_ENABLED")
    if env_enabled is not None:
        policy["enabled"] = env_enabled.lower() in ("1", "true", "yes", "on")
    env_hour = os.environ.get("CRM_DIGEST_SEND_HOUR")
    if env_hour:
        hour = _coerce_send_hour(env_hour)
        if hour is not None:
            policy["send_hour"] = hour

    try:
        row = conn.execute(
            "SELECT value_json FROM app_settings WHERE key = ?", (DIGEST_POLICY_KEY,)).fetchone()
    except sqlite3.OperationalError:
        row = None  # settings table absent — env/default policy stands
    if not row:
        return policy

    try:
        saved = json.loads(row["value_json"])
    except Exception:
        # Deliberately broad: a corrupt settings row must never stop the
        # scheduler or API from resolving a policy.
        return policy
    if not isinstance(saved, dict):
        return policy

    if "enabled" in saved:
        policy["enabled"] = bool(saved["enabled"])
    if "send_hour" in saved:
        hour = _coerce_send_hour(saved["send_hour"])
        if hour is not None:
            policy["send_hour"] = hour
    return policy
# ------------------------------------------------------------------ window
# Cap a manual/preview window so an admin can't accidentally fire a build over
# years of history — each active user in the window costs one Spark call. ~3
# months covers any realistic "since last quarter" preview.
MAX_WINDOW_DAYS = 92


def resolve_digest_window(*, hours=None, since=None, now_local=None, now_utc=None):
    """Resolve a digest content window to (since_iso, until_iso) as UTC ISO-8601.

    `until` is always now. The start is driven by exactly one of:
      - since: a local calendar date 'YYYY-MM-DD' -> that day's local midnight
      - hours: a positive integer lookback (the default path; 24 when nothing given)
    `since` wins if both are supplied. The span is clamped to MAX_WINDOW_DAYS
    and the start must be strictly before now.

    Raises ValueError on malformed input so the caller can return a clean 400.
    Oversized values (an enormous `hours`, a date at the edge of datetime's
    range) are clamped or rejected here rather than escaping as OverflowError.
    Pure (now_* injectable) for testing.

    Used by the admin-panel preview and manual-send — neither advances the
    daily cursor, so a wide window here never suppresses the scheduled digest.

    NOTE(review): local midnight uses the UTC offset of `now_local`, so a
    `since` date on the other side of a DST change may be off by an hour —
    confirm whether that matters for previews.
    """
    nu = (now_utc or datetime.now(timezone.utc)).astimezone(timezone.utc)
    nl = now_local or datetime.now().astimezone()
    floor = nu - timedelta(days=MAX_WINDOW_DAYS)

    if since not in (None, ""):
        try:
            d = datetime.strptime(str(since).strip()[:10], "%Y-%m-%d")
        except ValueError:
            raise ValueError("since must be a date in YYYY-MM-DD form") from None
        try:
            start = d.replace(tzinfo=nl.tzinfo or timezone.utc).astimezone(timezone.utc)
        except OverflowError:
            # Only reachable at the extreme ends of datetime's range, where the
            # UTC conversion itself falls outside what datetime can represent.
            if d.year > nu.year:
                raise ValueError("window start must be before now") from None
            start = floor
    else:
        h = 24 if hours in (None, "") else hours
        try:
            h = int(h)
        except (ValueError, TypeError, OverflowError):
            raise ValueError("hours must be an integer") from None
        if h < 1:
            raise ValueError("hours must be a positive integer")
        # Cap before building the timedelta: a huge lookback would overflow
        # timedelta/datetime arithmetic, and it is clamped to the floor anyway.
        h = min(h, MAX_WINDOW_DAYS * 24)
        start = nu - timedelta(hours=h)

    if start >= nu:
        raise ValueError("window start must be before now")
    if start < floor:
        start = floor  # clamp to the max span (the response echoes the real window)
    fmt = "%Y-%m-%dT%H:%M:%SZ"
    return start.strftime(fmt), nu.strftime(fmt)
# ------------------------------------------------------------------ summarization
def _default_chat(prompt, system=None, max_tokens=220):
    """Lazily reach the local chat model via Spark Control (ingest/llm.py).

    The import happens here rather than at module load so this module stays
    importable without Spark configured. The sibling `ingest` directory is put
    at the front of sys.path once; re-inserting it on every call would grow
    sys.path by one entry per summarised user in a long-running process.

    Returns whatever `llm.chat` returns; any import or call error propagates to
    the caller.
    """
    import os
    import sys
    ingest_dir = os.path.join(os.path.dirname(os.path.abspath(__file__)), "ingest")
    if ingest_dir not in sys.path:
        sys.path.insert(0, ingest_dir)
    import llm  # noqa: E402
    return llm.chat(prompt, system=system, max_tokens=max_tokens)
def _user_email_block(group, max_emails=20, body_chars=500):
    """Render a user's emails as the bullet list fed to the summarizer.

    At most `max_emails` emails are rendered, one line each:
    "- [direction] investors | subject: ... | body". The body has its
    whitespace collapsed and is cut to `body_chars`; it is omitted when empty.
    Missing investors render as "(unmatched)", a missing subject as "(none)".
    """
    def render(email):
        who = ", ".join(email["investors"]) or "(unmatched)"
        excerpt = " ".join((email.get("text") or "").split())[:body_chars]
        subject = email.get("subject") or "(none)"
        parts = [f"- [{email['direction']}] {who}", f"subject: {subject}"]
        if excerpt:
            parts.append(excerpt)
        return " | ".join(parts)

    return "\n".join(render(email) for email in group["emails"][:max_emails])
def _fallback_narrative(group):
    """Build the count-only summary used when the local model gives nothing.

    Keeps the digest sendable with real numbers instead of failing: names the
    user (full name, else username, else "Team member"), the total/sent/received
    counts and the investors involved.
    """
    who = group.get("full_name") or group.get("username") or "Team member"
    investors = ", ".join(group["investors"]) or "no matched investors"
    template = "{} had {} email(s) ({} sent, {} received) with {}. (Local summary unavailable.)"
    return template.format(who, group["total"], group["sent"], group["received"], investors)
def summarize_user_day(group, chat_fn=None):
    """Return one narrative paragraph describing a user's day.

    `chat_fn(prompt, system=..., max_tokens=...)` is the summarizer; when it is
    not supplied the local-model client is used. The reply has its whitespace
    collapsed to single spaces. If the call raises, or the reply is empty, the
    deterministic count summary is returned instead — summarization is
    best-effort and never blocks the digest.
    """
    chat = chat_fn if chat_fn else _default_chat
    who = group.get("full_name") or group.get("username") or "The team member"
    email_block = _user_email_block(group)
    prompt = f"Team member: {who}\nEmails:\n{email_block}"
    try:
        reply = chat(prompt, system=_SYSTEM, max_tokens=220)
        narrative = " ".join((reply or "").split())
    except Exception:
        narrative = ""
    return narrative if narrative else _fallback_narrative(group)
# ------------------------------------------------------------------ composition
def _parse_iso(iso):
    """Parse a UTC ISO-8601 timestamp string into an aware UTC datetime.

    Accepts 'YYYY-MM-DDTHH:MM:SS' with an optional fractional part and 'Z'
    suffix (the three layouts tried below); surrounding whitespace is ignored.
    Returns None for falsy input or anything that matches none of the layouts.
    """
    if not iso:
        return None
    text = str(iso).strip()
    layouts = ("%Y-%m-%dT%H:%M:%S.%fZ", "%Y-%m-%dT%H:%M:%SZ", "%Y-%m-%dT%H:%M:%S")
    for layout in layouts:
        try:
            parsed = datetime.strptime(text, layout)
        except ValueError:
            continue
        # All accepted layouts are treated as UTC, with or without the 'Z'.
        return parsed.replace(tzinfo=timezone.utc)
    return None
def _fmt_local(iso):
    """Format a UTC ISO timestamp as local time, e.g. 'Jun 17 2:14 PM'.

    The 12-hour clock is assembled by hand so no platform-specific strftime
    directive is needed. Input that cannot be parsed is returned as str(iso).
    """
    parsed = _parse_iso(iso)
    if parsed is None:
        return str(iso)
    local = parsed.astimezone()
    hour12 = local.hour % 12
    if hour12 == 0:
        hour12 = 12  # midnight and noon read as 12, not 0
    meridiem = "PM" if local.hour >= 12 else "AM"
    month = local.strftime("%b")
    return f"{month} {local.day} {hour12}:{local.minute:02d} {meridiem}"
def _compose_body(user_groups, investor_groups, narratives, since_iso, until_iso):
    """Assemble the plain-text digest body.

    Args:
        user_groups: per-user buckets (as built by collect_user_activity).
        investor_groups: per-investor buckets (as built by collect_investor_activity).
        narratives: mapping of user_id -> narrative paragraph; a missing entry
            renders as an empty line.
        since_iso, until_iso: the window bounds, shown in local time.

    Returns the body as one newline-joined string. With no user groups the
    body is just the header, a "no activity" note and the footer.
    """
    title_date = datetime.now().astimezone().strftime("%A, %b %d %Y")
    # Explicit separator between the bounds: with only a space between them the
    # two timestamps ran together ("Jun 16 2:14 PM Jun 17 2:14 PM").
    window = f"{_fmt_local(since_iso)} - {_fmt_local(until_iso)}"
    lines = ["Ten31 CRM — Daily Activity Digest", title_date, f"Window: {window}", ""]
    if not user_groups:
        lines += ["No tracked email activity from any user in this window.", "", _FOOTER]
        return "\n".join(lines)

    total_emails = sum(g["total"] for g in user_groups)
    total_invs = len({i for g in user_groups for i in g["investors"]})
    lines.append(f"{len(user_groups)} team member(s) active · {total_emails} email(s) "
                 f"· {total_invs} investor(s)")

    # ── Section 1: by team member (who did what; per-user Spark narrative) ──
    lines += ["", _RULE, "BY TEAM MEMBER", _RULE]
    for g in user_groups:
        invs = ", ".join(g["investors"]) or "(no matched investor)"
        lines += ["",
                  f"{g['full_name'] or g['username']} · {g['account_email']}",
                  f"{g['total']} email(s) ({g['sent']} sent, {g['received']} received) "
                  f"· {invs}", "",
                  narratives.get(g["user_id"], ""), ""]
        for em in g["emails"]:
            arrow = "→ Sent" if em["direction"] == "sent" else "← Received"
            invs_e = ", ".join(em["investors"]) or "(unmatched)"
            subj = em.get("subject") or "(no subject)"
            lines.append(f" {arrow} · {invs_e} · \"{subj}\" ({_fmt_local(em['sent_at'])})")

    # ── Section 2: by investor (team-wide; both directions, structured) ──
    lines += ["", _RULE, "BY INVESTOR", _RULE]
    for inv in investor_groups:
        lines += ["",
                  f"{inv['name']} · {inv['total']} email(s) "
                  f"({inv['inbound']} in, {inv['outbound']} out)"]
        for em in inv["emails"]:
            subj = em.get("subject") or "(no subject)"
            when = _fmt_local(em["sent_at"])
            if em["direction"] == "out":
                who = ", ".join(em["members"]) or "team"
                lines.append(f" → Sent by {who} · \"{subj}\" ({when})")
            else:
                lines.append(f" ← Received · \"{subj}\" ({when})")

    lines += ["", _RULE, _FOOTER]
    return "\n".join(lines)
def build_digest(conn, since_iso, until_iso, chat_fn=None):
    """Build the digest for [since_iso, until_iso).

    Collects the per-user and per-investor buckets, asks the summarizer
    (`chat_fn`, or the local-model default) for one narrative per user, and
    composes the body. A body is always returned — an empty window gets a
    "no activity" note.

    Returns {subject, body, has_activity, user_count, email_count,
    investor_count}; the subject carries today's local date.
    """
    by_user = collect_user_activity(conn, since_iso, until_iso)
    by_investor = collect_investor_activity(conn, since_iso, until_iso)

    narratives = {}
    for bucket in by_user:
        narratives[bucket["user_id"]] = summarize_user_day(bucket, chat_fn)

    body = _compose_body(by_user, by_investor, narratives, since_iso, until_iso)
    stamp = datetime.now().astimezone().strftime("%b %d")
    subject = f"Ten31 CRM — Daily Activity Digest · {stamp}"

    email_total = 0
    for bucket in by_user:
        email_total += bucket["total"]

    return {
        "subject": subject,
        "body": body,
        "has_activity": bool(by_user),
        "user_count": len(by_user),
        "email_count": email_total,
        "investor_count": len(by_investor),
    }