"""Daily activity digest — content builder (Phase B). Assembles the per-user -> per-investor email-activity digest and summarizes each team member's day with ONE narrative paragraph from the LOCAL Spark model (ingest/llm.py via Spark Control). NEVER Claude: the digest is deliberately un-anonymized (real LP names + email substance), so every summarization stays on Ten31 infra. Keeping the substance local is the whole point — this is the one path that intentionally bypasses the scrub -> Claude -> re-hydrate boundary. This is an internal ops email to the team's own admins, so it is exempt from the "agents draft, humans send" rule — that rule governs outward LP/prospect contact, not an internal digest to the fund's own inboxes. Never extend it to send to LPs. Soft-delete: every read here filters the relevant tombstones — `email_account_messages.deleted_at IS NULL`, `users.is_active = 1`, and the org/contact name joins drop soft-deleted rows (falling back to the raw address). Stdlib only; the local-LLM client is imported lazily so this module stays importable (and testable with an injected chat fn) without Spark configured. """ import json import os import sqlite3 from datetime import datetime, timezone # One row per (account-sighting x investor-link) in the window. Grouped into # per-user buckets in Python. Investor display name resolves fundraising grid -> # organization -> contact -> the raw matched address, skipping soft-deleted # org/contact rows (fundraising_investors has no soft-delete column — it is a # rebuilt projection of the grid). 
_ACTIVITY_SQL = """
SELECT ea.user_id        AS user_id,
       u.username        AS username,
       u.full_name       AS full_name,
       ea.email_address  AS account_email,
       eam.is_sent       AS is_sent,
       e.id              AS email_id,
       e.from_email      AS from_email,
       e.subject         AS subject,
       e.body_text       AS body_text,
       e.snippet         AS snippet,
       e.sent_at         AS sent_at,
       COALESCE(
           NULLIF(TRIM(fi.investor_name), ''),
           NULLIF(TRIM(o.name), ''),
           NULLIF(TRIM(COALESCE(c.first_name, '') || ' ' || COALESCE(c.last_name, '')), ''),
           eil.matched_address
       ) AS investor_name
FROM email_account_messages eam
JOIN email_accounts ea ON ea.id = eam.account_id
JOIN users u ON u.id = ea.user_id
JOIN emails e ON e.id = eam.email_id
JOIN email_investor_links eil ON eil.email_id = e.id
LEFT JOIN fundraising_investors fi ON fi.id = eil.fundraising_investor_id
LEFT JOIN organizations o ON o.id = eil.organization_id AND o.deleted_at IS NULL
LEFT JOIN contacts c ON c.id = eil.contact_id AND c.deleted_at IS NULL
WHERE eam.deleted_at IS NULL
  AND u.is_active = 1
  AND e.is_matched = 1
  AND e.sent_at >= ?
  AND e.sent_at < ?
ORDER BY u.full_name, u.username, e.sent_at ASC
"""

_RULE = "─" * 52
_FOOTER = ("— Internal Ten31 CRM digest. Narratives are generated locally (Spark), "
           "never Claude. This mailbox is unmonitored.")
_SYSTEM = (
    "You write a brief internal activity digest for a venture fund's partners. "
    "Given one team member's emails with investors over the last day, write 2-4 "
    "sentences summarizing what they did: which investors they engaged and the "
    "gist of each thread. Name the investors. Past tense, plain prose, no "
    "greeting, no bullet points, no sign-off."
)


# ------------------------------------------------------------------ collection

def _fetch_activity_rows(conn, since_iso, until_iso):
    """Raw (sighting x investor-link) rows for [since_iso, until_iso).

    Returns [] if the email tables are absent. Callers read columns by name,
    so ``conn`` needs a name-indexable row factory (e.g. ``sqlite3.Row``).
    """
    try:
        return conn.execute(_ACTIVITY_SQL, (since_iso, until_iso)).fetchall()
    except sqlite3.OperationalError:
        return []  # email tables not present (integration disabled) — nothing to report


