c7b74a2704
Communications tab (search/query roadmap items 1 & 2): - Fix the investor dropdown: the facet only listed grid investors, so it came back empty whenever email matched a classic contact or org domain (no grid id — the common case). It now mirrors the email list, resolving each link to a typed identity (fund:/org:/contact:/addr:) with precedence grid -> org -> contact -> address; investor_id accepts the typed key (bare id = fund: for back-compat) and an unknown prefix matches nothing. - Add a date-range filter and a click-to-expand full-body view (GET /api/email/detail, admin, soft-delete-gated; body_text only, never raw remote HTML). - Add a "Search content" mode: GET /api/email/search wraps the ingest hybrid_search over the Qdrant email index (doc_type=email), hydrated and soft-delete-filtered against SQLite (canonical), 503 if Spark/Qdrant down. Daily digest: - Settings -> Admin builds a digest over a chosen window (last 24h or since a date) as an in-app preview before sending (POST /api/admin/digest/preview), so the local-Spark summarizer can be verified on demand even on a quiet day. Manual send uses the same window; neither advances the daily cursor, so a preview never suppresses the scheduled digest. Code-only, migrations no-op. 22/22 backend tests, render-smoke pass.
419 lines
18 KiB
Python
419 lines
18 KiB
Python
"""Daily activity digest — content builder (Phase B).
|
||
|
||
Assembles the per-user -> per-investor email-activity digest and summarizes each
|
||
team member's day with ONE narrative paragraph from the LOCAL Spark model
|
||
(ingest/llm.py via Spark Control). NEVER Claude: the digest is deliberately
|
||
un-anonymized (real LP names + email substance), so every summarization stays on
|
||
Ten31 infra. Keeping the substance local is the whole point — this is the one
|
||
path that intentionally bypasses the scrub -> Claude -> re-hydrate boundary.
|
||
|
||
This is an internal ops email to the team's own admins, so it is exempt from the
|
||
"agents draft, humans send" rule — that rule governs outward LP/prospect contact,
|
||
not an internal digest to the fund's own inboxes. Never extend it to send to LPs.
|
||
|
||
Soft-delete: every read here filters the relevant tombstones —
|
||
`email_account_messages.deleted_at IS NULL`, `users.is_active = 1`, and the
|
||
org/contact name joins drop soft-deleted rows (falling back to the raw address).
|
||
|
||
Stdlib only; the local-LLM client is imported lazily so this module stays
|
||
importable (and testable with an injected chat fn) without Spark configured.
|
||
"""
|
||
import json
|
||
import os
|
||
import sqlite3
|
||
from datetime import datetime, timedelta, timezone
|
||
|
||
|
||
# Activity query: one row per (mailbox sighting × investor link) whose email's
# sent_at falls in the half-open window [since, until). Callers fold these rows
# into per-user and per-investor buckets in Python. investor_name is the first
# non-blank of: fundraising-grid name, organization name, contact "first last",
# then the raw matched address. Soft-deleted organizations/contacts are filtered
# inside their LEFT JOIN conditions, so a tombstoned row contributes NULL and the
# next candidate wins. fundraising_investors is joined unfiltered — it has no
# soft-delete column (it is a rebuilt projection of the grid).
_ACTIVITY_SQL = """
SELECT
    ea.user_id       AS user_id,
    u.username       AS username,
    u.full_name      AS full_name,
    ea.email_address AS account_email,
    eam.is_sent      AS is_sent,
    e.id             AS email_id,
    e.from_email     AS from_email,
    e.subject        AS subject,
    e.body_text      AS body_text,
    e.snippet        AS snippet,
    e.sent_at        AS sent_at,
    COALESCE(
        NULLIF(TRIM(fi.investor_name), ''),
        NULLIF(TRIM(o.name), ''),
        NULLIF(TRIM(COALESCE(c.first_name, '') || ' ' || COALESCE(c.last_name, '')), ''),
        eil.matched_address
    ) AS investor_name
FROM email_account_messages eam
JOIN email_accounts ea ON ea.id = eam.account_id
JOIN users u ON u.id = ea.user_id
JOIN emails e ON e.id = eam.email_id
JOIN email_investor_links eil ON eil.email_id = e.id
LEFT JOIN fundraising_investors fi ON fi.id = eil.fundraising_investor_id
LEFT JOIN organizations o ON o.id = eil.organization_id AND o.deleted_at IS NULL
LEFT JOIN contacts c ON c.id = eil.contact_id AND c.deleted_at IS NULL
WHERE eam.deleted_at IS NULL
  AND u.is_active = 1
  AND e.is_matched = 1
  AND e.sent_at >= ? AND e.sent_at < ?
ORDER BY u.full_name, u.username, e.sent_at ASC
"""
|
||
|
||
# Section divider: 52 × U+2500 (box drawings light horizontal).
_RULE = "\u2500" * 52

