diff --git a/.env.example b/.env.example index d366543..28b4684 100644 --- a/.env.example +++ b/.env.example @@ -20,6 +20,16 @@ X_API_KEY= CRM_DB_PATH=./data/crm.db CRM_DEV_DB_PATH=./data/crm_dev.db +# ── Daily activity digest (Phase B) ── +# The daily digest (each team member's activity per investor + a by-investor view, +# summarized LOCALLY on Spark — never Claude) is controlled from Settings → Admin +# (stored in the DB). These env vars only SEED the first-boot default before an +# admin sets it; once the policy row exists, the admin panel wins. The "Send Digest +# Now" button works regardless. Leave blank to default to off / 6 PM. +CRM_DIGEST_ENABLED= +# Local (box-time) hour 0-23. Default 18 (6 PM). +CRM_DIGEST_SEND_HOUR=18 + # ── Daily-digest sender ── # The digest mailer prefers Gmail domain-wide delegation (the service account that # already powers email capture; its grant includes gmail.compose, which can send) and diff --git a/AGENTS.md b/AGENTS.md index 5be816b..f90f84b 100644 --- a/AGENTS.md +++ b/AGENTS.md @@ -68,7 +68,7 @@ Subsystem rules live in `docs/guides/` and lazy-load in Claude Code via `.claude - **Two coexisting investor models** (classic `contacts`/`lp_profiles` + the `fundraising_*` grid). Reconciling them to canonical IDs is the core entity-resolution task — see `docs/crm-overview.md`. - **Soft-delete only:** `deleted_at` and/or `status='retired'`; never hard-delete. Every READ path must filter `deleted_at IS NULL` — list handlers, get-by-id, nested related-data sub-selects, **and aggregate sub-selects (`COUNT`/`SUM`/`MAX`)**. Audits found leaks in all of these (2026-06-12 detail + nested; 2026-06-13 list-view `contact_count`/`total_funded`/`comm_count`); the **reports** subsystem aggregates still leak (see Current state). Regression-guarded by `backend/test_soft_delete_reads.py`. (Thesis has a subtlety here — see the thesis guide.) -- **Env:** secrets in `.env` (gitignored); names in `.env.example`. 
Verified names: `ANTHROPIC_API_KEY`, `SPARK_CONTROL_URL`, `SPARK_CONTROL_VERIFY_TLS`, `QDRANT_URL`, `X_API_KEY`, `CRM_DB_PATH`, `CRM_DEV_DB_PATH`. Also used: `CRM_SECRET_KEY` (beta/prod), `CRM_HOST`/`CRM_PORT`, `CRM_DATA_DIR`; digest mailer: `CRM_DIGEST_SENDER` (DWD impersonation sender) + `SMTP_HOST`/`SMTP_PORT`/`SMTP_SECURITY`/`SMTP_FROM`/`SMTP_USERNAME`/`SMTP_PASSWORD` (SMTP fallback). +- **Env:** secrets in `.env` (gitignored); names in `.env.example`. Verified names: `ANTHROPIC_API_KEY`, `SPARK_CONTROL_URL`, `SPARK_CONTROL_VERIFY_TLS`, `QDRANT_URL`, `X_API_KEY`, `CRM_DB_PATH`, `CRM_DEV_DB_PATH`. Also used: `CRM_SECRET_KEY` (beta/prod), `CRM_HOST`/`CRM_PORT`, `CRM_DATA_DIR`; digest mailer: `CRM_DIGEST_SENDER` (DWD impersonation sender) + `SMTP_HOST`/`SMTP_PORT`/`SMTP_SECURITY`/`SMTP_FROM`/`SMTP_USERNAME`/`SMTP_PASSWORD` (SMTP fallback); daily digest (Phase B): `CRM_DIGEST_ENABLED` (opt-in auto-send) + `CRM_DIGEST_SEND_HOUR` (local hour, default 18). - **Commit style:** imperative subject, concise body explaining the *why*; put the package version in the subject (`… (v0.1.0:NN)`) for shippable changes. **No AI co-author / attribution trailers** — commits are authored by the user. ## Always @@ -100,13 +100,13 @@ Subsystem rules live in `docs/guides/` and lazy-load in Claude Code via `.claude ## Current state -_Phase 0 substrate + Phase 1 thesis/outreach are built; **box and repo at v0.1.0:76** (Gmail-DWD digest send; deployed & verified live 2026-06-16). Repo carries minor post-76 review polish (committed, rides the next build). Longer-term backlog: `ROADMAP.md`._ +_Phase 0 substrate + Phase 1 thesis/outreach are built; **box last verified live at v0.1.0:76**; **repo at v0.1.0:77** (digest **Phase B** — daily activity-digest builder/scheduler + by-team-member & by-investor sections + admin-panel control + on-demand send), built and being installed to the box. 
Longer-term backlog: `ROADMAP.md`._ - **Working (all draft-only):** CRM + ingest (chunk→embed→Qdrant + retrieval) + redaction boundary; Gmail capture (DWD) + email-activity propose→approve; Thesis Workshop + Architect (Claude) with dual-approval gate; Outreach Draft Assistant + follow-up radar + per-user voice + Tier-B in-thread Gmail draft creation. -- **Deployed & verified live: v0.1.0:76** (box `$START9_BOX_HOST`/immense-voyage.local; `installed-version`→`0.1.0:76`, healthy on `:8080`). **Daily-digest send is live:** `backend/digest_mailer.py` routes **Gmail-DWD (primary) → SMTP (fallback)**. DWD path (`backend/email_integration/gmail_send.py`, `gmail.compose`→`users.messages.send`, sender=`CRM_DIGEST_SENDER` else first admin) needs **no app password** and is proven by a live send to grant; the box's DWD grant has `gmail.compose` but not the narrow `gmail.send`. v75 added the SMTP fallback (**Configure Digest SMTP** action → `/data/secrets/smtp/*`, entrypoint exports `SMTP_*`), a Settings→Admin **Send Test Digest Email** button (admin-only, recipients restricted to the admin set), and the list-view soft-delete aggregate fix. Subsystem detail: `docs/guides/email.md`. +- **Deployed & verified live: v0.1.0:76** (box `$START9_BOX_HOST`/immense-voyage.local; `installed-version`→`0.1.0:76`, healthy on `:8080`). **Daily-digest send is live:** `backend/digest_mailer.py` routes **Gmail-DWD (primary) → SMTP (fallback)**. DWD path (`backend/email_integration/gmail_send.py`, `gmail.compose`→`users.messages.send`, sender=`CRM_DIGEST_SENDER` else first admin) needs **no app password** and is proven by a live send to grant; the box's DWD grant has `gmail.compose` but not the narrow `gmail.send`. v75 added the SMTP fallback (**Configure Digest SMTP** action → `/data/secrets/smtp/*`, entrypoint exports `SMTP_*`), a Settings→Admin **Send Test Digest Email** button (admin-only, recipients restricted to the admin set), and the list-view soft-delete aggregate fix. 
**Digest Phase B is built in-repo (not yet on the box):** `backend/digest_builder.py` builds two sections — *by team member* (per-user **Spark** narrative, never Claude) and *by investor* (team-wide, inbound + outbound, deduped) — soft-delete filtered. `backend/email_integration/digest_scheduler.py` is an always-on daily thread reading a **DB-backed policy** (`app_settings.digest_policy`) each cycle. Enable/send-time live in the **admin panel** (`GET/PATCH /api/admin/digest/policy` + a Settings toggle + time dropdown; env `CRM_DIGEST_ENABLED`/`SEND_HOUR` only seed the first-boot default). Plus `POST /api/admin/digest/send-now` + a Settings **Send Digest Now** button. Subsystem detail: `docs/guides/email.md`. - **Live since v74 (2026-06-13):** login works; `/assets/` traversal 404s (plain + URL-encoded), root health 200. On boot, `ensure_thesis_v2_promoted` makes the v2.0 reserve-asset spine the working *approved* spine (node-level, reversible). Security/privacy hardening (path-traversal close, outreach NER backstop, get-by-id soft-delete) shipped in v74 — detail in `EVALUATION.md`. -- **Tests (2026-06-16):** **19/19 backend tests green** via `python3 backend/run_tests.py` (+`test_smtp_send.py`/`test_smtp_endpoint.py`/`test_gmail_send.py` this session). `py_compile` clean; the s9pk TypeScript typechecks (`cd start9/0.4 && npm run check`, deps installed); `docker_entrypoint.sh` passes `sh -n`. The 2 stale thesis tests stay fixed (seed structure in `docs/guides/thesis.md`). +- **Tests (2026-06-16):** **20/20 backend tests green** via `python3 backend/run_tests.py` (+`test_digest_builder.py` this session — per-user + per-investor queries, soft-delete, inbound dedup, two-section compose, fallback, DB policy resolver, scheduler guards). `py_compile` clean. The 2 stale thesis tests stay fixed (seed structure in `docs/guides/thesis.md`). 
- **Decided, not yet built:** CRM as canonical thesis backbone with the signal-engine reading from it (reconciliation unwired); reply-all for Tier-B drafts (drafts currently reply to the LP only). - **Known debt (P2, not deploy-blocking):** the **reports subsystem** (`handle_dashboard_report`/`handle_pipeline_report`/`handle_lp_breakdown_report`, ~16 aggregate queries over contacts/opportunities/communications/lp_profiles) still counts soft-deleted rows — the list/detail aggregates were fixed (v74 + the org/contacts list-view follow-up) but the reports were not; needs its own pass + report-endpoint tests; `?limit=abc` crashes the request thread (authenticated list path); scrub-gateway TLS verify off; `cryptography==42.0.5`; unpkg/no-SRI frontend; stale user-visible `start9/0.4/assets/ABOUT.md`; hardcoded Spark/Qdrant IPs in the s9pk; the 5.4k-line `server.py` monolith. P3 batch + full list in `EVALUATION.md`. - **Other gaps:** the v2.0 spine is the *working* spine but **not a canonical `thesis_version`** (needs Grant + Jonathan dual sign-off); Appendix-A conviction/exposure (incl. ~40% Strike) stay Grant's working read, not canonical, not fed to the engine; live features (Claude/Qdrant/Gmail) unverified on the box. -- **Next:** 1) Grant clicks Settings→Admin **Send Test Digest Email** (the app-path confirmation; raw DWD send already proven); 2) **digest Phase B** — daily scheduler + per-user→per-investor activity query (`deleted_at IS NULL`) + **Spark-narrative** summary (never Claude) → email all admins (decisions locked in `ROADMAP.md`); 3) **reports-subsystem soft-delete sweep** (~16 aggregates still leak; fix + tests); 4) `?limit=abc` crash (P2); 5) Grant + Jonathan freeze v2.0 canonical; 6) build reply-all; 7) confirm Appendix-A + Maple/OpenSecret/Primal, then promote. 
+- **Next:** 1) **bump version + build/deploy** so Phase B reaches the box, then Grant: Settings→Admin **Send Digest Now** to validate the real digest, and tick **Send automatically every day** (arming is now in-app — no env/StartOS change needed); 2) **reports-subsystem soft-delete sweep** (~16 aggregates still leak; fix + tests); 3) `?limit=abc` crash (P2); 4) Grant + Jonathan freeze v2.0 canonical; 5) build reply-all; 6) confirm Appendix-A + Maple/OpenSecret/Primal, then promote. diff --git a/ROADMAP.md b/ROADMAP.md index 1418f57..2cc4910 100644 --- a/ROADMAP.md +++ b/ROADMAP.md @@ -87,7 +87,7 @@ ## Backlog (post-Phase-1 agentic) ### Daily activity digest (email to the team) -*Requested 2026-06-15. **Phase A built + deployed** (v0.1.0:75 live on the box; send path then moved to **Gmail DWD** in v0.1.0:76, redeploy pending). Phase B (digest content + Spark summarization + daily scheduler) remains.* +*Requested 2026-06-15. **Phase A deployed** (v0.1.0:76, live on the box). **Phase B built 2026-06-15** (digest content + Spark summarization + daily scheduler + on-demand send) — in the repo, awaiting the next build/deploy.* **Decisions (locked 2026-06-15):** recipients = **all active admins**; summarization = **Spark-LLM narrative** (never Claude — un-anonymized substance stays local); granularity = **grouped by user** (→ per investor). @@ -95,7 +95,7 @@ **Phase A — DONE:** (v0.1.0:75) `configureDigestSmtp` Start9 action + `docker_entrypoint.sh` `SMTP_*` export + `backend/smtp_send.py` + admin `POST /api/admin/digest/test-email` (recipient-restricted to the admin set — not an open relay) + Settings button. (v0.1.0:76, redeploy pending) `backend/email_integration/gmail_send.py` (`users.messages.send` via DWD/compose) + `backend/digest_mailer.py` (**Gmail-DWD preferred, SMTP fallback**); the endpoint + button route through it; sender = `CRM_DIGEST_SENDER` else first active admin. Tests: `test_smtp_send.py`, `test_smtp_endpoint.py`, `test_gmail_send.py`. 
-**Phase B — TODO:** daily scheduler (co-locate with `email_integration/scheduler.py`); per-user→per-investor activity query (`deleted_at IS NULL` throughout); Spark-narrative summary of captured email substance; compose + send to all admins. +**Phase B — DONE (2026-06-15/16):** `backend/digest_builder.py` builds **two sections** — *by team member* (per-user **Spark** narrative + both directions, with a deterministic fallback) and *by investor* (team-wide, inbound + outbound, deduped per email, structured). Soft-delete filtered throughout. `backend/email_integration/digest_scheduler.py` is an always-on daily thread that re-reads a **DB-backed policy** each cycle and sends once/day at the configured hour to all active admins (window cursor in `app_settings`). Control moved out of env into the **admin panel**: `app_settings.digest_policy` + `GET/PATCH /api/admin/digest/policy` + a Settings → Admin **enable toggle + send-time dropdown** (env vars only seed the first-boot default). Plus admin `POST /api/admin/digest/send-now` + a "Send Digest Now" button. Decisions settled: **6 PM default**, **always-send** (empty-day note), **per-user narrative + by-investor section**, **in-app control** (not StartOS). Tests: `backend/test_digest_builder.py`. Detail: `docs/guides/email.md`. Have the CRM send a **daily digest email** summarizing each registered user's activity — primarily **who emailed which investors and the substance of those emails** — to the fund principal (and eventually other admins). Scales with the synced-user count: 2 users synced today, ~5 eventually. @@ -106,7 +106,7 @@ Have the CRM send a **daily digest email** summarizing each registered user's ac - **Scheduling:** a daily cron, naturally co-located with the existing `backend/email_integration/scheduler.py` sync cadence. - **Soft-delete:** every aggregate/read in the digest must filter `deleted_at IS NULL` (see the standing soft-delete rule). 
-Open design questions (Phase B detail, still to settle): fixed daily send time; "nothing happened today" suppression; whether the Spark summary is per-investor-thread or a single per-user narrative. +Open design questions (settled at build time): send time = **6 PM box-local** (configurable in the admin panel), covering the ~24h window up to send; empty days = **always send** with a "no activity" note; summary granularity = **one per-user narrative** plus a **by-investor structured section** (inbound + outbound, team-wide) added 2026-06-16; enable/time live in the **admin panel** (DB-backed), not StartOS actions. ## Definition of done for "Airtable substitute" v1 - Team can manage all investors in one master table diff --git a/backend/digest_builder.py b/backend/digest_builder.py new file mode 100644 index 0000000..e45f851 --- /dev/null +++ b/backend/digest_builder.py @@ -0,0 +1,369 @@ +"""Daily activity digest — content builder (Phase B). + +Assembles the per-user -> per-investor email-activity digest and summarizes each +team member's day with ONE narrative paragraph from the LOCAL Spark model +(ingest/llm.py via Spark Control). NEVER Claude: the digest is deliberately +un-anonymized (real LP names + email substance), so every summarization stays on +Ten31 infra. Keeping the substance local is the whole point — this is the one +path that intentionally bypasses the scrub -> Claude -> re-hydrate boundary. + +This is an internal ops email to the team's own admins, so it is exempt from the +"agents draft, humans send" rule — that rule governs outward LP/prospect contact, +not an internal digest to the fund's own inboxes. Never extend it to send to LPs. + +Soft-delete: every read here filters the relevant tombstones — +`email_account_messages.deleted_at IS NULL`, `users.is_active = 1`, and the +org/contact name joins drop soft-deleted rows (falling back to the raw address). 
+ +Stdlib only; the local-LLM client is imported lazily so this module stays +importable (and testable with an injected chat fn) without Spark configured. +""" +import json +import os +import sqlite3 +from datetime import datetime, timezone + + +# One row per (account-sighting x investor-link) in the window. Grouped into +# per-user buckets in Python. Investor display name resolves fundraising grid -> +# organization -> contact -> the raw matched address, skipping soft-deleted +# org/contact rows (fundraising_investors has no soft-delete column — it is a +# rebuilt projection of the grid). +_ACTIVITY_SQL = """ +SELECT + ea.user_id AS user_id, + u.username AS username, + u.full_name AS full_name, + ea.email_address AS account_email, + eam.is_sent AS is_sent, + e.id AS email_id, + e.from_email AS from_email, + e.subject AS subject, + e.body_text AS body_text, + e.snippet AS snippet, + e.sent_at AS sent_at, + COALESCE( + NULLIF(TRIM(fi.investor_name), ''), + NULLIF(TRIM(o.name), ''), + NULLIF(TRIM(COALESCE(c.first_name, '') || ' ' || COALESCE(c.last_name, '')), ''), + eil.matched_address + ) AS investor_name +FROM email_account_messages eam +JOIN email_accounts ea ON ea.id = eam.account_id +JOIN users u ON u.id = ea.user_id +JOIN emails e ON e.id = eam.email_id +JOIN email_investor_links eil ON eil.email_id = e.id +LEFT JOIN fundraising_investors fi ON fi.id = eil.fundraising_investor_id +LEFT JOIN organizations o ON o.id = eil.organization_id AND o.deleted_at IS NULL +LEFT JOIN contacts c ON c.id = eil.contact_id AND c.deleted_at IS NULL +WHERE eam.deleted_at IS NULL + AND u.is_active = 1 + AND e.is_matched = 1 + AND e.sent_at >= ? AND e.sent_at < ? +ORDER BY u.full_name, u.username, e.sent_at ASC +""" + +_RULE = "─" * 52 +_FOOTER = ("— Internal Ten31 CRM digest. Narratives are generated locally (Spark), " + "never Claude. This mailbox is unmonitored.") + +_SYSTEM = ( + "You write a brief internal activity digest for a venture fund's partners. 
" + "Given one team member's emails with investors over the last day, write 2-4 " + "sentences summarizing what they did: which investors they engaged and the " + "gist of each thread. Name the investors. Past tense, plain prose, no " + "greeting, no bullet points, no sign-off." +) + + +# ------------------------------------------------------------------ collection + +def _fetch_activity_rows(conn, since_iso, until_iso): + """Raw (sighting x investor-link) rows for the window. [] if email tables absent.""" + try: + return conn.execute(_ACTIVITY_SQL, (since_iso, until_iso)).fetchall() + except sqlite3.OperationalError: + return [] # email tables not present (integration disabled) — nothing to report + + +def _own_addresses(conn): + """Lower-cased set of enrolled mailbox addresses — used to decide whether an + email is outbound (from us) or inbound (from the investor) at the email level.""" + try: + return {(r[0] or "").lower().strip() + for r in conn.execute("SELECT email_address FROM email_accounts")} + except sqlite3.OperationalError: + return set() + + +def collect_user_activity(conn, since_iso, until_iso): + """Return per-user activity buckets for emails in [since_iso, until_iso). + + Each bucket: {user_id, username, full_name, account_email, emails[], investors[], + sent, received, total}. Empty list if the email tables are absent. Only users + who had activity appear. 
Direction here is per-mailbox (eam.is_sent): did THIS + user send the message.""" + rows = _fetch_activity_rows(conn, since_iso, until_iso) + groups = {} + for r in rows: + uid = r["user_id"] + g = groups.get(uid) + if g is None: + g = {"user_id": uid, "username": r["username"], "full_name": r["full_name"], + "account_email": r["account_email"], "_emails": {}, "_inv": set()} + groups[uid] = g + eid = r["email_id"] + em = g["_emails"].get(eid) + if em is None: + em = {"email_id": eid, + "direction": "sent" if r["is_sent"] else "received", + "subject": r["subject"], "sent_at": r["sent_at"], + "text": r["body_text"] or r["snippet"] or "", "investors": []} + g["_emails"][eid] = em + inv = (r["investor_name"] or "").strip() + if inv: + if inv not in em["investors"]: + em["investors"].append(inv) + g["_inv"].add(inv) + + out = [] + for g in groups.values(): + emails = sorted(g["_emails"].values(), key=lambda e: e["sent_at"] or "") + sent = sum(1 for e in emails if e["direction"] == "sent") + out.append({ + "user_id": g["user_id"], "username": g["username"], + "full_name": g["full_name"], "account_email": g["account_email"], + "emails": emails, "investors": sorted(g["_inv"]), + "sent": sent, "received": len(emails) - sent, "total": len(emails), + }) + out.sort(key=lambda x: (x["full_name"] or x["username"] or "").lower()) + return out + + +def collect_investor_activity(conn, since_iso, until_iso): + """Re-pivot the same window by investor (across the whole team), deduping each + email so a reply to several team members counts once. Direction is decided at + the EMAIL level: outbound if the sender is one of our mailboxes, else inbound. + + Each bucket: {name, emails[{email_id, direction in/out, subject, sent_at, + members[]}], inbound, outbound, total}. 
Sorted most-active first.""" + rows = _fetch_activity_rows(conn, since_iso, until_iso) + own = _own_addresses(conn) + groups = {} + for r in rows: + name = (r["investor_name"] or "").strip() or "(unmatched)" + g = groups.get(name) + if g is None: + g = {"name": name, "_emails": {}} + groups[name] = g + eid = r["email_id"] + em = g["_emails"].get(eid) + if em is None: + outbound = (r["from_email"] or "").lower().strip() in own + em = {"email_id": eid, "direction": "out" if outbound else "in", + "subject": r["subject"], "sent_at": r["sent_at"], "members": []} + g["_emails"][eid] = em + # Attribute the sending team member on outbound mail (the sighting with + # is_sent=1); inbound is "from them", so no member shown. + if em["direction"] == "out" and r["is_sent"]: + who = (r["full_name"] or r["username"] or "").strip() + if who and who not in em["members"]: + em["members"].append(who) + + out = [] + for g in groups.values(): + emails = sorted(g["_emails"].values(), key=lambda e: e["sent_at"] or "") + inbound = sum(1 for e in emails if e["direction"] == "in") + out.append({"name": g["name"], "emails": emails, + "inbound": inbound, "outbound": len(emails) - inbound, "total": len(emails)}) + out.sort(key=lambda x: (-x["total"], x["name"].lower())) + return out + + +# ------------------------------------------------------------------ policy + +DIGEST_POLICY_KEY = "digest_policy" +DEFAULT_DIGEST_POLICY = {"enabled": False, "send_hour": 18} + + +def load_digest_policy(conn): + """Resolve the live digest policy. Precedence: the app_settings DB row (the + admin-panel control) wins; absent that, the CRM_DIGEST_ENABLED/SEND_HOUR env + vars seed a first-boot default; absent those, DEFAULT_DIGEST_POLICY. Returns + {enabled: bool, send_hour: int 0-23}. 
Shared by the server (API) and the + scheduler so both read one source of truth.""" + pol = dict(DEFAULT_DIGEST_POLICY) + + env_enabled = os.environ.get("CRM_DIGEST_ENABLED") + if env_enabled is not None: + pol["enabled"] = env_enabled.lower() in ("1", "true", "yes", "on") + env_hour = os.environ.get("CRM_DIGEST_SEND_HOUR") + if env_hour: + try: + pol["send_hour"] = min(23, max(0, int(env_hour))) + except ValueError: + pass + + try: + row = conn.execute( + "SELECT value_json FROM app_settings WHERE key = ?", (DIGEST_POLICY_KEY,)).fetchone() + except sqlite3.OperationalError: + row = None + if row: + try: + saved = json.loads(row["value_json"]) + except Exception: + saved = None + if isinstance(saved, dict): + if "enabled" in saved: + pol["enabled"] = bool(saved["enabled"]) + if "send_hour" in saved: + try: + pol["send_hour"] = min(23, max(0, int(saved["send_hour"]))) + except (ValueError, TypeError): + pass + return pol + + +# ------------------------------------------------------------------ summarization + +def _default_chat(prompt, system=None, max_tokens=220): + """Lazily reach the local Qwen chat via Spark Control (ingest/llm.py).""" + import os + import sys + sys.path.insert(0, os.path.join(os.path.dirname(os.path.abspath(__file__)), "ingest")) + import llm # noqa: E402 + return llm.chat(prompt, system=system, max_tokens=max_tokens) + + +def _user_email_block(group, max_emails=20, body_chars=500): + lines = [] + for em in group["emails"][:max_emails]: + invs = ", ".join(em["investors"]) or "(unmatched)" + body = " ".join((em.get("text") or "").split())[:body_chars] + line = f"- [{em['direction']}] {invs} | subject: {em.get('subject') or '(none)'}" + if body: + line += f" | {body}" + lines.append(line) + return "\n".join(lines) + + +def _fallback_narrative(group): + """Deterministic summary when the local model is unavailable — the digest + must still send (always-send) with real counts rather than fail.""" + name = group.get("full_name") or 
group.get("username") or "Team member" + invs = ", ".join(group["investors"]) or "no matched investors" + return (f"{name} had {group['total']} email(s) " + f"({group['sent']} sent, {group['received']} received) with {invs}. " + "(Local summary unavailable.)") + + +def summarize_user_day(group, chat_fn=None): + """One narrative paragraph for a user's day, from the local model. Falls back + to a deterministic count summary on any error or empty reply.""" + fn = chat_fn or _default_chat + name = group.get("full_name") or group.get("username") or "The team member" + prompt = f"Team member: {name}\nEmails:\n{_user_email_block(group)}" + try: + out = fn(prompt, system=_SYSTEM, max_tokens=220) + out = " ".join((out or "").split()).strip() + if out: + return out + except Exception: + pass + return _fallback_narrative(group) + + +# ------------------------------------------------------------------ composition + +def _parse_iso(iso): + if not iso: + return None + s = str(iso).strip() + for fmt in ("%Y-%m-%dT%H:%M:%S.%fZ", "%Y-%m-%dT%H:%M:%SZ", "%Y-%m-%dT%H:%M:%S"): + try: + return datetime.strptime(s, fmt).replace(tzinfo=timezone.utc) + except ValueError: + continue + return None + + +def _fmt_local(iso): + """UTC ISO -> human local time, e.g. 'Jun 17 2:14 PM'. 
Manual 12h formatting + to stay portable (no platform-specific %-I).""" + dt = _parse_iso(iso) + if dt is None: + return str(iso) + dt = dt.astimezone() + hour12 = dt.hour % 12 or 12 + ampm = "AM" if dt.hour < 12 else "PM" + return f"{dt.strftime('%b')} {dt.day} {hour12}:{dt.minute:02d} {ampm}" + + +def _compose_body(user_groups, investor_groups, narratives, since_iso, until_iso): + title_date = datetime.now().astimezone().strftime("%A, %b %d %Y") + window = f"{_fmt_local(since_iso)} – {_fmt_local(until_iso)}" + L = ["Ten31 CRM — Daily Activity Digest", title_date, f"Window: {window}", ""] + + if not user_groups: + L += ["No tracked email activity from any user in this window.", "", _FOOTER] + return "\n".join(L) + + total_emails = sum(g["total"] for g in user_groups) + total_invs = len({i for g in user_groups for i in g["investors"]}) + L.append(f"{len(user_groups)} team member(s) active · {total_emails} email(s) " + f"· {total_invs} investor(s)") + + # ── Section 1: by team member (who did what; per-user Spark narrative) ── + L += ["", _RULE, "BY TEAM MEMBER", _RULE] + for g in user_groups: + invs = ", ".join(g["investors"]) or "(no matched investor)" + L += ["", + f"{g['full_name'] or g['username']} · {g['account_email']}", + f"{g['total']} email(s) ({g['sent']} sent, {g['received']} received) " + f"· {invs}", "", + narratives.get(g["user_id"], ""), ""] + for em in g["emails"]: + arrow = "→ Sent" if em["direction"] == "sent" else "← Received" + invs_e = ", ".join(em["investors"]) or "(unmatched)" + subj = em.get("subject") or "(no subject)" + L.append(f" {arrow} · {invs_e} · \"{subj}\" ({_fmt_local(em['sent_at'])})") + + # ── Section 2: by investor (team-wide; both directions, structured) ── + L += ["", _RULE, "BY INVESTOR", _RULE] + for inv in investor_groups: + L += ["", + f"{inv['name']} · {inv['total']} email(s) " + f"({inv['inbound']} in, {inv['outbound']} out)"] + for em in inv["emails"]: + subj = em.get("subject") or "(no subject)" + when = 
_fmt_local(em["sent_at"]) + if em["direction"] == "out": + who = ", ".join(em["members"]) or "team" + L.append(f" → Sent by {who} · \"{subj}\" ({when})") + else: + L.append(f" ← Received · \"{subj}\" ({when})") + + L += ["", _RULE, _FOOTER] + return "\n".join(L) + + +def build_digest(conn, since_iso, until_iso, chat_fn=None): + """Build the daily digest for [since_iso, until_iso). Returns + {subject, body, has_activity, user_count, email_count, investor_count}. Always + returns a body (empty windows get a 'no activity' note — the team chose + always-send). Two sections: by team member (per-user Spark narrative) and by + investor (structured, both directions).""" + user_groups = collect_user_activity(conn, since_iso, until_iso) + investor_groups = collect_investor_activity(conn, since_iso, until_iso) + narratives = {g["user_id"]: summarize_user_day(g, chat_fn) for g in user_groups} + body = _compose_body(user_groups, investor_groups, narratives, since_iso, until_iso) + stamp = datetime.now().astimezone().strftime("%b %d") + return { + "subject": f"Ten31 CRM — Daily Activity Digest · {stamp}", + "body": body, + "has_activity": bool(user_groups), + "user_count": len(user_groups), + "email_count": sum(g["total"] for g in user_groups), + "investor_count": len(investor_groups), + } diff --git a/backend/email_integration/digest_scheduler.py b/backend/email_integration/digest_scheduler.py new file mode 100644 index 0000000..684a093 --- /dev/null +++ b/backend/email_integration/digest_scheduler.py @@ -0,0 +1,185 @@ +"""Daily activity-digest scheduler (Phase B). + +Co-located with the Gmail sync scheduler (it shares the same conn-factory and +daemon-thread idiom). One daemon thread wakes every 60s and fires the daily +activity digest once per local day, at/after the configured send hour. + +Control lives in the DB, set from Settings -> Admin (digest_builder.load_digest_policy +-> app_settings 'digest_policy'): {enabled, send_hour}. 
The thread always runs and +re-reads the policy each cycle, so toggling the digest on/off or changing the time +takes effect on the next loop — no restart. CRM_DIGEST_ENABLED/SEND_HOUR only seed +the first-boot default before an admin sets the policy. + +The send is an internal ops email to the team's own admins — exempt from the +"agents draft, humans send" rule (which governs outward LP/prospect contact). +Digest content is summarized on Spark (local), never Claude — see digest_builder. + +Window: the content covers [last successful send, now). Tracked in app_settings +so a missed day's activity rolls into the next digest rather than being dropped; +the first-ever run covers the prior 24h. The once-per-day guard is a separate +local-date key. The transport (Gmail-DWD -> SMTP) is digest_mailer's job. +""" +import json +import logging +import sqlite3 +import threading +from datetime import datetime, timedelta, timezone + +from .scheduler import _conn_factory_from_env + + +log = logging.getLogger("email_integration.digest_scheduler") + +_LAST_DATE_KEY = "digest_last_sent_date" # local YYYY-MM-DD — once-per-day guard +_LAST_AT_KEY = "digest_last_sent_at" # UTC ISO — content-window cursor + +_state: dict[str, object] = {"thread": None, "stop": threading.Event()} + + +def _utc_iso(dt: datetime) -> str: + return dt.astimezone(timezone.utc).strftime("%Y-%m-%dT%H:%M:%SZ") + + +# app_settings access kept local (no server.py import — avoid the startup cycle); +# the value_json/JSON encoding matches server.get/set_app_setting exactly. +def _get_setting(conn, key): + try: + row = conn.execute("SELECT value_json FROM app_settings WHERE key = ?", (key,)).fetchone() + except sqlite3.OperationalError: + return None + if not row: + return None + try: + return json.loads(row["value_json"]) + except Exception: + return None + + +def _set_setting(conn, key, value) -> None: + conn.execute( + "INSERT INTO app_settings (key, value_json, updated_at) VALUES (?, ?, ?) 
" + "ON CONFLICT(key) DO UPDATE SET value_json = excluded.value_json, " + "updated_at = excluded.updated_at", + (key, json.dumps(value), _utc_iso(datetime.now(timezone.utc))), + ) + + +def _admin_recipients(conn) -> list[str]: + rows = conn.execute( + "SELECT email FROM users WHERE role = 'admin' AND is_active = 1 " + "AND email IS NOT NULL AND TRIM(email) != ''" + ).fetchall() + return [str(r["email"]).strip() for r in rows if str(r["email"]).strip()] + + +def _build_and_send(conn, since_iso, until_iso, *, build_fn=None, send_fn=None): + """Build the digest and hand it to the transport. Raises digest_mailer.NoTransport + (no transport / no recipient) — callers map that to a clear 400; the daily loop + logs it. build_fn/send_fn are injectable for tests.""" + import digest_builder + import digest_mailer + + bf = build_fn or digest_builder.build_digest + sf = send_fn or digest_mailer.send_digest + + recipients = _admin_recipients(conn) + if not recipients: + raise digest_mailer.NoTransport( + "No active admin has an email address — give one an address to receive the digest.") + + digest = bf(conn, since_iso, until_iso) + result = sf(conn, recipients, digest["subject"], digest["body"]) + return { + "recipients": recipients, + "transport": (result or {}).get("transport"), + "has_activity": digest["has_activity"], + "user_count": digest["user_count"], + "email_count": digest["email_count"], + "investor_count": digest.get("investor_count"), + "window": [since_iso, until_iso], + } + + +def maybe_send_digest(conn_factory=None, *, force=False, + now_local=None, now_utc=None, build_fn=None, send_fn=None): + """Send the daily digest if it is due (or unconditionally when force=True). + + Daily path: skips before the send hour and if already sent today; content + window runs from the last send to now and the cursor advances on success. 
+ force path (the admin 'send now' endpoint): ignores the policy and the guards, + uses a fixed last-24h window, and does NOT advance the daily cursor — so an + on-demand preview never suppresses the scheduled send.""" + import digest_builder + + factory = conn_factory or _conn_factory_from_env() + conn = factory() + try: + policy = digest_builder.load_digest_policy(conn) + if not force and not policy["enabled"]: + return {"status": "disabled"} + + nl = now_local or datetime.now() + nu = now_utc or datetime.now(timezone.utc) + + if not force: + today = nl.strftime("%Y-%m-%d") + if nl.hour < policy["send_hour"]: + return {"status": "before_send_hour", "send_hour": policy["send_hour"]} + if _get_setting(conn, _LAST_DATE_KEY) == today: + return {"status": "already_sent_today"} + + until_iso = _utc_iso(nu) + last_at = None if force else _get_setting(conn, _LAST_AT_KEY) + since_iso = last_at or _utc_iso(nu - timedelta(hours=24)) + + result = _build_and_send(conn, since_iso, until_iso, build_fn=build_fn, send_fn=send_fn) + + if not force: + _set_setting(conn, _LAST_DATE_KEY, nl.strftime("%Y-%m-%d")) + _set_setting(conn, _LAST_AT_KEY, until_iso) + conn.commit() + return {"status": "sent", **result} + finally: + conn.close() + + +def start_digest_scheduler(conn_factory=None) -> None: + """Start the daily digest loop (idempotent). 
The thread always runs and reads + the DB policy each cycle (admin-panel control), so it sends only when the policy + is enabled — no env gate, no restart needed to toggle.""" + if _state["thread"] is not None: + return + + factory = conn_factory or _conn_factory_from_env() + stop = threading.Event() + _state["stop"] = stop + + def _loop(): + log.info("digest scheduler started (policy-controlled via Settings -> Admin)") + if stop.wait(15): # let server finish startup + return + while not stop.is_set(): + try: + res = maybe_send_digest(factory) + if res.get("status") == "sent": + log.info("daily digest sent: %s", res) + except Exception: + log.exception("digest send failed; will retry next cycle") + if stop.wait(60): + return + + t = threading.Thread(target=_loop, name="digest", daemon=True) + t.start() + _state["thread"] = t + + +def stop_digest_scheduler() -> None: + ev: threading.Event = _state["stop"] # type: ignore + ev.set() + t = _state.get("thread") + if t: + try: + t.join(timeout=5) # type: ignore + except Exception: + pass + _state["thread"] = None diff --git a/backend/server.py b/backend/server.py index 65b5df4..5819046 100644 --- a/backend/server.py +++ b/backend/server.py @@ -1817,6 +1817,8 @@ class CRMHandler(BaseHTTPRequestHandler): return self.handle_list_fundraising_backups(user) if path == '/api/fundraising/backup-policy': return self.handle_get_backup_policy(user) + if path == '/api/admin/digest/policy': + return self.handle_get_digest_policy(user) if path == '/api/fundraising/relational-summary': return self.handle_get_fundraising_relational_summary(user) if path == '/api/fundraising/automations': @@ -1921,6 +1923,8 @@ class CRMHandler(BaseHTTPRequestHandler): return self.handle_admin_reset_all_data(user, body) if path == '/api/admin/digest/test-email': return self.handle_admin_send_test_email(user, body) + if path == '/api/admin/digest/send-now': + return self.handle_admin_send_digest_now(user, body) if path == '/api/fundraising/backup': return 
self.handle_backup_fundraising_state(user) if path == '/api/fundraising/restore-preview': @@ -2019,6 +2023,8 @@ class CRMHandler(BaseHTTPRequestHandler): return self.handle_admin_update_user(user, target_user_id, body) if path == '/api/fundraising/backup-policy': return self.handle_update_backup_policy(user, body) + if path == '/api/admin/digest/policy': + return self.handle_update_digest_policy(user, body) if re.match(r'^/api/fundraising/automations/[^/]+$', path): rule_id = path.split('/')[-1] return self.handle_update_fundraising_automation_rule(user, rule_id, body) @@ -4292,6 +4298,63 @@ class CRMHandler(BaseHTTPRequestHandler): return self.send_json({"data": {"status": "sent", **result}}) + def handle_admin_send_digest_now(self, user, body): + """Build the REAL daily activity digest (last 24h) on demand and send it to + the active-admin set now. An on-demand preview of Phase B — it does not + touch the daily schedule's cursor, so it never suppresses the scheduled send. + Content is summarized on Spark (local), never Claude.""" + if not require_admin(user): + return self.send_error_json("Admin only", 403) + + import digest_mailer + try: + from email_integration.digest_scheduler import maybe_send_digest + result = maybe_send_digest(force=True) + except digest_mailer.NoTransport as exc: + return self.send_error_json(str(exc), 400) + except Exception as exc: + # Never echo the exception — an auth error can carry a token/credential. + print(f"[digest] send-now failed: {type(exc).__name__}: {exc}", file=sys.stderr) + return self.send_error_json("Send failed — see server logs for details.", 502) + + return self.send_json({"data": result}) + + def handle_get_digest_policy(self, user): + """Return the live daily-digest policy (enabled + send hour). 
DB-backed + (app_settings), set from this same panel — see digest_builder.load_digest_policy.""" + if not require_admin(user): + return self.send_error_json("Admin access required", 403) + import digest_builder + conn = get_db() + try: + return self.send_json({"data": digest_builder.load_digest_policy(conn)}) + finally: + conn.close() + + def handle_update_digest_policy(self, user, body): + """Update the daily-digest policy. Takes effect on the scheduler's next + cycle (no restart). Recipients stay the active-admin set; sender/transport + are env/StartOS config, not toggled here.""" + if not require_admin(user): + return self.send_error_json("Admin access required", 403) + import digest_builder + conn = get_db() + try: + policy = digest_builder.load_digest_policy(conn) + if 'enabled' in body: + policy['enabled'] = bool(body.get('enabled')) + if 'send_hour' in body: + try: + policy['send_hour'] = max(0, min(23, int(body.get('send_hour')))) + except (ValueError, TypeError): + return self.send_error_json("send_hour must be an integer from 0 to 23") + normalized = {"enabled": bool(policy['enabled']), "send_hour": int(policy['send_hour'])} + set_app_setting(conn, digest_builder.DIGEST_POLICY_KEY, normalized) + conn.commit() + return self.send_json({"data": normalized}) + finally: + conn.close() + def handle_list_audit_log(self, user, params): if not require_admin(user): return self.send_error_json("Admin access required", 403) @@ -5425,6 +5488,16 @@ def main(): except Exception as _e: print(f"[email_integration] failed to start scheduler: {_e}") + # ─── Daily activity digest scheduler ───────────────────────────── + # Always started; it reads the digest policy (enabled + send hour) from the DB + # each cycle, so the Settings → Admin toggle controls it live (no restart). 
+ try: + from email_integration.digest_scheduler import start_digest_scheduler + start_digest_scheduler() + print("[digest] daily activity digest scheduler started (policy-controlled)") + except Exception as _e: + print(f"[digest] failed to start digest scheduler: {_e}") + # ThreadingHTTPServer lets one slow request (or a wave of scanner probes) # not block legit users. SQLite is opened per-request via get_db(), and # WAL mode allows concurrent readers + a single writer, so this is safe. diff --git a/backend/test_digest_builder.py b/backend/test_digest_builder.py new file mode 100644 index 0000000..71d3a5e --- /dev/null +++ b/backend/test_digest_builder.py @@ -0,0 +1,303 @@ +#!/usr/bin/env python3 +"""Tests for the daily activity digest (Phase B): the per-user + per-investor +activity queries (soft-delete filtered), inbound dedup, the two-section body, the +local-summary fallback, the DB-backed policy resolver, and the scheduler's +once-per-day / send-hour / policy / force guards. + +The local Spark model and the mail transport are stubbed — no network. Synthetic +data only (guardrail #9). 
+Run: cd backend && python3 test_digest_builder.py +""" +import json +import os +import sqlite3 +import sys +import tempfile +from datetime import datetime, timezone + +sys.path.insert(0, os.path.dirname(os.path.abspath(__file__))) +os.environ["CRM_DB_PATH"] = os.path.join(tempfile.mkdtemp(), "crm.db") +os.environ.setdefault("CRM_DATA_DIR", os.path.dirname(os.environ["CRM_DB_PATH"])) +os.environ["CRM_DIGEST_ENABLED"] = "1" # so the non-force scheduler path is live + +import digest_builder # noqa: E402 +from email_integration import digest_scheduler # noqa: E402 + +FAILS = [] +SINCE = "2026-06-17T00:00:00Z" +UNTIL = "2026-06-18T00:00:00Z" + + +def check(cond, msg): + print((" PASS " if cond else " FAIL ") + msg) + if not cond: + FAILS.append(msg) + + +def _conn(): + conn = sqlite3.connect(os.environ["CRM_DB_PATH"]) + conn.row_factory = sqlite3.Row + return conn + + +def setup(): + conn = _conn() + conn.executescript(""" + CREATE TABLE users (id TEXT PRIMARY KEY, username TEXT, full_name TEXT, + email TEXT, role TEXT, is_active INT DEFAULT 1); + CREATE TABLE email_accounts (id TEXT PRIMARY KEY, user_id TEXT, email_address TEXT); + CREATE TABLE emails (id TEXT PRIMARY KEY, subject TEXT, body_text TEXT, snippet TEXT, + from_email TEXT, sent_at TEXT, is_matched INT DEFAULT 1); + CREATE TABLE email_account_messages (id TEXT PRIMARY KEY, email_id TEXT, account_id TEXT, + gmail_message_id TEXT, gmail_thread_id TEXT, is_sent INT DEFAULT 0, deleted_at TEXT); + CREATE TABLE email_investor_links (id TEXT PRIMARY KEY, email_id TEXT, + fundraising_investor_id TEXT, organization_id TEXT, contact_id TEXT, matched_address TEXT); + CREATE TABLE fundraising_investors (id TEXT PRIMARY KEY, investor_name TEXT); + CREATE TABLE organizations (id TEXT PRIMARY KEY, name TEXT, deleted_at TEXT); + CREATE TABLE contacts (id TEXT PRIMARY KEY, first_name TEXT, last_name TEXT, deleted_at TEXT); + CREATE TABLE app_settings (key TEXT PRIMARY KEY, value_json TEXT, updated_at TEXT); + """) + 
conn.executemany("INSERT INTO users (id,username,full_name,email,role,is_active) VALUES (?,?,?,?,?,?)", [ + ("u1", "grant", "Grant Gilliam", "grant@ten31.xyz", "admin", 1), + ("u2", "jk", "Jonathan K", "jk@ten31.xyz", "member", 1), + ("u3", "retired", "Old Admin", "old@ten31.xyz", "admin", 0), # inactive -> excluded + ]) + conn.executemany("INSERT INTO email_accounts (id,user_id,email_address) VALUES (?,?,?)", [ + ("a1", "u1", "grant@ten31.xyz"), ("a2", "u2", "jk@ten31.xyz"), ("a3", "u3", "old@ten31.xyz"), + ]) + conn.executemany("INSERT INTO fundraising_investors (id,investor_name) VALUES (?,?)", [ + ("inv1", "Harbor & Vine"), ("inv2", "Brightwater Capital"), ("inv3", "Vela Partners"), + ]) + conn.executemany("INSERT INTO organizations (id,name,deleted_at) VALUES (?,?,?)", [ + ("org1", "Summit Fund", None), ("org2", "Deleted Org", "2026-06-01T00:00:00Z"), + ]) + conn.executemany("INSERT INTO contacts (id,first_name,last_name,deleted_at) VALUES (?,?,?,?)", [ + ("c1", "Jane", "Roe", None), + ]) + # emails: id, subject, body, from_email, sent_at, is_matched. Outbound = from one + # of our own mailboxes (grant@/jk@/old@ ten31.xyz); inbound = from outside. 
+ conn.executemany("INSERT INTO emails (id,subject,body_text,from_email,sent_at,is_matched) VALUES (?,?,?,?,?,?)", [ + ("e1", "Fund III terms", "Discussing Fund III terms", "grant@ten31.xyz", "2026-06-17T14:00:00Z", 1), + ("e2", "Re: allocation", "Question about allocation", "lp@brightwater.example", "2026-06-17T09:00:00Z", 1), + ("e3", "Intro", "Summit intro thread", "jk@ten31.xyz", "2026-06-17T11:00:00Z", 1), + ("e4", "Coffee", "Catch up note", "jane@roe.example", "2026-06-17T16:00:00Z", 1), + ("e5", "Wire", "Wire instructions", "ir@summitcap.example", "2026-06-17T17:00:00Z", 1), + ("e6", "Old", "ancient", "grant@ten31.xyz", "2026-06-10T10:00:00Z", 1), # out of window + ("e7", "Tombstoned", "deleted sighting", "lp@harborvine.example", "2026-06-17T08:00:00Z", 1), + ("e8", "Inactive", "from retired user", "old@ten31.xyz", "2026-06-17T12:00:00Z", 1), + ("e9", "Unmatched", "not matched", "lp@harborvine.example", "2026-06-17T13:00:00Z", 0), # is_matched=0 + ("e10", "Group update", "inbound to two of us", "lp@vela.example", "2026-06-17T15:00:00Z", 1), + ]) + # sightings: id, email_id, account_id, is_sent, deleted_at + conn.executemany( + "INSERT INTO email_account_messages (id,email_id,account_id,gmail_message_id,gmail_thread_id,is_sent,deleted_at) " + "VALUES (?,?,?,?,?,?,?)", [ + ("s1", "e1", "a1", "g1", "t1", 1, None), # grant SENT + ("s2", "e2", "a1", "g2", "t2", 0, None), # grant RECEIVED + ("s3", "e3", "a2", "g3", "t3", 1, None), # jk SENT + ("s4", "e4", "a1", "g4", "t4", 0, None), # grant RECEIVED (contact) + ("s5", "e5", "a1", "g5", "t5", 0, None), # grant RECEIVED (deleted org) + ("s6", "e6", "a1", "g6", "t6", 0, None), # out of window + ("s7", "e7", "a1", "g7", "t7", 0, "2026-06-17T09:00:00Z"), # tombstoned + ("s8", "e8", "a3", "g8", "t8", 1, None), # inactive user + ("s9", "e9", "a1", "g9", "t9", 0, None), # unmatched email + ("s10a", "e10", "a1", "g10a", "t10", 0, None), # e10 received by grant ... + ("s10b", "e10", "a2", "g10b", "t10", 0, None), # ... 
and by jk (dedup target) + ]) + # investor links: id, email_id, fr_investor, org, contact, matched_address + conn.executemany( + "INSERT INTO email_investor_links (id,email_id,fundraising_investor_id,organization_id,contact_id,matched_address) " + "VALUES (?,?,?,?,?,?)", [ + ("l1", "e1", "inv1", None, None, "lp@harborvine.example"), + ("l2", "e2", "inv2", None, None, "lp@brightwater.example"), + ("l3", "e3", None, "org1", None, "ir@summitfund.example"), # org name + ("l4", "e4", None, None, "c1", "jane@roe.example"), # contact name + ("l5", "e5", None, "org2", None, "ir@summitcap.example"), # deleted org -> address + ("l6", "e6", "inv1", None, None, "lp@harborvine.example"), + ("l7", "e7", "inv1", None, None, "lp@harborvine.example"), + ("l8", "e8", "inv1", None, None, "lp@harborvine.example"), + ("l9", "e9", "inv1", None, None, "lp@harborvine.example"), + ("l10", "e10", "inv3", None, None, "lp@vela.example"), + ]) + conn.commit() + conn.close() + + +def test_collect(): + conn = _conn() + groups = digest_builder.collect_user_activity(conn, SINCE, UNTIL) + conn.close() + check(len(groups) == 2, f"two active users with activity (grant, jk), got {len(groups)}") + by_user = {g["user_id"]: g for g in groups} + check("u3" not in by_user, "inactive user (u3) excluded") + + grant = by_user.get("u1") + if not grant: + FAILS.append("grant group missing"); return + ids = set(e["email_id"] for e in grant["emails"]) + check(ids == {"e1", "e2", "e4", "e5", "e10"}, + f"grant has e1,e2,e4,e5,e10 (e6 out-of-window, e7 tombstoned, e9 unmatched excluded), got {sorted(ids)}") + check(grant["sent"] == 1 and grant["received"] == 4, f"grant 1 sent / 4 received, got {grant['sent']}/{grant['received']}") + e1 = next(e for e in grant["emails"] if e["email_id"] == "e1") + check(e1["direction"] == "sent", "e1 direction sent") + check(e1["investors"] == ["Harbor & Vine"], f"e1 investor = grid name, got {e1['investors']}") + e4 = next(e for e in grant["emails"] if e["email_id"] == "e4") + 
check(e4["investors"] == ["Jane Roe"], f"e4 investor = contact fallback name, got {e4['investors']}") + e5 = next(e for e in grant["emails"] if e["email_id"] == "e5") + check(e5["investors"] == ["ir@summitcap.example"], f"e5 investor = address (deleted org skipped), got {e5['investors']}") + + jk = by_user.get("u2") + check(jk and jk["emails"][0]["investors"] == ["Summit Fund"], "jk e3 investor = org name") + + +def test_investor(): + conn = _conn() + inv = digest_builder.collect_investor_activity(conn, SINCE, UNTIL) + conn.close() + by_name = {g["name"]: g for g in inv} + # Harbor & Vine, Brightwater, Vela Partners, Summit Fund, Jane Roe, ir@summitcap.example + check(len(inv) == 6, f"six investors with activity, got {len(inv)}: {sorted(by_name)}") + + hv = by_name.get("Harbor & Vine") + check(hv and hv["outbound"] == 1 and hv["inbound"] == 0, f"Harbor & Vine = 1 out / 0 in, got {hv}") + check(hv and hv["emails"][0]["members"] == ["Grant Gilliam"], f"outbound attributed to sender, got {hv and hv['emails'][0]['members']}") + + bw = by_name.get("Brightwater Capital") + check(bw and bw["inbound"] == 1 and bw["outbound"] == 0, f"Brightwater = 1 in / 0 out, got {bw}") + + # e10 was received by TWO mailboxes (grant + jk) -> dedup to one inbound email + vela = by_name.get("Vela Partners") + check(vela and vela["total"] == 1 and vela["inbound"] == 1, + f"Vela inbound deduped across 2 sightings -> 1, got {vela}") + + +def test_build_and_empty(): + conn = _conn() + stub = lambda prompt, system=None, max_tokens=220: "Grant worked with Harbor & Vine on Fund III." 
+ d = digest_builder.build_digest(conn, SINCE, UNTIL, chat_fn=stub) + check(d["has_activity"] is True, "build_digest has_activity True when there is activity") + check(d["user_count"] == 2 and d["email_count"] == 7 and d["investor_count"] == 6, + f"counts: 2 users / 7 emails / 6 investors, got {d['user_count']}/{d['email_count']}/{d['investor_count']}") + body = d["body"] + check("Daily Activity Digest" in body, "body has title") + check("BY TEAM MEMBER" in body and "BY INVESTOR" in body, "body has both sections") + check("Grant Gilliam" in body and "Jonathan K" in body, "body names both active users") + check("Harbor & Vine" in body and "Brightwater Capital" in body and "Vela Partners" in body, + "investor section lists investors") + check("Grant worked with Harbor & Vine on Fund III." in body, "body includes the local narrative") + + empty = digest_builder.build_digest(conn, "2030-01-01T00:00:00Z", "2030-01-02T00:00:00Z", chat_fn=stub) + check(empty["has_activity"] is False, "empty window -> has_activity False") + check("No tracked email activity" in empty["body"], "empty window -> 'no activity' note (always-send)") + check("BY INVESTOR" not in empty["body"], "empty window -> no section headers") + conn.close() + + +def test_policy(): + conn = _conn() + # No DB row yet: CRM_DIGEST_ENABLED=1 (set at import) seeds enabled; hour defaults 18. + pol = digest_builder.load_digest_policy(conn) + check(pol["enabled"] is True and pol["send_hour"] == 18, f"env seed -> enabled, hour 18, got {pol}") + # A DB row wins over the env seed (the admin-panel control). 
+ conn.execute("INSERT OR REPLACE INTO app_settings (key,value_json,updated_at) VALUES (?,?,?)", + (digest_builder.DIGEST_POLICY_KEY, json.dumps({"enabled": False, "send_hour": 9}), "x")) + conn.commit() + pol2 = digest_builder.load_digest_policy(conn) + check(pol2["enabled"] is False and pol2["send_hour"] == 9, f"DB policy wins over env, got {pol2}") + conn.execute("DELETE FROM app_settings WHERE key = ?", (digest_builder.DIGEST_POLICY_KEY,)) + conn.commit() + conn.close() + + +def test_summary_fallback(): + grp = {"user_id": "u1", "full_name": "Grant Gilliam", "username": "grant", + "emails": [{"direction": "sent", "subject": "x", "investors": ["Harbor & Vine"], "text": "hi"}], + "investors": ["Harbor & Vine"], "sent": 1, "received": 0, "total": 1} + def boom(*a, **k): + raise RuntimeError("spark down") + out = digest_builder.summarize_user_day(grp, chat_fn=boom) + check("Grant Gilliam" in out and "1 sent" in out and "unavailable" in out.lower(), + f"fallback narrative on chat error, got: {out}") + + +def test_scheduler_guards(): + sent_calls = [] + build_fn = lambda conn, since, until: {"subject": "S", "body": "B", + "has_activity": True, "user_count": 1, "email_count": 2} + def send_fn(conn, to_addrs, subject, body, sender_email=None): + sent_calls.append(list(to_addrs)) + return {"transport": "stub"} + + factory = _conn + utc = datetime(2026, 6, 18, 1, 0, tzinfo=timezone.utc) + + # Before the send hour (09:00 local < 18:00) -> no send + r = digest_scheduler.maybe_send_digest(factory, now_local=datetime(2026, 6, 18, 9, 0), + now_utc=utc, build_fn=build_fn, send_fn=send_fn) + check(r["status"] == "before_send_hour" and not sent_calls, f"before send hour -> skip, got {r}") + + # At/after the send hour -> sends once, only to the active admin (grant) + r = digest_scheduler.maybe_send_digest(factory, now_local=datetime(2026, 6, 18, 19, 0), + now_utc=utc, build_fn=build_fn, send_fn=send_fn) + check(r["status"] == "sent" and len(sent_calls) == 1, f"after send hour -> 
sends, got {r}") + check(sent_calls and sent_calls[-1] == ["grant@ten31.xyz"], f"recipients = active admins only, got {sent_calls[-1]}") + # The window cursor must advance to the send time so a missed day rolls forward + # (since, now] — not be left unset/stale. + conn = _conn() + cursor_at = digest_scheduler._get_setting(conn, digest_scheduler._LAST_AT_KEY) + conn.close() + check(cursor_at == digest_scheduler._utc_iso(utc), + f"window cursor advanced to send time, got {cursor_at}") + + # Same local day again -> suppressed (once-per-day guard) + r = digest_scheduler.maybe_send_digest(factory, now_local=datetime(2026, 6, 18, 20, 0), + now_utc=utc, build_fn=build_fn, send_fn=send_fn) + check(r["status"] == "already_sent_today" and len(sent_calls) == 1, f"second send same day -> skip, got {r}") + + # force=True ignores the hour + once-per-day guard, and does NOT touch the cursor + conn = _conn() + before = digest_scheduler._get_setting(conn, digest_scheduler._LAST_DATE_KEY) + conn.close() + r = digest_scheduler.maybe_send_digest(factory, force=True, now_local=datetime(2026, 6, 18, 3, 0), + now_utc=utc, build_fn=build_fn, send_fn=send_fn) + check(r["status"] == "sent" and len(sent_calls) == 2, f"force sends regardless of guards, got {r}") + conn = _conn() + after = digest_scheduler._get_setting(conn, digest_scheduler._LAST_DATE_KEY) + conn.close() + check(before == after, "force send does not advance the daily cursor") + + # DB policy disabled -> daily path skips even past the hour; force still sends. 
+ conn = _conn() + digest_scheduler._set_setting(conn, digest_builder.DIGEST_POLICY_KEY, {"enabled": False, "send_hour": 18}) + conn.commit() + conn.close() + r = digest_scheduler.maybe_send_digest(factory, now_local=datetime(2026, 6, 19, 19, 0), + now_utc=utc, build_fn=build_fn, send_fn=send_fn) + check(r["status"] == "disabled" and len(sent_calls) == 2, f"DB-disabled policy skips daily send, got {r}") + r = digest_scheduler.maybe_send_digest(factory, force=True, now_local=datetime(2026, 6, 19, 2, 0), + now_utc=utc, build_fn=build_fn, send_fn=send_fn) + check(r["status"] == "sent" and len(sent_calls) == 3, f"force overrides disabled policy, got {r}") + conn = _conn() + conn.execute("DELETE FROM app_settings WHERE key = ?", (digest_builder.DIGEST_POLICY_KEY,)) + conn.commit() + conn.close() + + +def main(): + setup() + print("collect_user_activity:"); test_collect() + print("collect_investor_activity:"); test_investor() + print("build_digest + empty:"); test_build_and_empty() + print("summary fallback:"); test_summary_fallback() + print("digest policy:"); test_policy() + print("scheduler guards:"); test_scheduler_guards() + if FAILS: + print(f"\nFAILED ({len(FAILS)})") + for f in FAILS: + print(" - " + f) + sys.exit(1) + print("\nALL PASS (digest builder + scheduler)") + + +if __name__ == "__main__": + main() diff --git a/docs/guides/email.md b/docs/guides/email.md index b91652d..6bba6fc 100644 --- a/docs/guides/email.md +++ b/docs/guides/email.md @@ -40,8 +40,45 @@ different category. **Never extend this path to send to LPs/prospects.** independent of any StartOS system-wide SMTP. - The admin **`POST /api/admin/digest/test-email`** restricts recipients to the active-admin set (not an open relay), and logs send failures rather than echoing them (an auth error can - carry a token/credential). Digest *content* generation (Phase B) runs on **Spark, never - Claude** — the digest is deliberately un-anonymized. + carry a token/credential). 
+ +### Phase B — the daily digest itself (built) + +- **Content builder: `backend/digest_builder.py`** (top-level). `build_digest(conn, since_iso, + until_iso, chat_fn=None)` returns `{subject, body, has_activity, user_count, email_count, + investor_count}` and composes **two sections**: + - **By team member** — `collect_user_activity`: per registered user, both directions + (per-mailbox `eam.is_sent`), with **one Spark narrative paragraph** per user + (`ingest/llm.py` → Spark Control `/v1/chat/completions`), **never Claude** (the digest is + deliberately un-anonymized — real LP names + substance stay local). Deterministic + count-only fallback if Spark is unreachable (always-send must not fail). + - **By investor** — `collect_investor_activity`: re-pivots the same window across the whole + team, **deduped per email** (a reply to several teammates counts once), direction decided + at the **email level** (outbound if `from_email` is one of our mailboxes, else inbound). + Structured list, no extra Spark calls. + - Soft-delete filters: `email_account_messages.deleted_at IS NULL` + `users.is_active = 1`, + and the org/contact name joins drop soft-deleted rows (falling back to the matched address). +- **Control is DB-backed, set from the admin panel** — `digest_builder.load_digest_policy(conn)` + reads `app_settings.digest_policy` = `{enabled, send_hour}`. Precedence: **DB row wins** + (the Settings → Admin toggle + send-time dropdown), else `CRM_DIGEST_ENABLED`/ + `CRM_DIGEST_SEND_HOUR` seed a first-boot default, else `{false, 18}`. `GET`/`PATCH + /api/admin/digest/policy` (admin-only) read/write it. **Not a StartOS action** — it's an + operational toggle, so it lives in-app where it's discoverable and takes effect live. +- **Scheduler: `backend/email_integration/digest_scheduler.py`** (co-located with the sync + scheduler). 
One daemon thread, **always started**; each cycle (60s) re-reads the DB policy + and sends once per local day at/after `send_hour` **only when `enabled`** — so toggling in + the panel takes effect with no restart. Content window = (last send, now]; cursor + (`digest_last_sent_at`) + once-per-day guard (`digest_last_sent_date`) live in `app_settings`, + so a missed day rolls into the next digest. Recipients = all active admins. +- **On-demand: `POST /api/admin/digest/send-now`** (admin-only) → `maybe_send_digest(force=True)` + builds the real last-24h digest and sends to the admin set regardless of the policy and + **without** touching the daily cursor (a preview never suppresses the scheduled send). + Surfaced as a "Send Digest Now" button in Settings → Admin, beside "Send Test Digest Email". +- **Decisions (locked):** 6 PM default send · always-send (empty days get a "no activity" + note) · per-user narrative + by-investor structured section · enable/time controlled in the + admin panel. Tests: `backend/test_digest_builder.py` (per-user + per-investor queries, + soft-delete, inbound dedup, two-section compose, fallback, policy resolver, scheduler guards + — stubbed LLM + transport). 
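The precedence described above (DB row wins, else the env vars seed a first-boot default, else `{false, 18}`) can be sketched as a pure function. This is a minimal illustration, not the body of `digest_builder.load_digest_policy`: the names `resolve_digest_policy` and `_clamp_hour` are hypothetical, and the set of strings treated as "enabled" is an assumption (the tests only show `CRM_DIGEST_ENABLED=1`).

```python
import json
import os


def _clamp_hour(value, default):
    """Coerce to an int hour in 0..23; fall back to `default` if unparsable."""
    try:
        return max(0, min(23, int(value)))
    except (TypeError, ValueError):
        return default


def resolve_digest_policy(db_value_json, env=None):
    """Hypothetical resolver: DB row wins, else env seed, else {False, 18}.

    `db_value_json` stands in for app_settings.value_json of the policy row
    (None when no admin has saved a policy yet).
    """
    env = os.environ if env is None else env
    if db_value_json:
        try:
            stored = json.loads(db_value_json)
            return {
                "enabled": bool(stored.get("enabled", False)),
                "send_hour": _clamp_hour(stored.get("send_hour"), 18),
            }
        except (ValueError, AttributeError):
            pass  # unreadable row: fall through to the env seed
    # Assumption: these spellings count as "on"; blank or anything else is off.
    raw_enabled = (env.get("CRM_DIGEST_ENABLED") or "").strip().lower()
    return {
        "enabled": raw_enabled in {"1", "true", "yes", "on"},
        "send_hour": _clamp_hour(env.get("CRM_DIGEST_SEND_HOUR"), 18),
    }
```

Keeping the resolver free of I/O makes the precedence easy to test without a database; note that hour `0` (midnight) is a valid stored value and must not be mistaken for "unset".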
## Known gap diff --git a/frontend/index.html b/frontend/index.html index 859b1cd..234faaa 100644 --- a/frontend/index.html +++ b/frontend/index.html @@ -7769,6 +7769,10 @@ const [usersLoading, setUsersLoading] = useState(false); const [userActionLoadingId, setUserActionLoadingId] = useState(null); const [testEmailLoading, setTestEmailLoading] = useState(false); + const [sendDigestLoading, setSendDigestLoading] = useState(false); + const [digestPolicy, setDigestPolicy] = useState({ enabled: false, send_hour: 18 }); + const [digestPolicyLoading, setDigestPolicyLoading] = useState(false); + const [digestPolicySaving, setDigestPolicySaving] = useState(false); const [auditLogs, setAuditLogs] = useState([]); const [auditLoading, setAuditLoading] = useState(false); const [automationRules, setAutomationRules] = useState([]); @@ -7861,6 +7865,39 @@ } }, [token, user?.role, onShowToast]); + const fetchDigestPolicy = useCallback(async () => { + if (user?.role !== 'admin') return; + setDigestPolicyLoading(true); + try { + const result = await api('/api/admin/digest/policy', {}, token); + const p = result?.data; + if (p) setDigestPolicy({ enabled: Boolean(p.enabled), send_hour: Number(p.send_hour ?? 18) }); + } catch (err) { + onShowToast(getErrorMessage(err, 'Failed to load digest settings'), 'error'); + } finally { + setDigestPolicyLoading(false); + } + }, [token, user?.role, onShowToast]); + + const handleSaveDigestPolicy = async (patch) => { + setDigestPolicy((p) => ({ ...p, ...patch })); // optimistic + setDigestPolicySaving(true); + try { + const result = await api('/api/admin/digest/policy', { + method: 'PATCH', + body: JSON.stringify(patch) + }, token); + const p = result?.data; + if (p) setDigestPolicy({ enabled: Boolean(p.enabled), send_hour: Number(p.send_hour ?? 18) }); + onShowToast('Digest settings saved', 'success'); + } catch (err) { + onShowToast(getErrorMessage(err, 'Failed to save digest settings'), 'error'); + fetchDigestPolicy(); // revert to server truth 
+ } finally { + setDigestPolicySaving(false); + } + }; + const fetchAuditLogs = useCallback(async () => { if (user?.role !== 'admin') return; setAuditLoading(true); @@ -7935,11 +7972,12 @@ if (user?.role !== 'admin') return; fetchBackupHistory(); fetchBackupPolicy(); + fetchDigestPolicy(); fetchAuditLogs(); fetchAutomations(); fetchActivityFeed(); fetchSecurityStatus(); - }, [user?.role, fetchBackupHistory, fetchBackupPolicy, fetchAuditLogs, fetchAutomations, fetchActivityFeed, fetchSecurityStatus]); + }, [user?.role, fetchBackupHistory, fetchBackupPolicy, fetchDigestPolicy, fetchAuditLogs, fetchAutomations, fetchActivityFeed, fetchSecurityStatus]); const handleInviteUser = async (e) => { e.preventDefault(); @@ -8233,6 +8271,26 @@ } }; + const handleSendDigestNow = async () => { + setSendDigestLoading(true); + try { + const result = await api('/api/admin/digest/send-now', { + method: 'POST', + body: JSON.stringify({}) + }, token); + const d = result?.data || {}; + const to = (d.recipients || []).join(', '); + const summary = d.has_activity + ? `${d.user_count} member(s), ${d.email_count} email(s), ${d.investor_count} investor(s)` + : 'no activity in the last 24h'; + onShowToast(`Digest sent (${summary})${to ? ` to ${to}` : ''}`, 'success'); + } catch (err) { + onShowToast(getErrorMessage(err, 'Failed to send digest — is a transport (Gmail/DWD or SMTP) configured?'), 'error'); + } finally { + setSendDigestLoading(false); + } + }; + const handleContactsCsvFileUpload = async (event) => { const file = event.target.files && event.target.files[0]; if (!file) return; @@ -8516,11 +8574,42 @@