def _own_addresses(conn):
    """Lower-cased set of enrolled mailbox addresses — used to decide whether
    an email is outbound (from us) or inbound (from the investor) at the
    email level.

    Blank/NULL mailbox addresses are dropped: otherwise an email whose
    ``from_email`` is missing (normalized to "") would be misread as outbound.
    """
    try:
        rows = conn.execute("SELECT email_address FROM email_accounts").fetchall()
    except sqlite3.OperationalError:
        return set()
    own = {(r[0] or "").lower().strip() for r in rows}
    own.discard("")
    return own


def collect_user_activity(conn, since_iso, until_iso, rows=None):
    """Return per-user activity buckets for emails in [since_iso, until_iso).

    Each bucket: {user_id, username, full_name, account_email, emails[],
    investors[], sent, received, total}. Empty list if the email tables are
    absent. Only users who had activity appear. Direction here is per-user
    (eam.is_sent): did THIS user send the message — if any of the user's
    sightings of a message is flagged sent, it counts as sent.

    ``rows`` may carry already-fetched ``_fetch_activity_rows`` output for the
    same window, so a caller building both views runs the query once; when
    omitted, the rows are fetched from ``conn``.
    """
    if rows is None:
        rows = _fetch_activity_rows(conn, since_iso, until_iso)
    groups = {}
    for r in rows:
        uid = r["user_id"]
        g = groups.get(uid)
        if g is None:
            # NOTE(review): account_email comes from the user's first row; a
            # user with several mailboxes shows only one of them.
            g = groups[uid] = {
                "user_id": uid,
                "username": r["username"],
                "full_name": r["full_name"],
                "account_email": r["account_email"],
                "_emails": {},
                "_inv": set(),
            }
        eid = r["email_id"]
        em = g["_emails"].get(eid)
        if em is None:
            em = g["_emails"][eid] = {
                "email_id": eid,
                "direction": "sent" if r["is_sent"] else "received",
                "subject": r["subject"],
                "sent_at": r["sent_at"],
                "text": r["body_text"] or r["snippet"] or "",
                "investors": [],
            }
        elif r["is_sent"]:
            # Same message sighted in another of this user's mailboxes (e.g.
            # sent from one, CC'd to another). The ORDER BY does not fix the
            # order of those sightings, so any "sent" sighting wins.
            em["direction"] = "sent"
        inv = (r["investor_name"] or "").strip()
        if inv:
            if inv not in em["investors"]:
                em["investors"].append(inv)
            g["_inv"].add(inv)

    out = []
    for g in groups.values():
        emails = sorted(g["_emails"].values(), key=lambda e: e["sent_at"] or "")
        sent = sum(1 for e in emails if e["direction"] == "sent")
        out.append({
            "user_id": g["user_id"],
            "username": g["username"],
            "full_name": g["full_name"],
            "account_email": g["account_email"],
            "emails": emails,
            "investors": sorted(g["_inv"]),
            "sent": sent,
            "received": len(emails) - sent,
            "total": len(emails),
        })
    out.sort(key=lambda x: (x["full_name"] or x["username"] or "").lower())
    return out


def collect_investor_activity(conn, since_iso, until_iso, rows=None):
    """Re-pivot the same window by investor (across the whole team), deduping
    each email so a reply to several team members counts once.

    Direction is decided at the EMAIL level: outbound if the sender is one of
    our mailboxes, else inbound. Each bucket: {name, emails[{email_id,
    direction in/out, subject, sent_at, members[]}], inbound, outbound,
    total}. Links with no resolvable name land in an "(unmatched)" bucket.
    Sorted most-active first.

    ``rows`` may carry already-fetched ``_fetch_activity_rows`` output for the
    same window; when omitted, the rows are fetched from ``conn``.
    """
    if rows is None:
        rows = _fetch_activity_rows(conn, since_iso, until_iso)
    own = _own_addresses(conn)
    groups = {}
    for r in rows:
        name = (r["investor_name"] or "").strip() or "(unmatched)"
        g = groups.get(name)
        if g is None:
            g = groups[name] = {"name": name, "_emails": {}}
        eid = r["email_id"]
        em = g["_emails"].get(eid)
        if em is None:
            outbound = (r["from_email"] or "").lower().strip() in own
            em = g["_emails"][eid] = {
                "email_id": eid,
                "direction": "out" if outbound else "in",
                "subject": r["subject"],
                "sent_at": r["sent_at"],
                "members": [],
            }
        # Attribute the sending team member on outbound mail (the sighting with
        # is_sent=1); inbound is "from them", so no member shown.
        if em["direction"] == "out" and r["is_sent"]:
            who = (r["full_name"] or r["username"] or "").strip()
            if who and who not in em["members"]:
                em["members"].append(who)

    out = []
    for g in groups.values():
        emails = sorted(g["_emails"].values(), key=lambda e: e["sent_at"] or "")
        inbound = sum(1 for e in emails if e["direction"] == "in")
        out.append({
            "name": g["name"],
            "emails": emails,
            "inbound": inbound,
            "outbound": len(emails) - inbound,
            "total": len(emails),
        })
    out.sort(key=lambda x: (-x["total"], x["name"].lower()))
    return out