# Closing line of every digest body.
_FOOTER = (
    "— Internal Ten31 CRM digest. "
    "Narratives are generated locally (Spark), never Claude. "
    "This mailbox is unmonitored."
)

# System prompt for the per-user narrative sent to the local model.
_SYSTEM = " ".join((
    "You write a brief internal activity digest for a venture fund's partners.",
    "Given one team member's emails with investors over the last day, write 2-4 "
    "sentences summarizing what they did: which investors they engaged and the "
    "gist of each thread.",
    "Name the investors.",
    "Past tense, plain prose, no greeting, no bullet points, no sign-off.",
))
|
||
|
||
|
||
# ------------------------------------------------------------------ collection
|
||
|
||
def _fetch_activity_rows(conn, since_iso, until_iso):
    """Run the activity query for [since_iso, until_iso) and return all rows.

    An OperationalError (the email tables don't exist because the integration is
    disabled) yields an empty list instead of an error."""
    window = (since_iso, until_iso)
    try:
        rows = conn.execute(_ACTIVITY_SQL, window).fetchall()
    except sqlite3.OperationalError:
        rows = []  # email integration not installed — nothing to report
    return rows
|
||
|
||
|
||
def _own_addresses(conn):
    """Return the enrolled mailbox addresses, trimmed and lower-cased, as a set.

    Used to classify an email as outbound (sender is one of ours) or inbound at
    the email level. NULL addresses normalize to "". A missing email_accounts
    table yields an empty set."""
    query = "SELECT email_address FROM email_accounts"
    try:
        raw = [row[0] for row in conn.execute(query)]
    except sqlite3.OperationalError:
        return set()  # table absent — no mailboxes enrolled
    return {(addr or "").lower().strip() for addr in raw}
|
||
|
||
|
||
def collect_user_activity(conn, since_iso, until_iso):
    """Return per-user activity buckets for emails in [since_iso, until_iso).

    Each bucket: {user_id, username, full_name, account_email, emails[], investors[],
    sent, received, total}. Empty list if the email tables are absent. Only users
    who had activity appear. Direction here is per-mailbox (eam.is_sent): did THIS
    user send the message.

    One email can reach a user through several rows: one per investor link, and
    one per enrolled mailbox of that user that saw it. Emails are deduped by id.
    An email counts as "sent" if ANY of this user's sightings has is_sent set, so
    the answer does not depend on which sighting the query returned first.
    account_email is the first mailbox seen for the user. Emails within a bucket
    are ordered by sent_at; buckets are ordered by display name (full_name, else
    username), case-insensitively."""
    rows = _fetch_activity_rows(conn, since_iso, until_iso)
    groups = {}
    for r in rows:
        uid = r["user_id"]
        g = groups.get(uid)
        if g is None:
            g = {"user_id": uid, "username": r["username"], "full_name": r["full_name"],
                 "account_email": r["account_email"], "_emails": {}, "_inv": set()}
            groups[uid] = g

        eid = r["email_id"]
        em = g["_emails"].get(eid)
        if em is None:
            em = {"email_id": eid,
                  "direction": "received",
                  "subject": r["subject"], "sent_at": r["sent_at"],
                  "text": r["body_text"] or r["snippet"] or "", "investors": []}
            g["_emails"][eid] = em
        # Upgrade on any sending sighting (e.g. sent from one of the user's
        # mailboxes, cc'd to another) instead of trusting the first row only.
        if r["is_sent"]:
            em["direction"] = "sent"

        inv = (r["investor_name"] or "").strip()
        if inv:
            if inv not in em["investors"]:
                em["investors"].append(inv)
            g["_inv"].add(inv)

    out = []
    for g in groups.values():
        emails = sorted(g["_emails"].values(), key=lambda e: e["sent_at"] or "")
        sent = sum(1 for e in emails if e["direction"] == "sent")
        out.append({
            "user_id": g["user_id"], "username": g["username"],
            "full_name": g["full_name"], "account_email": g["account_email"],
            "emails": emails, "investors": sorted(g["_inv"]),
            "sent": sent, "received": len(emails) - sent, "total": len(emails),
        })
    out.sort(key=lambda x: (x["full_name"] or x["username"] or "").lower())
    return out
