Adopt the Pipeline: grid-driven opportunities link (v0.1.0:87)

The fundraising grid (canonical) now drives the classic opportunities
Pipeline board, instead of the board being a disconnected second data-entry
surface. An "Add to Pipeline" row action creates a durably-linked opportunity
via the new opportunities.fundraising_investor_id (migration 0005, additive +
reversible), reusing the grid's already-synced contact — retiring the
POST /api/contacts side-door — and mapping the grid lead to the opp owner.

Ownership is split so the two stay reconciled: the grid owns whether the link
exists and the seed; the board owns stage/probability/owner. The link endpoint
is idempotent (one live opp per investor; a re-link never reseeds funnel
fields). "Is in pipeline?"/"what stage?" are derived from a live opp join and
injected as read-only grid columns on read, stripped on write, so they never
persist or dirty the autosave. Remove-from-pipeline soft-deletes the opp and
leaves the grid row fully intact; deleting an investor from the grid archives
its orphaned opp.

Also fixes the standing soft-delete leak in handle_pipeline_report and the
dashboard pipeline aggregates, which counted tombstoned opportunities.

Tests: backend/test_grid_pipeline_link.py (link/idempotent/round-trip/guards/
unlink-intact/re-link/orphan/aggregates); 28/28 suite green, render-smoke green.
This commit is contained in:
Keysat
2026-06-17 23:08:36 -05:00
parent 06482247df
commit 7f9a15ebf3
10 changed files with 724 additions and 89 deletions
+249 -5
View File
@@ -1584,6 +1584,12 @@ def sanitize_fundraising_grid(grid):
if not isinstance(rows, list):
rows = deep_copy_json(DEFAULT_FUNDRAISING_ROWS)
# `pipeline` / `pipeline_stage` are read-only columns whose VALUES are derived from the
# linked opportunity and injected on read — never persisted as row data (the GET handler
# re-injects them after sanitize). The column DEFINITIONS persist like any other column
# so their position / width / hidden state is kept.
_computed_row_values = ('longshot_followup', 'pipeline', 'pipeline_stage')
clean_columns = []
seen = set()
for col in columns:
@@ -1600,11 +1606,83 @@ def sanitize_fundraising_grid(grid):
if not isinstance(row, dict):
continue
next_row = dict(row)
next_row.pop('longshot_followup', None)
for _k in _computed_row_values:
next_row.pop(_k, None)
clean_rows.append(next_row)
return {"columns": clean_columns, "rows": clean_rows}
# ─── Grid ↔ Pipeline link (Adopt the Pipeline) ────────────────────────────────
# The fundraising grid is canonical; the Pipeline board is a view of the deals it
# drives. opportunities.fundraising_investor_id is the durable join. Two ownership
# rules keep them reconciled:
# * Grid owns: whether the link exists, the investor, the primary contact, the seed.
# * Pipeline owns: stage / probability / owner / close date / next step.
# So a grid save NEVER reseeds an existing linked opp (it would clobber funnel state),
# and "is in pipeline?" / "what stage?" are DERIVED from the live opp join — never a
# denormalized flag that could drift.
def _resolve_owner_from_lead(conn, lead_value, fallback_user_id):
"""Map a grid 'lead' cell (a team member's name/initials) to a users.id.
opportunities.owner_id is NOT NULL, so a value is always returned the acting user
is the fallback when no confident match exists. Owner is reassignable on the board,
so a forgiving prefix match is acceptable here."""
lead = str(lead_value or '').strip().lower()
if lead:
row = conn.execute(
"SELECT id FROM users WHERE is_active = 1 AND (lower(full_name) = ? OR lower(username) = ?) LIMIT 1",
(lead, lead)
).fetchone()
if row:
return row['id']
row = conn.execute(
"SELECT id FROM users WHERE is_active = 1 AND (lower(full_name) LIKE ? OR lower(username) LIKE ?) "
"ORDER BY length(full_name) LIMIT 1",
(lead + '%', lead + '%')
).fetchone()
if row:
return row['id']
return fallback_user_id
def reconcile_grid_pipeline_links(conn):
"""After a grid save + relational sync, archive (soft-delete) any pipeline
opportunity whose linked grid investor row no longer exists i.e. the investor was
deleted from the grid. Creation is NEVER done here: a pipeline opp is only created
via the explicit /api/fundraising/pipeline/link endpoint (which carries the seed
fields), so this reconciler is a one-way orphan cleanup that can never spawn an
empty opp or reseed a live one."""
conn.execute(
"""
UPDATE opportunities
SET deleted_at = ?, updated_at = ?
WHERE fundraising_investor_id IS NOT NULL
AND deleted_at IS NULL
AND fundraising_investor_id NOT IN (SELECT id FROM fundraising_investors)
""",
(now(), now())
)
def pipeline_stage_by_source_row(conn):
"""Return {grid source_row_id: current pipeline stage} for every investor with a
live (non-deleted) linked opportunity. The opportunities table is the single source
of truth, so this is always derived fresh and injected as read-only grid columns
never stored in the grid blob, where it could go stale."""
out = {}
for r in conn.execute(
"""
SELECT fi.source_row_id AS srid, o.stage AS stage
FROM opportunities o
JOIN fundraising_investors fi ON o.fundraising_investor_id = fi.id
WHERE o.deleted_at IS NULL
"""
).fetchall():
srid = str(r['srid'] or '')
if srid:
out[srid] = r['stage']
return out
def maybe_run_scheduled_backup():
conn = get_db()
try:
@@ -2066,6 +2144,10 @@ class CRMHandler(BaseHTTPRequestHandler):
return self.handle_create_feature_request(user, body)
if path == '/api/fundraising/log-communication':
return self.handle_log_fundraising_communication(user, body)
if path == '/api/fundraising/pipeline/link':
return self.handle_pipeline_link(user, body)
if path == '/api/fundraising/pipeline/unlink':
return self.handle_pipeline_unlink(user, body)
if path == '/api/fundraising/collab/heartbeat':
return self.handle_fundraising_collab_heartbeat(user, body)
if path == '/api/admin/users':
@@ -3066,6 +3148,152 @@ class CRMHandler(BaseHTTPRequestHandler):
conn.close()
return self.send_json({"data": {"communication": comm, "row": target_row, "version": next_version}}, 201)
def _fetch_opportunity_row(self, conn, opp_id):
return row_to_dict(conn.execute("""
SELECT op.*, c.first_name, c.last_name, c.email as contact_email,
o.name as organization_name, u.full_name as owner_name
FROM opportunities op
LEFT JOIN contacts c ON op.contact_id = c.id
LEFT JOIN organizations o ON op.organization_id = o.id
LEFT JOIN users u ON op.owner_id = u.id
WHERE op.id = ?
""", (opp_id,)).fetchone())
def _resolve_grid_primary_contact(self, conn, investor_id, contact_index, actor_user_id):
"""Resolve the classic contacts.id for a grid investor's chosen contact pill,
reusing the link sync already records in fundraising_contacts.contact_id (no
bare POST /api/contacts side-door). contact_index matches the pill order
(fundraising_contacts.sort_order)."""
try:
idx = int(contact_index)
except (TypeError, ValueError):
idx = 0
if idx < 0:
idx = 0
fc = conn.execute(
"SELECT contact_id, full_name, email FROM fundraising_contacts "
"WHERE investor_id = ? ORDER BY sort_order, rowid LIMIT 1 OFFSET ?",
(investor_id, idx)
).fetchone()
if not fc:
# requested index out of range — fall back to the first pill
fc = conn.execute(
"SELECT contact_id, full_name, email FROM fundraising_contacts "
"WHERE investor_id = ? ORDER BY sort_order, rowid LIMIT 1",
(investor_id,)
).fetchone()
if not fc:
return None
if fc['contact_id']:
return fc['contact_id']
# Rows predating the contact_id backfill: resolve via the same grid→classic
# upsert the relational sync uses, not a fresh side-door create.
inv = conn.execute("SELECT investor_name FROM fundraising_investors WHERE id = ?", (investor_id,)).fetchone()
investor_name = str(inv['investor_name'] if inv else '') or ''
return _upsert_contact_from_fundraising(
conn, investor_name, {"name": fc['full_name'], "email": fc['email']}, actor_user_id=actor_user_id
)
def handle_pipeline_link(self, user, body):
"""Create (or return the existing) Pipeline opportunity for a fundraising-grid
investor row and link it durably via opportunities.fundraising_investor_id.
Idempotent: one live opp per investor a re-link returns the existing opp
without reseeding its Pipeline-owned funnel fields."""
source_row_id = str(body.get('source_row_id') or '').strip()
if not source_row_id:
return self.send_error_json("source_row_id is required")
conn = get_db()
investor = conn.execute(
"SELECT id, investor_name, lead FROM fundraising_investors WHERE source_row_id = ?",
(source_row_id,)
).fetchone()
if not investor:
conn.close()
return self.send_error_json("Investor not found for that grid row — save the grid first", 404)
investor_id = investor['id']
investor_name = str(investor['investor_name'] or '').strip() or 'Untitled Investor'
existing = conn.execute(
"SELECT id FROM opportunities WHERE fundraising_investor_id = ? AND deleted_at IS NULL "
"ORDER BY created_at LIMIT 1",
(investor_id,)
).fetchone()
if existing:
opp = self._fetch_opportunity_row(conn, existing['id'])
conn.close()
return self.send_json({"data": opp, "already_linked": True})
contact_id = self._resolve_grid_primary_contact(
conn, investor_id, body.get('contact_index'), actor_user_id=user['user_id']
)
if not contact_id:
conn.close()
return self.send_error_json("Add at least one contact to the investor row before adding it to the pipeline")
contact = conn.execute("SELECT organization_id FROM contacts WHERE id = ?", (contact_id,)).fetchone()
org_id = contact['organization_id'] if contact else None
stage = str(body.get('stage') or 'lead').strip() or 'lead'
if stage not in PIPELINE_STAGES:
conn.close()
return self.send_error_json(f"Invalid stage. Must be one of: {', '.join(PIPELINE_STAGES)}")
try:
expected_amount = float(body.get('expected_amount') or 0)
except (TypeError, ValueError):
expected_amount = 0.0
try:
probability = int(body.get('probability'))
except (TypeError, ValueError):
probability = 35
probability = max(0, min(100, probability))
fund_name = str(body.get('fund_name') or '').strip()
name = str(body.get('name') or '').strip() or f"{investor_name} — Pipeline"
owner_id = _resolve_owner_from_lead(conn, investor['lead'], user['user_id'])
opp_id = generate_id()
conn.execute("""
INSERT INTO opportunities (id, name, contact_id, organization_id, stage,
commitment_amount, expected_amount, probability, fund_name,
owner_id, priority, fundraising_investor_id)
VALUES (?, ?, ?, ?, ?, 0, ?, ?, ?, ?, 'medium', ?)
""", (opp_id, name, contact_id, org_id, stage, expected_amount, probability,
fund_name or None, owner_id, investor_id))
log_audit(conn, user['user_id'], 'opportunity', opp_id, 'create', {"source": "grid_pipeline_link"})
conn.commit()
opp = self._fetch_opportunity_row(conn, opp_id)
conn.close()
return self.send_json({"data": opp, "already_linked": False}, 201)
def handle_pipeline_unlink(self, user, body):
"""Remove a grid investor from the Pipeline: soft-delete its linked opportunity.
The fundraising-grid row (investor, contacts, commitments, notes) is left fully
intact only the opportunity is archived (recoverable via the board)."""
source_row_id = str(body.get('source_row_id') or '').strip()
if not source_row_id:
return self.send_error_json("source_row_id is required")
conn = get_db()
investor = conn.execute(
"SELECT id FROM fundraising_investors WHERE source_row_id = ?", (source_row_id,)
).fetchone()
if not investor:
conn.close()
return self.send_error_json("Investor not found for that grid row", 404)
opp_rows = conn.execute(
"SELECT id FROM opportunities WHERE fundraising_investor_id = ? AND deleted_at IS NULL",
(investor['id'],)
).fetchall()
archived = 0
for opp in opp_rows:
conn.execute("UPDATE opportunities SET deleted_at = ?, updated_at = ? WHERE id = ?",
(now(), now(), opp['id']))
log_audit(conn, user['user_id'], 'opportunity', opp['id'], 'delete', {"source": "grid_pipeline_unlink"})
archived += 1
conn.commit()
conn.close()
return self.send_json({"data": {"archived": archived}})
def handle_intake_match(self, user, params):
"""Read-only: does an investor matching this intake already exist? Used by the
Matrix intake bot to label its in-thread proposal new-vs-existing. Returns the
@@ -3154,11 +3382,11 @@ class CRMHandler(BaseHTTPRequestHandler):
).fetchone()['total']
pipeline_value = conn.execute(
"SELECT COALESCE(SUM(expected_amount), 0) as total FROM opportunities WHERE stage NOT IN ('funded', 'lost')"
"SELECT COALESCE(SUM(expected_amount), 0) as total FROM opportunities WHERE stage NOT IN ('funded', 'lost') AND deleted_at IS NULL"
).fetchone()['total']
active_opportunities = conn.execute(
"SELECT COUNT(*) as c FROM opportunities WHERE stage NOT IN ('funded', 'lost')"
"SELECT COUNT(*) as c FROM opportunities WHERE stage NOT IN ('funded', 'lost') AND deleted_at IS NULL"
).fetchone()['c']
# Pipeline by stage
@@ -3166,7 +3394,7 @@ class CRMHandler(BaseHTTPRequestHandler):
SELECT stage, COUNT(*) as count, COALESCE(SUM(expected_amount), 0) as total_value,
COALESCE(SUM(commitment_amount), 0) as committed_value
FROM opportunities
WHERE stage != 'lost'
WHERE stage != 'lost' AND deleted_at IS NULL
GROUP BY stage
ORDER BY CASE stage
WHEN 'lead' THEN 1 WHEN 'outreach' THEN 2 WHEN 'meeting' THEN 3
@@ -3243,6 +3471,7 @@ class CRMHandler(BaseHTTPRequestHandler):
COALESCE(SUM(commitment_amount), 0) as total_committed,
COALESCE(AVG(probability), 0) as avg_probability
FROM opportunities
WHERE deleted_at IS NULL
GROUP BY stage
ORDER BY CASE stage
WHEN 'lead' THEN 1 WHEN 'outreach' THEN 2 WHEN 'meeting' THEN 3
@@ -3255,6 +3484,7 @@ class CRMHandler(BaseHTTPRequestHandler):
COALESCE(SUM(op.expected_amount), 0) as total_expected
FROM opportunities op
LEFT JOIN users u ON op.owner_id = u.id
WHERE op.deleted_at IS NULL
GROUP BY op.owner_id, op.stage
ORDER BY u.full_name, op.stage
""").fetchall())
@@ -3263,7 +3493,7 @@ class CRMHandler(BaseHTTPRequestHandler):
SELECT priority, COUNT(*) as count,
COALESCE(SUM(expected_amount), 0) as total_expected
FROM opportunities
WHERE stage NOT IN ('funded', 'lost')
WHERE stage NOT IN ('funded', 'lost') AND deleted_at IS NULL
GROUP BY priority
""").fetchall())
@@ -4824,6 +5054,7 @@ class CRMHandler(BaseHTTPRequestHandler):
conn = get_db()
self._ensure_fundraising_state_row(conn)
row = conn.execute("SELECT * FROM fundraising_state WHERE id = 'main'").fetchone()
stage_by_row = pipeline_stage_by_source_row(conn)
conn.close()
try:
@@ -4842,6 +5073,16 @@ class CRMHandler(BaseHTTPRequestHandler):
columns = grid.get('columns', [])
rows = grid.get('rows', [])
# Inject the read-only pipeline columns, derived from the live linked opportunity
# (the opportunities table is canonical — never stored in the grid blob, so it
# can't go stale). The frontend renders these read-only and strips them on save.
for r in rows:
if not isinstance(r, dict):
continue
stage = stage_by_row.get(str(r.get('id') or ''))
r['pipeline'] = bool(stage)
r['pipeline_stage'] = stage or ''
return self.send_json({
"data": {
"grid": {"columns": columns, "rows": rows},
@@ -5020,6 +5261,9 @@ class CRMHandler(BaseHTTPRequestHandler):
WHERE id = 'main'
""", (json.dumps(grid), json.dumps(next_views), next_version, user['user_id'], now()))
sync_fundraising_relational(conn, grid, next_views, actor_user_id=user['user_id'])
# Archive pipeline opps orphaned by an investor deleted from the grid (one-way
# cleanup; never creates or reseeds — see reconcile_grid_pipeline_links).
reconcile_grid_pipeline_links(conn)
log_audit(conn, user['user_id'], 'fundraising_state', 'main', 'update', {"version": next_version})
conn.commit()
conn.close()