def _distinct_investors(user_groups):
    """Set of named investors across all user buckets.

    User buckets only record non-empty investor names, so the "(unmatched)"
    placeholder used by the by-investor view never appears here.
    """
    return {inv for g in user_groups for inv in g["investors"]}


# ------------------------------------------------------------------ policy

DIGEST_POLICY_KEY = "digest_policy"
DEFAULT_DIGEST_POLICY = {"enabled": False, "send_hour": 18}


def _policy_bool(value):
    """Interpret a policy on/off flag.

    Strings use the env-var vocabulary (1/true/yes/on, case-insensitive), so a
    stored "false" or "0" is False — plain bool() would treat any non-empty
    string as True. Non-strings use ordinary truthiness.
    """
    if isinstance(value, str):
        return value.strip().lower() in ("1", "true", "yes", "on")
    return bool(value)


def _policy_hour(value):
    """Coerce to int and clamp to 0-23; raises ValueError/TypeError/OverflowError
    for values int() cannot convert."""
    return min(23, max(0, int(value)))


def load_digest_policy(conn):
    """Resolve the live digest policy.

    Precedence: the app_settings DB row (the admin-panel control) wins; absent
    that, the CRM_DIGEST_ENABLED/SEND_HOUR env vars seed a first-boot default;
    absent those, DEFAULT_DIGEST_POLICY. Malformed values at any layer are
    ignored rather than raised. Returns {enabled: bool, send_hour: int 0-23}.
    Shared by the server (API) and the scheduler so both read one source of
    truth.
    """
    pol = dict(DEFAULT_DIGEST_POLICY)
    env_enabled = os.environ.get("CRM_DIGEST_ENABLED")
    if env_enabled is not None:
        pol["enabled"] = _policy_bool(env_enabled)
    env_hour = os.environ.get("CRM_DIGEST_SEND_HOUR")
    if env_hour:
        try:
            pol["send_hour"] = _policy_hour(env_hour)
        except ValueError:
            pass  # malformed env hour: keep the default
    try:
        row = conn.execute(
            "SELECT value_json FROM app_settings WHERE key = ?",
            (DIGEST_POLICY_KEY,)).fetchone()
    except sqlite3.OperationalError:
        row = None  # app_settings table absent
    if not row:
        return pol
    try:
        saved = json.loads(row["value_json"])
    except (TypeError, ValueError):
        return pol  # unreadable saved policy: fall back to env/defaults
    if isinstance(saved, dict):
        if "enabled" in saved:
            pol["enabled"] = _policy_bool(saved["enabled"])
        if "send_hour" in saved:
            try:
                pol["send_hour"] = _policy_hour(saved["send_hour"])
            except (ValueError, TypeError, OverflowError):
                pass  # e.g. null, "abc", or JSON Infinity: keep prior hour
    return pol


# ------------------------------------------------------------------ summarization

def _default_chat(prompt, system=None, max_tokens=220):
    """Lazily reach the local Qwen chat via Spark Control (ingest/llm.py)."""
    import sys

    ingest_dir = os.path.join(os.path.dirname(os.path.abspath(__file__)), "ingest")
    # Called once per active user per digest: add the path only once so
    # sys.path does not grow on every call.
    if ingest_dir not in sys.path:
        sys.path.insert(0, ingest_dir)
    import llm  # noqa: E402
    return llm.chat(prompt, system=system, max_tokens=max_tokens)