|
||
|
||
|
||
def collect_investor_activity(conn, since_iso, until_iso):
    """Pivot the same window by investor across the whole team.

    Each email is deduped per investor, so a reply that reached several team
    members counts once. Direction is decided per EMAIL: "out" when the sender
    address is one of our enrolled mailboxes, otherwise "in". Blank investor
    names land in an "(unmatched)" bucket.

    Each bucket: {name, emails[{email_id, direction in/out, subject, sent_at,
    members[]}], inbound, outbound, total}. Buckets are ordered most-active first,
    ties by case-insensitive name; emails inside a bucket by sent_at."""
    rows = _fetch_activity_rows(conn, since_iso, until_iso)
    ours = _own_addresses(conn)

    by_name = {}  # investor name -> {email_id: email entry}
    for row in rows:
        label = (row["investor_name"] or "").strip() or "(unmatched)"
        bucket = by_name.setdefault(label, {})
        eid = row["email_id"]
        if eid not in bucket:
            sender = (row["from_email"] or "").lower().strip()
            bucket[eid] = {"email_id": eid,
                           "direction": "out" if sender in ours else "in",
                           "subject": row["subject"], "sent_at": row["sent_at"],
                           "members": []}
        entry = bucket[eid]
        # Only outbound mail is attributed, and only to the sighting that sent it
        # (is_sent=1); inbound mail is "from them" and lists no member.
        if entry["direction"] != "out" or not row["is_sent"]:
            continue
        member = (row["full_name"] or row["username"] or "").strip()
        if member and member not in entry["members"]:
            entry["members"].append(member)

    result = []
    for label, bucket in by_name.items():
        ordered = sorted(bucket.values(), key=lambda em: em["sent_at"] or "")
        inbound = len([em for em in ordered if em["direction"] == "in"])
        result.append({"name": label, "emails": ordered,
                       "inbound": inbound, "outbound": len(ordered) - inbound,
                       "total": len(ordered)})
    result.sort(key=lambda item: (-item["total"], item["name"].lower()))
    return result
|
||
|
||
|
||
# ------------------------------------------------------------------ policy
|
||
|
||
DIGEST_POLICY_KEY = "digest_policy"
DEFAULT_DIGEST_POLICY = {"enabled": False, "send_hour": 18}

# Spellings accepted as "on". Shared by the env var and string values in the DB row.
_TRUTHY = ("1", "true", "yes", "on")


def _policy_enabled(value):
    """Coerce an 'enabled' setting to bool.

    Strings are trimmed, lower-cased and matched against _TRUTHY, so "false",
    "0" or "off" read as disabled instead of as truthy non-empty strings. Any
    other type goes through bool()."""
    if isinstance(value, str):
        return value.strip().lower() in _TRUTHY
    return bool(value)


def _policy_hour(value, fallback):
    """Clamp `value` to an hour in 0-23, or return `fallback` if int() rejects it."""
    try:
        return min(23, max(0, int(value)))
    except (ValueError, TypeError, OverflowError):  # OverflowError: JSON Infinity
        return fallback


def load_digest_policy(conn):
    """Resolve the live digest policy.

    Precedence: the app_settings DB row (the admin-panel control) wins. Absent
    that, the CRM_DIGEST_ENABLED / CRM_DIGEST_SEND_HOUR env vars seed a first-boot
    default. Absent those, DEFAULT_DIGEST_POLICY applies. Shared by the server
    (API) and the scheduler so both read one source of truth.

    Args:
        conn: sqlite3 connection. Plain-tuple and sqlite3.Row rows both work.

    Returns:
        {enabled: bool, send_hour: int 0-23}. Unreadable individual values
        (bad JSON, non-integer hour) are ignored and leave the previous layer's
        value in place.
    """
    pol = dict(DEFAULT_DIGEST_POLICY)

    env_enabled = os.environ.get("CRM_DIGEST_ENABLED")
    if env_enabled is not None:
        pol["enabled"] = _policy_enabled(env_enabled)
    env_hour = os.environ.get("CRM_DIGEST_SEND_HOUR")
    if env_hour:
        pol["send_hour"] = _policy_hour(env_hour, pol["send_hour"])

    try:
        row = conn.execute(
            "SELECT value_json FROM app_settings WHERE key = ?", (DIGEST_POLICY_KEY,)).fetchone()
    except sqlite3.OperationalError:
        row = None  # no app_settings table yet
    if row is None:
        return pol

    try:
        # Positional access works for both sqlite3.Row and tuple rows.
        saved = json.loads(row[0])
    except (TypeError, ValueError):
        saved = None  # NULL or malformed JSON -> keep env/default
    if isinstance(saved, dict):
        if "enabled" in saved:
            pol["enabled"] = _policy_enabled(saved["enabled"])
        if "send_hour" in saved:
            pol["send_hour"] = _policy_hour(saved["send_hour"], pol["send_hour"])
    return pol
