diff --git a/backend/email_integration/migrations/0002_email_activity.sql b/backend/email_integration/migrations/0002_email_activity.sql new file mode 100644 index 0000000..34bf2b6 --- /dev/null +++ b/backend/email_integration/migrations/0002_email_activity.sql @@ -0,0 +1,26 @@ +-- ============================================================================ +-- email_activity_proposals — the email-activity agent's PROPOSED grid notes, +-- queued for human review. The agent (local model, sovereign) drafts one note per +-- newly-matched email; a partner approves (optionally after editing) or dismisses +-- it in the web UI. Only on approval does the text get appended to the grid. +-- One proposal per email (email_id UNIQUE) — never re-proposed. +-- ============================================================================ +CREATE TABLE IF NOT EXISTS email_activity_proposals ( + id TEXT PRIMARY KEY, + email_id TEXT NOT NULL UNIQUE, + investor_id TEXT, -- fundraising_investors.id / grid row id (best-effort) + investor_name TEXT, + direction TEXT, -- sent | received + summary TEXT, -- the one-line gist from the local model + proposed_note TEXT, -- the full note as drafted (editable before approve) + email_subject TEXT, -- context shown to the reviewer + email_date TEXT, + status TEXT NOT NULL DEFAULT 'pending', -- pending | approved | dismissed + decided_by TEXT, -- users.id who approved/dismissed + decided_at TEXT, + final_note TEXT, -- the text actually appended on approval (may be edited) + created_at TEXT DEFAULT (datetime('now')), + FOREIGN KEY(email_id) REFERENCES emails(id) ON DELETE CASCADE +); +CREATE INDEX IF NOT EXISTS idx_email_proposals_status ON email_activity_proposals(status); +CREATE INDEX IF NOT EXISTS idx_email_proposals_investor ON email_activity_proposals(investor_id); diff --git a/backend/email_integration/scheduler.py b/backend/email_integration/scheduler.py index 9a35cd1..132a2fd 100644 --- a/backend/email_integration/scheduler.py +++ b/backend/email_integration/scheduler.py @@ -57,7 +57,10 @@ def _conn_factory_from_env() -> Callable[[], sqlite3.Connection]: return get_db -def start_sync_scheduler(conn_factory: Optional[Callable] = None) -> None: +def start_sync_scheduler(conn_factory: Optional[Callable] = None, + post_sync: Optional[Callable] = None) -> None: + """Start the periodic Gmail sync loop. `post_sync`, if given, is called after each + sync pass (best-effort) — used to run the email-activity summarizer.""" if _state["thread"] is not None: return # already running @@ -98,6 +101,11 @@ def start_sync_scheduler(conn_factory: Optional[Callable] = None) -> None: finally: _state["running_now"] = False _state["last_run"] = t0 + if post_sync is not None: + try: + post_sync() + except Exception: + log.exception("post_sync hook failed; continuing") if stop.wait(_cfg.CONFIG.sync_interval_sec): return diff --git a/backend/server.py b/backend/server.py index 3d04070..1eb72c7 100644 --- a/backend/server.py +++ b/backend/server.py @@ -1802,6 +1802,8 @@ class CRMHandler(BaseHTTPRequestHandler): return self.handle_security_status(user) if path == '/api/system/status': return self.handle_system_status(user) + if path == '/api/activity/proposals': + return self.handle_list_activity_proposals(user) # Users if path == '/api/users': @@ -1905,6 +1907,10 @@ class CRMHandler(BaseHTTPRequestHandler): return self.handle_node_feedback(user, path.split('/')[-2], body) if path == '/api/architect/ground': return self.handle_architect_ground(user, body) + if re.match(r'^/api/activity/proposals/[^/]+/approve$', path): + return self.handle_decide_activity_proposal(user, path.split('/')[-2], 'approve', body) + if re.match(r'^/api/activity/proposals/[^/]+/dismiss$', path): + return self.handle_decide_activity_proposal(user, path.split('/')[-2], 'dismiss', body) if re.match(r'^/api/thesis/nodes/[^/]+/choose$', path): return self.handle_choose_variant(user, path.split('/')[-2]) if re.match(r'^/api/thesis/lines/[^/]+/approve$', path): @@ -3636,6 +3642,29 @@ class CRMHandler(BaseHTTPRequestHandler): conn.close() self.send_json({"data": out}) + def handle_list_activity_proposals(self, user): + if not require_admin(user): + return self.send_error_json("Admin required", 403) + conn = get_db() + try: + return self.send_json({"proposals": list_email_activity_proposals(conn, status="pending")}) + finally: + conn.close() + + def handle_decide_activity_proposal(self, user, proposal_id, decision, body): + if not require_admin(user): + return self.send_error_json("Admin required", 403) + conn = get_db() + try: + res = decide_email_activity_proposal(conn, proposal_id, decision, + user['user_id'], (body or {}).get('note')) + finally: + conn.close() + if res.get("error"): + code = {"not_found": 404, "already_decided": 409}.get(res["error"], 400) + return self.send_error_json(res["error"], code) + return self.send_json({"data": res}) + # ─── UI-triggered index jobs + entity-merge review (Phase 1) ─── def handle_index_job(self, user, kind): if not require_admin(user): @@ -4993,6 +5022,200 @@ def seed_demo_data(): print("Demo data seeded successfully.") +# ─── Email-activity summary agent ──────────────────────────────────────────── +# When a sent/received email is matched to an investor, summarize it to ONE dated, +# marked note on the LOCAL model (sovereign — nothing leaves Ten31, so no redaction +# boundary is needed) and append it to that investor's notes in the fundraising grid. +# Going-forward only: we never summarize email dated before the feature was switched +# on, so the historical backfill does not generate noise. + +_ACTIVITY_SINCE_KEY = "email_activity_since" +_ACTIVITY_MARKER = "✉" # marks an email-derived note (kept editable before approval) + + +def _fmt_activity_date(sent_at): + """ISO-ish timestamp -> 'Jun 25, 2026' (falls back to the raw date part).""" + from datetime import datetime + datepart = str(sent_at or "")[:10] # YYYY-MM-DD + try: + return datetime.strptime(datepart, "%Y-%m-%d").strftime("%b %-d, %Y") + except Exception: + return datepart + + +def _activity_investor(conn, email_id): + """Resolve (grid_row_id, investor_name) for a matched email via its highest- + confidence investor link. Either may be None.""" + link = conn.execute( + "SELECT fundraising_investor_id, organization_id, contact_id FROM email_investor_links " + "WHERE email_id=? ORDER BY match_confidence DESC LIMIT 1", (email_id,)).fetchone() + if not link: + return None, None + inv_id = link["fundraising_investor_id"] + name = None + if inv_id: + r = conn.execute("SELECT investor_name FROM fundraising_investors WHERE id=?", (inv_id,)).fetchone() + name = r["investor_name"] if r else None + if not name and link["organization_id"]: + r = conn.execute("SELECT name FROM organizations WHERE id=?", (link["organization_id"],)).fetchone() + name = r["name"] if r else None + if not name and link["contact_id"]: + r = conn.execute("SELECT first_name, last_name FROM contacts WHERE id=?", (link["contact_id"],)).fetchone() + if r: + name = f"{r['first_name'] or ''} {r['last_name'] or ''}".strip() + return inv_id, name + + +def _summarize_email_gist(subject, body): + """One short clause describing the email's substance, from the LOCAL model.""" + try: + sys.path.insert(0, os.path.join(os.path.dirname(os.path.abspath(__file__)), "ingest")) + import llm # noqa: E402 + except Exception: + return None + text = (body or "")[:4000] + if not (subject or text).strip(): + return None + out = llm.chat( + f"Subject: {subject or '(none)'}\n\n{text}", + system=("You summarize an email into a brief CRM note. Reply with ONE clause under 14 words " + "describing what the email is about. No greeting, no names, no quotes, no trailing period."), + max_tokens=40, temperature=0.0) + gist = " ".join((out or "").split()).strip().rstrip(".") + return gist or None + + +def _append_grid_note(conn, inv_id, inv_name, note, updated_by=None): + """Append a note to the matched investor's notes cell in the live grid (newest at + bottom), bump the grid version, and refresh the relational projection. Best-effort.""" + row = conn.execute("SELECT grid_json, views_json, version FROM fundraising_state WHERE id='main'").fetchone() + if not row or not row["grid_json"]: + return False + try: + grid = json.loads(row["grid_json"]) + except Exception: + return False + rows = grid.get("rows", []) if isinstance(grid, dict) else [] + target = None + if inv_id: + target = next((r for r in rows if isinstance(r, dict) and str(r.get("id")) == str(inv_id)), None) + if target is None and inv_name: + nn = _normalize_text(inv_name) + target = next((r for r in rows if isinstance(r, dict) + and _normalize_text(str(r.get("investor_name") or "")) == nn), None) + if target is None: + return False + existing = str(target.get("notes") or "").rstrip() + target["notes"] = (existing + "\n" + note) if existing else note + try: + views = json.loads(row["views_json"]) if row["views_json"] else [] + except Exception: + views = [] + # updated_by has a FK to users(id); stamp the approving user (None -> NULL is fine). + conn.execute("UPDATE fundraising_state SET grid_json=?, version=?, updated_by=?, updated_at=? WHERE id='main'", + (json.dumps(grid), (row["version"] or 0) + 1, updated_by, now())) + try: + sync_fundraising_relational(conn, grid, views, actor_user_id=updated_by) + except Exception: + pass + return True + + +def _activity_note_text(sent_at, direction, gist): + return f"{_ACTIVITY_MARKER} {_fmt_activity_date(sent_at)} — {direction}: {gist}" + + +def propose_email_activity_notes(limit=50): + """Draft a PROPOSED grid note per newly-matched email and queue it for human + review (status 'pending'). Does NOT touch the grid — approval does that. Idempotent + (one proposal per email), going-forward only. Safe to call after each Gmail sync.""" + conn = get_db() + try: + try: + conn.execute("SELECT 1 FROM email_activity_proposals LIMIT 1") + except sqlite3.OperationalError: + return {"proposed": 0, "skipped": "tables_absent"} + since = get_app_setting(conn, _ACTIVITY_SINCE_KEY) + if not since: + since = now() + set_app_setting(conn, _ACTIVITY_SINCE_KEY, since) + conn.commit() + try: + rows = conn.execute( + "SELECT id, subject, body_text, snippet, from_email, sent_at FROM emails " + "WHERE is_matched=1 AND sent_at >= ? " + "AND id NOT IN (SELECT email_id FROM email_activity_proposals) " + "ORDER BY sent_at ASC LIMIT ?", (since, limit)).fetchall() + except sqlite3.OperationalError: + return {"proposed": 0, "skipped": "emails_absent"} + if not rows: + return {"proposed": 0} + own = set() + try: + own = {(r[0] or "").lower() for r in conn.execute("SELECT email_address FROM email_accounts")} + except Exception: + pass + done = 0 + for r in rows: + inv_id, inv_name = _activity_investor(conn, r["id"]) + direction = "Sent" if (r["from_email"] or "").lower() in own else "Received" + gist = _summarize_email_gist(r["subject"], r["body_text"] or r["snippet"] or "") + if not gist: + continue # leave unproposed; a later pass retries once the model answers + note = _activity_note_text(r["sent_at"], direction, gist) + conn.execute( + "INSERT OR IGNORE INTO email_activity_proposals " + "(id,email_id,investor_id,investor_name,direction,summary,proposed_note," + " email_subject,email_date,status,created_at) " + "VALUES (?,?,?,?,?,?,?,?,?,'pending',?)", + (generate_id(), r["id"], inv_id, inv_name, direction.lower(), gist, note, + r["subject"], r["sent_at"], now())) + conn.commit() + done += 1 + return {"proposed": done} + finally: + conn.close() + + +def list_email_activity_proposals(conn, status="pending", limit=200): + try: + rows = conn.execute( + "SELECT id, email_id, investor_id, investor_name, direction, summary, proposed_note, " + "email_subject, email_date, status, created_at FROM email_activity_proposals " + "WHERE status=? ORDER BY email_date ASC, created_at ASC LIMIT ?", (status, limit)).fetchall() + return [dict(r) for r in rows] + except sqlite3.OperationalError: + return [] + + +def decide_email_activity_proposal(conn, proposal_id, decision, user_id, edited_note=None): + """Approve (optionally with an edited note -> append to grid) or dismiss a proposal.""" + p = conn.execute("SELECT * FROM email_activity_proposals WHERE id=?", (proposal_id,)).fetchone() + if not p: + return {"error": "not_found"} + if p["status"] != "pending": + return {"error": "already_decided", "status": p["status"]} + if decision == "approve": + note = (edited_note or "").strip() or p["proposed_note"] + placed = _append_grid_note(conn, p["investor_id"], p["investor_name"], note, updated_by=user_id) + conn.execute("UPDATE email_activity_proposals SET status='approved', final_note=?, decided_by=?, decided_at=? WHERE id=?", + (note, user_id, now(), proposal_id)) + action, result = "email.activity_approved", {"status": "approved", "placed_in_grid": placed} + elif decision == "dismiss": + conn.execute("UPDATE email_activity_proposals SET status='dismissed', decided_by=?, decided_at=? WHERE id=?", + (user_id, now(), proposal_id)) + action, result = "email.activity_dismissed", {"status": "dismissed"} + else: + return {"error": "bad_decision"} + conn.execute( + "INSERT INTO interaction_log (id, ts, actor_type, actor_id, action, target_type, target_id, payload, source, created_at) " + "VALUES (?,?,?,?,?,?,?,?,?,?)", + (generate_id(), now(), "human", user_id, action, "fundraising_investor", p["investor_id"], + json.dumps({"proposal_id": proposal_id}), "crm_ui", now())) + conn.commit() + return result + + # ─── Main Entry Point ──────────────────────────────────────────────────────── def main(): @@ -5010,7 +5233,8 @@ def main(): if os.environ.get("CRM_GMAIL_INTEGRATION_ENABLED", "").lower() in ("1", "true", "yes", "on"): try: from email_integration.scheduler import start_sync_scheduler - start_sync_scheduler() + # After each Gmail sync, draft proposed activity notes for human review. + start_sync_scheduler(post_sync=lambda: propose_email_activity_notes()) print("[email_integration] Gmail sync scheduler started") except Exception as _e: print(f"[email_integration] failed to start scheduler: {_e}") diff --git a/backend/test_email_activity.py b/backend/test_email_activity.py new file mode 100644 index 0000000..56f05a9 --- /dev/null +++ b/backend/test_email_activity.py @@ -0,0 +1,117 @@ +#!/usr/bin/env python3 +"""Test the email-activity proposal flow: the agent drafts PROPOSED grid notes for +newly-matched emails (going-forward only, idempotent, no grid write), and a human +approves (optionally edited -> appended to the grid) or dismisses them. The local +model is stubbed. Synthetic data only (guardrail #9). +Run: cd backend && python3 test_email_activity.py +""" +import json +import os +import sqlite3 +import sys +import tempfile + +os.environ["CRM_DB_PATH"] = os.path.join(tempfile.mkdtemp(), "crm.db") +os.environ.setdefault("CRM_DATA_DIR", os.path.dirname(os.environ["CRM_DB_PATH"])) +sys.path.insert(0, os.path.dirname(os.path.abspath(__file__))) +import server # noqa: E402 + +server._summarize_email_gist = lambda subject, body: "fundraising update; proposed a call" + +FAILS = [] + + +def check(cond, msg): + print((" PASS " if cond else " FAIL ") + msg) + if not cond: + FAILS.append(msg) + + +def setup(): + conn = sqlite3.connect(os.environ["CRM_DB_PATH"]) + conn.row_factory = sqlite3.Row + conn.executescript(""" + CREATE TABLE app_settings (key TEXT PRIMARY KEY, value_json TEXT, updated_at TEXT); + CREATE TABLE email_accounts (id TEXT, email_address TEXT, sync_enabled INT DEFAULT 1, sync_status TEXT, backfill_complete INT); + CREATE TABLE emails (id TEXT PRIMARY KEY, subject TEXT, body_text TEXT, snippet TEXT, from_email TEXT, sent_at TEXT, is_matched INT, match_status TEXT); + CREATE TABLE email_investor_links (id TEXT, email_id TEXT, fundraising_investor_id TEXT, organization_id TEXT, contact_id TEXT, match_confidence REAL); + CREATE TABLE email_activity_proposals (id TEXT PRIMARY KEY, email_id TEXT UNIQUE, investor_id TEXT, investor_name TEXT, + direction TEXT, summary TEXT, proposed_note TEXT, email_subject TEXT, email_date TEXT, status TEXT DEFAULT 'pending', + decided_by TEXT, decided_at TEXT, final_note TEXT, created_at TEXT); + CREATE TABLE users (id TEXT PRIMARY KEY, username TEXT); + CREATE TABLE fundraising_investors (id TEXT PRIMARY KEY, investor_name TEXT, notes TEXT); + CREATE TABLE fundraising_state (id TEXT PRIMARY KEY, grid_json TEXT, views_json TEXT, version INT, + updated_by TEXT REFERENCES users(id), updated_at TEXT); + CREATE TABLE interaction_log (id TEXT PRIMARY KEY, ts TEXT, actor_type TEXT, actor_id TEXT, action TEXT, target_type TEXT, target_id TEXT, payload TEXT, source TEXT, created_at TEXT); + """) + conn.execute("INSERT INTO users (id,username) VALUES ('user-1','grant')") + conn.execute("INSERT INTO app_settings VALUES ('email_activity_since', ?, ?)", (json.dumps("2026-01-01T00:00:00"), "x")) + conn.execute("INSERT INTO email_accounts (id,email_address) VALUES ('a','grant@ten31.xyz')") + conn.execute("INSERT INTO fundraising_investors (id,investor_name,notes) VALUES ('inv1','Harbor & Vine','existing note')") + grid = {"columns": [], "rows": [{"id": "inv1", "investor_name": "Harbor & Vine", "notes": "existing note"}]} + conn.execute("INSERT INTO fundraising_state (id,grid_json,views_json,version) VALUES ('main',?,?,1)", (json.dumps(grid), "[]")) + # e1 sent (from us), e2 received, both after cutoff; e3 before cutoff (excluded) + conn.executemany("INSERT INTO emails (id,subject,body_text,from_email,sent_at,is_matched,match_status) VALUES (?,?,?,?,?,1,'matched')", [ + ("e1", "Fund III", "Here is the update", "grant@ten31.xyz", "2026-06-01T10:00:00"), + ("e2", "Re: Fund III", "Thanks, a question", "lp@harborvine.example", "2026-06-02T10:00:00"), + ("e3", "Old", "ancient", "lp@harborvine.example", "2025-01-01T10:00:00"), + ]) + conn.executemany("INSERT INTO email_investor_links (id,email_id,fundraising_investor_id,match_confidence) VALUES (?,?, 'inv1', 1.0)", + [("l1", "e1"), ("l2", "e2"), ("l3", "e3")]) + conn.commit() + conn.close() + + +def main(): + setup() + + res = server.propose_email_activity_notes() + check(res.get("proposed") == 2, f"proposes 2 (e1,e2 after cutoff; e3 excluded), got {res}") + + res2 = server.propose_email_activity_notes() + check(res2.get("proposed") == 0, f"idempotent: second run proposes 0, got {res2}") + + conn = server.get_db() + props = server.list_email_activity_proposals(conn, status="pending") + check(len(props) == 2, f"2 pending proposals listed, got {len(props)}") + dirs = sorted(p["direction"] for p in props) + check(dirs == ["received", "sent"], f"directions sent+received, got {dirs}") + e1 = next(p for p in props if p["email_id"] == "e1") + check(e1["direction"] == "sent" and "Sent" in e1["proposed_note"], "e1 (from us) is 'sent'") + check("✉" in e1["proposed_note"] and "fundraising update" in e1["proposed_note"], "proposed note marked + has gist") + + # grid must be UNTOUCHED before approval + grid = json.loads(conn.execute("SELECT grid_json FROM fundraising_state WHERE id='main'").fetchone()["grid_json"]) + check(grid["rows"][0]["notes"] == "existing note", "grid notes unchanged before approval") + + # approve e1 (default note) -> appended at bottom + r = server.decide_email_activity_proposal(conn, e1["id"], "approve", "user-1") + check(r.get("status") == "approved" and r.get("placed_in_grid") is True, f"approve places in grid, got {r}") + grid = json.loads(conn.execute("SELECT grid_json FROM fundraising_state WHERE id='main'").fetchone()["grid_json"]) + notes = grid["rows"][0]["notes"] + check(notes.startswith("existing note\n") and "✉" in notes, "approved note appended below existing note") + + # approve e2 with an EDITED note + e2 = next(p for p in props if p["email_id"] == "e2") + r2 = server.decide_email_activity_proposal(conn, e2["id"], "approve", "user-1", edited_note="Custom edited note") + grid = json.loads(conn.execute("SELECT grid_json FROM fundraising_state WHERE id='main'").fetchone()["grid_json"]) + check("Custom edited note" in grid["rows"][0]["notes"], "edited note is what gets appended") + + # re-deciding an already-decided proposal is rejected + r3 = server.decide_email_activity_proposal(conn, e1["id"], "dismiss", "user-1") + check(r3.get("error") == "already_decided", f"cannot re-decide, got {r3}") + + # nothing left pending + check(len(server.list_email_activity_proposals(conn, status="pending")) == 0, "no pending proposals remain") + conn.close() + + if FAILS: + print(f"\nFAILED ({len(FAILS)})") + for f in FAILS: + print(" - " + f) + sys.exit(1) + print("\nALL PASS (email-activity proposal flow)") + + +if __name__ == "__main__": + main() diff --git a/frontend/index.html b/frontend/index.html index 4e0f0d7..8063de1 100644 --- a/frontend/index.html +++ b/frontend/index.html @@ -9956,6 +9956,9 @@ const [error, setError] = useState(''); const [busy, setBusy] = useState(''); const [oneEmail, setOneEmail] = useState(() => user?.email || ''); + const [proposals, setProposals] = useState([]); + const [edits, setEdits] = useState({}); + const [deciding, setDeciding] = useState(null); const load = useCallback(async () => { let s; @@ -9972,8 +9975,11 @@ } let a = { accounts: [] }; try { a = await api('/api/email/accounts', {}, token); } catch (_) { /* none / disabled */ } + let pr = { proposals: [] }; + try { pr = await api('/api/activity/proposals', {}, token); } catch (_) { /* admin-only / none */ } setStatus(s); setAccounts(Array.isArray(a?.accounts) ? a.accounts : []); + setProposals(Array.isArray(pr?.proposals) ? pr.proposals : []); }, [token]); useEffect(() => { @@ -10019,6 +10025,23 @@ } }; + const decide = async (p, decision) => { + if (deciding) return; + try { + setDeciding(p.id); + const body = decision === 'approve' + ? JSON.stringify({ note: (edits[p.id] != null ? edits[p.id] : p.proposed_note) }) + : undefined; + await api(`/api/activity/proposals/${p.id}/${decision}`, { method: 'POST', body }, token); + setProposals((prev) => prev.filter((x) => x.id !== p.id)); + onShowToast(decision === 'approve' ? 'Note added to the grid' : 'Proposal dismissed', 'success'); + } catch (err) { + onShowToast(getErrorMessage(err, 'Action failed'), 'error'); + } finally { + setDeciding(null); + } + }; + if (loading) return
; if (error) return
{error}
; if (!status) return
No data
; @@ -10076,6 +10099,39 @@ + {isAdmin && proposals.length > 0 && ( +
+
+ Proposed grid notes + {proposals.length} to review +
+
+ The agent drafted these from matched emails on your local model. Edit if needed, then Approve to append the note to that investor's grid notes, or Dismiss. +
+ {proposals.map((p) => ( +
+
+
{p.investor_name || 'Unknown investor'}
+
{p.direction === 'sent' ? 'Sent' : 'Received'}{p.email_date ? ` · ${formatDate(p.email_date)}` : ''}
+
+ {p.email_subject ?
Subject: {p.email_subject}
: null} +