def _user_email_block(group, max_emails=20, body_chars=500):
    """Render a user's emails as one prompt line each for the local model.

    At most ``max_emails`` emails are listed, each body whitespace-collapsed
    and cut to ``body_chars``. If emails were left out, a final line says how
    many, so the model does not understate the day.
    """
    emails = group["emails"]
    lines = []
    for em in emails[:max_emails]:
        invs = ", ".join(em["investors"]) or "(unmatched)"
        body = " ".join((em.get("text") or "").split())[:body_chars]
        line = f"- [{em['direction']}] {invs} | subject: {em.get('subject') or '(none)'}"
        if body:
            line += f" | {body}"
        lines.append(line)
    hidden = len(emails) - max(0, max_emails)
    if hidden > 0:
        lines.append(f"- (+{hidden} more email(s) not shown)")
    return "\n".join(lines)


def _fallback_narrative(group):
    """Deterministic summary when the local model is unavailable — the digest
    must still send (always-send) with real counts rather than fail."""
    name = group.get("full_name") or group.get("username") or "Team member"
    invs = ", ".join(group["investors"]) or "no matched investors"
    return (f"{name} had {group['total']} email(s) "
            f"({group['sent']} sent, {group['received']} received) with {invs}. "
            "(Local summary unavailable.)")


def summarize_user_day(group, chat_fn=None):
    """One narrative paragraph for a user's day, from the local model.

    ``chat_fn`` (default: the local Spark chat) is called as
    ``chat_fn(prompt, system=..., max_tokens=220)``. The reply is
    whitespace-collapsed. Falls back to a deterministic count summary on any
    error or empty reply; failures are logged by exception type only, so no
    LP names or email content reach the logs.
    """
    fn = chat_fn or _default_chat
    name = group.get("full_name") or group.get("username") or "The team member"
    prompt = f"Team member: {name}\nEmails:\n{_user_email_block(group)}"
    try:
        out = " ".join((fn(prompt, system=_SYSTEM, max_tokens=220) or "").split())
    except Exception as exc:  # best-effort: the digest always sends
        import logging
        logging.getLogger(__name__).warning(
            "digest: local summary failed for user %s (%s); using fallback",
            group.get("user_id"), type(exc).__name__)
        return _fallback_narrative(group)
    return out or _fallback_narrative(group)


# ------------------------------------------------------------------ composition

def _parse_iso(iso):
    """Parse a timestamp string into an aware datetime, or None if unparseable.

    The fixed UTC formats ('...Z' with/without fraction, or naive) are tried
    first. Otherwise datetime.fromisoformat is used, which also accepts
    '+HH:MM' offsets and a space separator. Naive results are taken as UTC.
    """
    if not iso:
        return None
    s = str(iso).strip()
    for fmt in ("%Y-%m-%dT%H:%M:%S.%fZ", "%Y-%m-%dT%H:%M:%SZ", "%Y-%m-%dT%H:%M:%S"):
        try:
            return datetime.strptime(s, fmt).replace(tzinfo=timezone.utc)
        except ValueError:
            continue
    # fromisoformat only accepts a trailing 'Z' from Python 3.11 on.
    try:
        dt = datetime.fromisoformat(s[:-1] + "+00:00" if s.endswith("Z") else s)
    except ValueError:
        return None
    if dt.tzinfo is None:
        dt = dt.replace(tzinfo=timezone.utc)
    return dt


def _fmt_local(iso):
    """UTC ISO -> human local time, e.g. 'Jun 17 2:14 PM'. Unparseable input
    is returned as str(iso). Manual 12h formatting to stay portable (no
    platform-specific %-I)."""
    dt = _parse_iso(iso)
    if dt is None:
        return str(iso)
    dt = dt.astimezone()
    hour12 = dt.hour % 12 or 12
    ampm = "AM" if dt.hour < 12 else "PM"
    return f"{dt.strftime('%b')} {dt.day} {hour12}:{dt.minute:02d} {ampm}"