|
||
|
||
|
||
# ------------------------------------------------------------------ window
|
||
|
||
# Cap a manual/preview window so an admin can't accidentally fire a build over
# years of history — each active user in the window costs one Spark call. ~3
# months covers any realistic "since last quarter" preview.
MAX_WINDOW_DAYS = 92


def resolve_digest_window(*, hours=None, since=None, now_local=None, now_utc=None):
    """Resolve a digest content window to (since_iso, until_iso) as UTC ISO-8601.

    `until` is always now. The start is driven by exactly one of:
      - since: a local calendar date 'YYYY-MM-DD' -> that day's local midnight
      - hours: a positive integer lookback (the default path; 24 when nothing given)
    `since` wins if both are supplied. The span is clamped to MAX_WINDOW_DAYS and
    the start must be strictly before now. Raises ValueError on malformed input so
    the caller can return a clean 400. Pure (now_* injectable) for testing.

    Local midnight for `since` uses now_local's tzinfo when one is injected.
    Otherwise it uses the system zone's rules for THAT date rather than today's
    offset, so a date on the other side of a DST change isn't shifted by an hour.
    Out-of-range inputs clamp instead of overflowing:
      - a huge `hours` is capped before the timedelta is built;
      - a `since` at the edge of datetime's range clamps to the floor, or is
        rejected if it is not in the past.

    Used by the admin-panel preview and manual-send — neither advances the daily
    cursor, so a wide window here never suppresses the scheduled digest."""
    nu = (now_utc or datetime.now(timezone.utc)).astimezone(timezone.utc)
    floor = nu - timedelta(days=MAX_WINDOW_DAYS)

    if since not in (None, ""):
        try:
            d = datetime.strptime(str(since).strip()[:10], "%Y-%m-%d")
        except ValueError:
            raise ValueError("since must be a date in YYYY-MM-DD form") from None
        try:
            if now_local is None:
                # Naive -> aware using the system zone as of `d` (DST-correct).
                start = d.astimezone(timezone.utc)
            else:
                start = d.replace(tzinfo=now_local.tzinfo or timezone.utc).astimezone(timezone.utc)
        except (OverflowError, OSError):
            # Only at datetime's range edges (e.g. 0001-01-01 east of UTC).
            if d.year >= nu.year:
                raise ValueError("window start must be before now") from None
            start = floor
    else:
        h = 24 if hours in (None, "") else hours
        try:
            h = int(h)
        except (ValueError, TypeError):
            raise ValueError("hours must be an integer") from None
        if h < 1:
            raise ValueError("hours must be a positive integer")
        # Cap first: timedelta(hours=<huge>) raises OverflowError, and anything
        # beyond the max span clamps to `floor` anyway.
        h = min(h, MAX_WINDOW_DAYS * 24)
        start = nu - timedelta(hours=h)

    if start >= nu:
        raise ValueError("window start must be before now")
    if start < floor:
        start = floor  # clamp to the max span (the response echoes the real window)

    fmt = "%Y-%m-%dT%H:%M:%SZ"
    return start.strftime(fmt), nu.strftime(fmt)
|
||
|
||
|
||
# ------------------------------------------------------------------ summarization
|
||
|
||
def _default_chat(prompt, system=None, max_tokens=220):
    """Send one prompt to the local Qwen chat via Spark Control (ingest/llm.py).

    `llm` is imported lazily so this module loads (and tests run with an injected
    chat fn) without Spark configured. The ingest/ directory next to this file is
    added to sys.path only once. The digest calls this once per active user, and
    an unconditional insert would grow sys.path by one entry per call for the
    life of the process.

    Returns whatever llm.chat returns. Errors from the import or the call are
    not handled here."""
    import os
    import sys

    ingest_dir = os.path.join(os.path.dirname(os.path.abspath(__file__)), "ingest")
    if ingest_dir not in sys.path:
        sys.path.insert(0, ingest_dir)
    import llm  # noqa: E402  (project-local module resolved via ingest_dir)

    return llm.chat(prompt, system=system, max_tokens=max_tokens)
|
||
|
||
|
||
def _user_email_block(group, max_emails=20, body_chars=500):
    """Render a user's emails as one prompt line each, for the local model.

    Only the first `max_emails` emails are included. Each line is
    '- [direction] investors | subject: ...', plus ' | body' when the
    whitespace-collapsed body (truncated to `body_chars`) is non-empty."""
    def render(em):
        names = ", ".join(em["investors"]) or "(unmatched)"
        head = f"- [{em['direction']}] {names} | subject: {em.get('subject') or '(none)'}"
        excerpt = " ".join((em.get("text") or "").split())[:body_chars]
        return f"{head} | {excerpt}" if excerpt else head

    return "\n".join(render(em) for em in group["emails"][:max_emails])
|
||
|
||
|
||
def _fallback_narrative(group):
    """Count-only narrative used when the local model can't produce one.

    The digest is always sent, so a Spark outage degrades the paragraph to the
    real sent/received counts instead of failing the whole build."""
    who = group.get("full_name") or group.get("username") or "Team member"
    partners = ", ".join(group["investors"]) or "no matched investors"
    tally = "({} sent, {} received)".format(group["sent"], group["received"])
    return "{} had {} email(s) {} with {}. (Local summary unavailable.)".format(
        who, group["total"], tally, partners)
|
||
|
||
|
||
def summarize_user_day(group, chat_fn=None):
    """Produce one narrative paragraph for a user's activity.

    The prompt (member name plus rendered email lines) goes to `chat_fn`, or to
    the local Spark chat when none is given. The reply is whitespace-collapsed.
    Any exception from the chat step, or an empty reply, falls back to the
    deterministic count summary, so the digest always has a paragraph."""
    chat = chat_fn if chat_fn else _default_chat
    who = group.get("full_name") or group.get("username") or "The team member"
    prompt = "Team member: {}\nEmails:\n{}".format(who, _user_email_block(group))
    try:
        reply = chat(prompt, system=_SYSTEM, max_tokens=220)
        text = " ".join((reply or "").split()).strip()
    except Exception:  # best-effort: a model failure must not block the digest
        text = ""
    return text or _fallback_narrative(group)
|
||
|
||
|
||
# ------------------------------------------------------------------ composition
|
||
|
||
def _parse_iso(iso):
    """Parse a stored timestamp into an aware UTC datetime, or None.

    The exact forms '...SS.ffffffZ', '...SSZ' and naive '...SS' are tried first
    and read as UTC (unchanged behavior). Anything else falls back to
    datetime.fromisoformat, with a trailing 'Z' rewritten to '+00:00' for
    pre-3.11 Pythons. Offset-bearing ('+02:00') and space-separated
    ('YYYY-MM-DD HH:MM:SS') timestamps therefore render instead of leaking the
    raw string. Naive results are treated as UTC; aware ones are converted to
    UTC. Falsy or unparseable input returns None."""
    if not iso:
        return None
    s = str(iso).strip()
    for fmt in ("%Y-%m-%dT%H:%M:%S.%fZ", "%Y-%m-%dT%H:%M:%SZ", "%Y-%m-%dT%H:%M:%S"):
        try:
            return datetime.strptime(s, fmt).replace(tzinfo=timezone.utc)
        except ValueError:
            continue
    candidate = s[:-1] + "+00:00" if s.endswith(("Z", "z")) else s
    try:
        dt = datetime.fromisoformat(candidate)
    except ValueError:
        return None
    if dt.tzinfo is None:
        return dt.replace(tzinfo=timezone.utc)
    return dt.astimezone(timezone.utc)
|
||
|
||
|
||
def _fmt_local(iso):
    """Render a stored UTC timestamp in the server's local zone, e.g. 'Jun 17 2:14 PM'.

    The 12-hour clock is assembled by hand because '%-I' isn't portable across
    platforms. Input that can't be parsed is echoed back via str()."""
    parsed = _parse_iso(iso)
    if parsed is None:
        return str(iso)
    local = parsed.astimezone()
    clock = local.hour % 12
    if clock == 0:
        clock = 12
    suffix = "PM" if local.hour >= 12 else "AM"
    return "{} {} {}:{:02d} {}".format(local.strftime("%b"), local.day, clock, local.minute, suffix)