def _compose_body(user_groups, investor_groups, narratives, since_iso, until_iso):
    """Render the plain-text digest body.

    Layout: title, today's date, window line, then either a 'no activity' note
    or a summary line plus two sections — by team member (per-user narrative
    from ``narratives``, keyed by user_id, and an email list) and by investor
    (team-wide, both directions). Ends with the footer.
    """
    title_date = datetime.now().astimezone().strftime("%A, %b %d %Y")
    window = f"{_fmt_local(since_iso)} – {_fmt_local(until_iso)}"
    lines = ["Ten31 CRM — Daily Activity Digest", title_date, f"Window: {window}", ""]
    if not user_groups:
        lines += ["No tracked email activity from any user in this window.", "", _FOOTER]
        return "\n".join(lines)

    total_emails = sum(g["total"] for g in user_groups)
    total_invs = len(_distinct_investors(user_groups))
    lines.append(f"{len(user_groups)} team member(s) active · {total_emails} email(s) "
                 f"· {total_invs} investor(s)")

    # ── Section 1: by team member (who did what; per-user Spark narrative) ──
    lines += ["", _RULE, "BY TEAM MEMBER", _RULE]
    for g in user_groups:
        invs = ", ".join(g["investors"]) or "(no matched investor)"
        lines += ["",
                  f"{g['full_name'] or g['username']} · {g['account_email']}",
                  f"{g['total']} email(s) ({g['sent']} sent, {g['received']} received) "
                  f"· {invs}",
                  "",
                  narratives.get(g["user_id"], ""),
                  ""]
        for em in g["emails"]:
            arrow = "→ Sent" if em["direction"] == "sent" else "← Received"
            invs_e = ", ".join(em["investors"]) or "(unmatched)"
            subj = em.get("subject") or "(no subject)"
            lines.append(f" {arrow} · {invs_e} · \"{subj}\" ({_fmt_local(em['sent_at'])})")

    # ── Section 2: by investor (team-wide; both directions, structured) ──
    lines += ["", _RULE, "BY INVESTOR", _RULE]
    for inv in investor_groups:
        lines += ["", f"{inv['name']} · {inv['total']} email(s) "
                      f"({inv['inbound']} in, {inv['outbound']} out)"]
        for em in inv["emails"]:
            subj = em.get("subject") or "(no subject)"
            when = _fmt_local(em["sent_at"])
            if em["direction"] == "out":
                who = ", ".join(em["members"]) or "team"
                lines.append(f" → Sent by {who} · \"{subj}\" ({when})")
            else:
                lines.append(f" ← Received · \"{subj}\" ({when})")

    lines += ["", _RULE, _FOOTER]
    return "\n".join(lines)


def build_digest(conn, since_iso, until_iso, chat_fn=None):
    """Build the daily digest for [since_iso, until_iso).

    Returns {subject, body, has_activity, user_count, email_count,
    investor_count}. Always returns a body (empty windows get a 'no activity'
    note — the team chose always-send). Two sections: by team member
    (per-user Spark narrative) and by investor (structured, both directions).
    ``investor_count`` matches the body's header line: named investors only.
    """
    # Both views pivot the same rows, so run the activity query once.
    rows = _fetch_activity_rows(conn, since_iso, until_iso)
    user_groups = collect_user_activity(conn, since_iso, until_iso, rows=rows)
    investor_groups = collect_investor_activity(conn, since_iso, until_iso, rows=rows)
    narratives = {g["user_id"]: summarize_user_day(g, chat_fn) for g in user_groups}
    body = _compose_body(user_groups, investor_groups, narratives, since_iso, until_iso)
    stamp = datetime.now().astimezone().strftime("%b %d")
    return {
        "subject": f"Ten31 CRM — Daily Activity Digest · {stamp}",
        "body": body,
        "has_activity": bool(user_groups),
        "user_count": len(user_groups),
        "email_count": sum(g["total"] for g in user_groups),
        "investor_count": len(_distinct_investors(user_groups)),
    }