|
||
|
||
|
||
def _compose_body(user_groups, investor_groups, narratives, since_iso, until_iso):
    """Render the plain-text digest body.

    Layout: title, today's local date and the window, then either a one-line
    "no activity" note or a team-wide totals line followed by two sections:
      - BY TEAM MEMBER: header, counts, the member's narrative (from
        `narratives`, keyed by user_id), and one line per email;
      - BY INVESTOR: counts, and one line per email, with outbound mail
        attributed to the sending members.
    The footer closes the body."""

    def member_lines(grp):
        # Spacer, header, counts, spacer, narrative, spacer, then one line per email.
        shown = ", ".join(grp["investors"]) or "(no matched investor)"
        yield ""
        yield f"{grp['full_name'] or grp['username']} · {grp['account_email']}"
        yield (f"{grp['total']} email(s) ({grp['sent']} sent, "
               f"{grp['received']} received) · {shown}")
        yield ""
        yield narratives.get(grp["user_id"], "")
        yield ""
        for em in grp["emails"]:
            marker = "→ Sent" if em["direction"] == "sent" else "← Received"
            tags = ", ".join(em["investors"]) or "(unmatched)"
            subject = em.get("subject") or "(no subject)"
            stamp = _fmt_local(em["sent_at"])
            yield f' {marker} · {tags} · "{subject}" ({stamp})'

    def investor_lines(inv):
        # Spacer, per-investor counts, then one line per email (in or out).
        yield ""
        yield (f"{inv['name']} · {inv['total']} email(s) "
               f"({inv['inbound']} in, {inv['outbound']} out)")
        for em in inv["emails"]:
            subject = em.get("subject") or "(no subject)"
            when = _fmt_local(em["sent_at"])
            if em["direction"] != "out":
                yield f' ← Received · "{subject}" ({when})'
                continue
            senders = ", ".join(em["members"]) or "team"
            yield f' → Sent by {senders} · "{subject}" ({when})'

    heading = [
        "Ten31 CRM — Daily Activity Digest",
        datetime.now().astimezone().strftime("%A, %b %d %Y"),
        "Window: {} – {}".format(_fmt_local(since_iso), _fmt_local(until_iso)),
        "",
    ]
    if not user_groups:
        return "\n".join(heading + [
            "No tracked email activity from any user in this window.", "", _FOOTER])

    email_total = sum(grp["total"] for grp in user_groups)
    investor_names = set()
    for grp in user_groups:
        investor_names.update(grp["investors"])

    lines = heading + [
        f"{len(user_groups)} team member(s) active · {email_total} email(s) "
        f"· {len(investor_names)} investor(s)",
        "", _RULE, "BY TEAM MEMBER", _RULE,
    ]
    for grp in user_groups:
        lines.extend(member_lines(grp))
    lines.extend(["", _RULE, "BY INVESTOR", _RULE])
    for inv in investor_groups:
        lines.extend(investor_lines(inv))
    lines.extend(["", _RULE, _FOOTER])
    return "\n".join(lines)
|
||
|
||
|
||
def build_digest(conn, since_iso, until_iso, chat_fn=None):
    """Build the digest for [since_iso, until_iso).

    Returns {subject, body, has_activity, user_count, email_count,
    investor_count}. A body is always produced: an empty window gets a
    "no activity" note, because the team chose always-send. The body has two
    sections: by team member (one Spark narrative per user, via `chat_fn` when
    given) and by investor (structured, both directions)."""
    members = collect_user_activity(conn, since_iso, until_iso)
    investors = collect_investor_activity(conn, since_iso, until_iso)

    narratives = {}
    for member in members:
        narratives[member["user_id"]] = summarize_user_day(member, chat_fn)

    body = _compose_body(members, investors, narratives, since_iso, until_iso)
    day_label = datetime.now().astimezone().strftime("%b %d")
    return {
        "subject": "Ten31 CRM — Daily Activity Digest · " + day_label,
        "body": body,
        "has_activity": len(members) > 0,
        "user_count": len(members),
        "email_count": sum(member["total"] for member in members),
        "investor_count": len(investors),
    }
|