Pipeline funnel v2: 4-stage enum + migration 0007 + derived grid signals

Collapse the inherited 6-stage opportunity funnel to the locked 4-stage
per-investor funnel (lead -> engaged -> diligence -> commitment), terminal at
commitment. Migration 0007 remaps existing stage values (outreach/meeting ->
engaged, due_diligence -> diligence, committed/funded -> commitment) and
archives the stray 'lost' value (the grid row is left intact). Inject read-only
existing_investor (total_invested>0), last_activity_at, and staleness
(''/'aging'>=30d/'stale'>=60d) into the grid GET, stripped on write. Frontend:
4-stage chip tints + Pipeline board / opp-form / mock on the new enum.

The visible desktop existing-investor star + staleness recency column + the
Stale saved view are deferred to mobile Phase 3 (data is injected + test-locked
now, so that phase stays pure-frontend). Longshot was already retired by prior
cleanup -- no-op.

Tests: test_pipeline_stages_v2.py (migration remap + derivation boundaries) +
updated grid-pipeline-link / soft-delete / nl_query; 36/36 green, render-smoke
green, fresh-DB migrate clean.
This commit is contained in:
Keysat
2026-06-19 12:54:12 -05:00
parent fe62df1a14
commit e46dd36517
12 changed files with 420 additions and 95 deletions
@@ -0,0 +1,14 @@
-- Reversal of 0007_pipeline_stages_v2.sql (manual; .down files are never auto-applied).
--
-- BEST-EFFORT: the 6->4 stage collapse is lossy and cannot be perfectly inverted (the
-- pattern other .down files here share -- e.g. 0005 cannot DROP COLUMN on old SQLite). It
-- restores VALID legacy 6-stage values, choosing a representative for each collapsed pair:
-- engaged was outreach OR meeting -> 'meeting' (representative)
-- diligence -> 'due_diligence' (exact)
-- commitment was committed OR funded -> 'committed' (representative)
-- Opportunities archived from the stray 'lost' value still carry stage = 'lost' but cannot be
-- re-identified as "archived by this migration" vs archived for other reasons, so they are
-- left archived; un-archive (clear deleted_at) manually if a rollback truly needs them back.
UPDATE opportunities SET stage = 'meeting' WHERE stage = 'engaged';
UPDATE opportunities SET stage = 'due_diligence' WHERE stage = 'diligence';
UPDATE opportunities SET stage = 'committed' WHERE stage = 'commitment';
@@ -0,0 +1,25 @@
-- Pipeline funnel v2 — collapse the inherited 6-stage opportunity funnel into the locked
-- 4-stage per-investor funnel: lead -> engaged -> diligence -> commitment, terminal at
-- commitment. See ROADMAP "Pipeline stages + investor flags/labels -- LOCKED SPEC" (2026-06-19)
-- and server.PIPELINE_STAGES.
--
-- DATA-ONLY + DEPLOYMENT-STATE-INVARIANT (migrations guide): targets stage values
-- structurally, so it is a no-op on a fresh DB (no opportunities) and remaps deterministically
-- on a populated one.
-- outreach, meeting -> engaged (a two-way conversation has begun; "meeting" was an
-- activity, not a position, so it folds in here)
-- due_diligence -> diligence
-- committed, funded -> commitment (terminal; post-commit $ lives in the grid fund cell,
-- and fund admin owns post-commitment -- no "funded" stage)
UPDATE opportunities SET stage = 'engaged' WHERE stage IN ('outreach', 'meeting');
UPDATE opportunities SET stage = 'diligence' WHERE stage = 'due_diligence';
UPDATE opportunities SET stage = 'commitment' WHERE stage IN ('committed', 'funded');
-- The stray legacy 'lost' value is not in the new settable enum, and a lost deal is a dead
-- deal: ARCHIVE (soft-delete) the opportunity rather than leave an un-settable stage on a live
-- row. The grid investor row is left fully intact (the grid is canonical); graveyarding the
-- investor stays a human action, never an auto-mutation (human-in-the-loop guardrail). The
-- stage text is left as 'lost' on the archived row for provenance -- it is filtered out
-- everywhere by deleted_at IS NULL.
UPDATE opportunities SET deleted_at = datetime('now'), updated_at = datetime('now')
WHERE stage = 'lost' AND deleted_at IS NULL;
+9 -9
View File
@@ -28,13 +28,13 @@ from datetime import datetime, timedelta
# scan flooding a response. A list intent past this is reported truncated, never silently cut.
MAX_ROWS = 500
# Live, non-terminal pipeline stages in funnel order (mirrors server.PIPELINE_STAGES; 'lost'
# is the terminal drop). Kept here so the pipeline intents have a stable rank without importing
# the server module (helpers take a conn; they never import server — house convention).
_STAGE_ORDER = ['lead', 'outreach', 'meeting', 'due_diligence', 'committed', 'funded']
# 4-stage per-investor funnel in order, terminal at 'commitment' (mirrors server.PIPELINE_STAGES).
# Kept here so the pipeline intents have a stable rank without importing the server module
# (helpers take a conn; they never import server — house convention).
_STAGE_ORDER = ['lead', 'engaged', 'diligence', 'commitment']
_STAGE_RANK_SQL = (
"CASE stage WHEN 'lead' THEN 1 WHEN 'outreach' THEN 2 WHEN 'meeting' THEN 3 "
"WHEN 'due_diligence' THEN 4 WHEN 'committed' THEN 5 WHEN 'funded' THEN 6 ELSE 0 END")
"CASE stage WHEN 'lead' THEN 1 WHEN 'engaged' THEN 2 "
"WHEN 'diligence' THEN 3 WHEN 'commitment' THEN 4 ELSE 0 END")
# ── helpers ────────────────────────────────────────────────────────────────────────────
@@ -237,7 +237,7 @@ def run_pipeline_top(conn, slots):
"o.probability, u.full_name AS owner FROM opportunities o "
"LEFT JOIN fundraising_investors i ON i.id = o.fundraising_investor_id "
"LEFT JOIN users u ON u.id = o.owner_id "
"WHERE o.deleted_at IS NULL AND o.stage != 'lost' "
"WHERE o.deleted_at IS NULL "
f"ORDER BY {_STAGE_RANK_SQL} DESC, o.expected_amount DESC LIMIT ?", (n,)))
for r in rows:
r["last_activity_at"] = last.get(r.pop("inv_id"))
@@ -248,11 +248,11 @@ def run_pipeline_top(conn, slots):
def run_pipeline_totals(conn, slots):
"""Total pipeline dollars and the split across each stage (excludes lost)."""
"""Total pipeline dollars and the split across each stage."""
rows = _rows(conn.execute(
"SELECT stage, COUNT(*) AS count, COALESCE(SUM(expected_amount),0) AS expected_total, "
"COALESCE(SUM(commitment_amount),0) AS committed_total FROM opportunities "
f"WHERE deleted_at IS NULL AND stage != 'lost' GROUP BY stage ORDER BY {_STAGE_RANK_SQL}"))
f"WHERE deleted_at IS NULL GROUP BY stage ORDER BY {_STAGE_RANK_SQL}"))
total = sum(r["expected_total"] for r in rows)
count = sum(r["count"] for r in rows)
return {"columns": ["stage", "count", "expected_total", "committed_total"],
+8 -7
View File
@@ -129,16 +129,17 @@ def seed(conn):
rem("r_del", "i_acme", "Tombstoned", (TODAY - timedelta(days=2)).isoformat(), deleted=True)
rem("r_standalone", None, "Team chore", (TODAY - timedelta(days=1)).isoformat())
# opportunities — committed / meeting (live) / lost (terminal) / deleted
# opportunities — commitment / engaged (live) / two archived (the new model has no 'lost'
# stage: a dead deal is soft-deleted, so both excluded cases ride deleted_at)
def opp(oid, inv_id, contact, stage, expected, owner, deleted=False):
c("INSERT INTO opportunities (id, name, contact_id, stage, expected_amount, owner_id, "
"fundraising_investor_id, deleted_at) VALUES (?,?,?,?,?,?,?,?)",
(oid, oid, contact, stage, expected, owner, inv_id, _ago(0) if deleted else None))
# opp contact_id must reference a real contacts row (FK on); reuse the two we made
opp("o_acme", "i_acme", "cc_alice", "committed", 4_000_000, "u_jon")
opp("o_beta", "i_beta", "cc_dana", "meeting", 1_000_000, "u_grant")
opp("o_lost", "i_acme", "cc_alice", "lost", 9_000_000, "u_jon")
opp("o_del", "i_beta", "cc_dana", "due_diligence", 7_000_000, "u_grant", deleted=True)
opp("o_acme", "i_acme", "cc_alice", "commitment", 4_000_000, "u_jon")
opp("o_beta", "i_beta", "cc_dana", "engaged", 1_000_000, "u_grant")
opp("o_lost", "i_acme", "cc_alice", "diligence", 9_000_000, "u_jon", deleted=True)
opp("o_del", "i_beta", "cc_dana", "diligence", 7_000_000, "u_grant", deleted=True)
conn.commit()
@@ -182,8 +183,8 @@ def main():
print("pipeline")
r = run("pipeline_totals")
stages = {row["stage"]: row for row in r["rows"]}
check(set(stages) == {"committed", "meeting"}, f"pipeline_totals excludes lost+deleted: {set(stages)}")
check(stages["committed"]["expected_total"] == 4_000_000, "pipeline_totals stage sum")
check(set(stages) == {"commitment", "engaged"}, f"pipeline_totals excludes archived/deleted: {set(stages)}")
check(stages["commitment"]["expected_total"] == 4_000_000, "pipeline_totals stage sum")
r = run("pipeline_top", {"limit": 10})
check(names(r) == ["Acme Capital", "Beta Partners"], "pipeline_top furthest-stage first")
check(r["rows"][0]["last_activity_at"] is not None, "pipeline_top enriches last activity")
+2 -2
View File
@@ -193,8 +193,8 @@ def main():
"expected_amount, probability, fund_name, description, next_step, owner_id, priority, updated_at) "
"VALUES (?,?,?,?,?,?,?,?,?,?,?,?,?,?)",
(gen(), f"{org_name or person}{fund_label}", cid, org_ids.get(org_name) if org_name else None,
stage, amt if stage in ("committed", "funded") else 0, amt,
{"lead": 10, "outreach": 25, "meeting": 40, "due_diligence": 60, "committed": 90, "funded": 100}[stage],
stage, amt if stage == "commitment" else 0, amt,
{"lead": 10, "engaged": 35, "diligence": 60, "commitment": 90}[stage],
fund_label, f"Potential {fund_label} allocation for {person}.",
random.choice(["Send deck", "Schedule call", "Await IC", "Send subdocs"]),
uid, random.choice(["low", "medium", "high"]), now()))
+85 -16
View File
@@ -1596,7 +1596,8 @@ def sanitize_fundraising_grid(grid):
# linked opportunity and injected on read — never persisted as row data (the GET handler
# re-injects them after sanitize). The column DEFINITIONS persist like any other column
# so their position / width / hidden state is kept.
_computed_row_values = ('longshot_followup', 'pipeline', 'pipeline_stage', 'reminder_status')
_computed_row_values = ('longshot_followup', 'pipeline', 'pipeline_stage', 'reminder_status',
'existing_investor', 'last_activity_at', 'staleness')
clean_columns = []
seen = set()
@@ -1794,6 +1795,59 @@ def reminder_status_by_source_row(conn):
out[srid] = st
return out
# Staleness ramp — one global threshold set (locked spec 2026-06-19): the last-contact recency
# value colors fresh (grey) -> aging (amber) >= STALE_AGING_DAYS -> stale (red) >= STALE_DAYS.
# Not stage-aware for v1. The same `staleness`/`last_activity_at` the grid injects also drives the
# mobile card and (a user-built) "Stale" view, so everything color-codes off one server signal.
STALE_AGING_DAYS = 30
STALE_DAYS = 60
def existing_investor_by_source_row(conn):
"""Return the set of grid source_row_ids whose investor has any committed capital
(fundraising_investors.total_invested > 0) the auto-derived "Existing Investor" flag
(locked spec 2026-06-19). Injected read-only on grid read like pipeline_stage; never a
maintained column. Orthogonal to stage: a re-solicited LP shows the star AND a live stage."""
out = set()
for r in conn.execute(
"SELECT source_row_id FROM fundraising_investors WHERE total_invested > 0"
).fetchall():
srid = str(r['source_row_id'] or '')
if srid:
out.add(srid)
return out
def staleness_by_source_row(conn):
"""Return {grid source_row_id: (last_activity_iso_or_None, staleness)} where staleness is
'' (fresh or no recorded activity), 'aging' (>= STALE_AGING_DAYS since last contact), or
'stale' (>= STALE_DAYS). Derived from the SAME last_activity_by_investor signal the reminders
surface uses, so desktop grid + mobile card color-code identically. A row with no recorded
activity gets '' (no false "stale" on a brand-new lead); the W1b nurture-gap nudge handles
in-pipeline-with-no-activity separately."""
last_by_inv = last_activity_by_investor(conn)
out = {}
today = datetime.utcnow().date()
for r in conn.execute("SELECT id, source_row_id FROM fundraising_investors").fetchall():
srid = str(r['source_row_id'] or '')
if not srid:
continue
ts = last_by_inv.get(r['id'])
level = ''
if ts:
try:
d = datetime.strptime(str(ts)[:10], '%Y-%m-%d').date()
age = (today - d).days
if age >= STALE_DAYS:
level = 'stale'
elif age >= STALE_AGING_DAYS:
level = 'aging'
except ValueError:
pass
out[srid] = (ts, level)
return out
def maybe_run_scheduled_backup():
conn = get_db()
try:
@@ -1830,7 +1884,11 @@ def start_backup_scheduler():
# ─── Request Handler ──────────────────────────────────────────────────────────
PIPELINE_STAGES = ['lead', 'outreach', 'meeting', 'due_diligence', 'committed', 'funded']
# 4-stage per-investor funnel, terminal at 'commitment' (locked spec 2026-06-19). On commit the
# deal is handed to fund admin + the $ recorded in the grid fund cell — there is no 'funded'/'lost'
# stage (the grid's committed $ and the graveyard flag carry those). Migration 0007 remapped the
# legacy 6-stage values. Keep the frontend kanban + opp-form + nl_query rank in sync with this list.
PIPELINE_STAGES = ['lead', 'engaged', 'diligence', 'commitment']
CONTACT_TYPES = ['investor', 'prospect', 'advisor', 'other']
COMM_TYPES = ['email', 'call', 'meeting', 'note', 'text']
@@ -2718,7 +2776,7 @@ class CRMHandler(BaseHTTPRequestHandler):
query = """
SELECT o.*,
(SELECT COUNT(*) FROM contacts WHERE organization_id = o.id AND deleted_at IS NULL) as contact_count,
(SELECT COALESCE(SUM(commitment_amount), 0) FROM opportunities WHERE organization_id = o.id AND stage = 'funded' AND deleted_at IS NULL) as total_funded
(SELECT COALESCE(SUM(commitment_amount), 0) FROM opportunities WHERE organization_id = o.id AND stage = 'commitment' AND deleted_at IS NULL) as total_funded
FROM organizations o WHERE 1=1 AND o.deleted_at IS NULL
"""
args = []
@@ -3763,11 +3821,11 @@ class CRMHandler(BaseHTTPRequestHandler):
).fetchone()['total']
pipeline_value = conn.execute(
"SELECT COALESCE(SUM(expected_amount), 0) as total FROM opportunities WHERE stage NOT IN ('funded', 'lost') AND deleted_at IS NULL"
"SELECT COALESCE(SUM(expected_amount), 0) as total FROM opportunities WHERE stage != 'commitment' AND deleted_at IS NULL"
).fetchone()['total']
active_opportunities = conn.execute(
"SELECT COUNT(*) as c FROM opportunities WHERE stage NOT IN ('funded', 'lost') AND deleted_at IS NULL"
"SELECT COUNT(*) as c FROM opportunities WHERE stage != 'commitment' AND deleted_at IS NULL"
).fetchone()['c']
# Pipeline by stage
@@ -3775,11 +3833,11 @@ class CRMHandler(BaseHTTPRequestHandler):
SELECT stage, COUNT(*) as count, COALESCE(SUM(expected_amount), 0) as total_value,
COALESCE(SUM(commitment_amount), 0) as committed_value
FROM opportunities
WHERE stage != 'lost' AND deleted_at IS NULL
WHERE deleted_at IS NULL
GROUP BY stage
ORDER BY CASE stage
WHEN 'lead' THEN 1 WHEN 'outreach' THEN 2 WHEN 'meeting' THEN 3
WHEN 'due_diligence' THEN 4 WHEN 'committed' THEN 5 WHEN 'funded' THEN 6
WHEN 'lead' THEN 1 WHEN 'engaged' THEN 2
WHEN 'diligence' THEN 3 WHEN 'commitment' THEN 4
END
""").fetchall())
@@ -3855,8 +3913,8 @@ class CRMHandler(BaseHTTPRequestHandler):
WHERE deleted_at IS NULL
GROUP BY stage
ORDER BY CASE stage
WHEN 'lead' THEN 1 WHEN 'outreach' THEN 2 WHEN 'meeting' THEN 3
WHEN 'due_diligence' THEN 4 WHEN 'committed' THEN 5 WHEN 'funded' THEN 6
WHEN 'lead' THEN 1 WHEN 'engaged' THEN 2
WHEN 'diligence' THEN 3 WHEN 'commitment' THEN 4
END
""").fetchall())
@@ -3874,7 +3932,7 @@ class CRMHandler(BaseHTTPRequestHandler):
SELECT priority, COUNT(*) as count,
COALESCE(SUM(expected_amount), 0) as total_expected
FROM opportunities
WHERE stage NOT IN ('funded', 'lost') AND deleted_at IS NULL
WHERE stage != 'commitment' AND deleted_at IS NULL
GROUP BY priority
""").fetchall())
@@ -5492,6 +5550,8 @@ class CRMHandler(BaseHTTPRequestHandler):
row = conn.execute("SELECT * FROM fundraising_state WHERE id = 'main'").fetchone()
stage_by_row = pipeline_stage_by_source_row(conn)
reminder_by_row = reminder_status_by_source_row(conn)
existing_by_row = existing_investor_by_source_row(conn)
recency_by_row = staleness_by_source_row(conn)
conn.close()
try:
@@ -5523,6 +5583,15 @@ class CRMHandler(BaseHTTPRequestHandler):
# in the blob). '' = no open reminder; a saved view can filter on this column to
# supersede the binary follow_up checkbox.
r['reminder_status'] = reminder_by_row.get(str(r.get('id') or ''), '')
# Auto-derived "Existing Investor" flag (total_invested > 0) + last-contact recency
# and its staleness ramp ('' / 'aging' / 'stale'). All read-only, computed fresh on
# read like the columns above (stripped on write), so the desktop grid and the mobile
# card render the star + the grey/amber/red recency off one server signal.
srid = str(r.get('id') or '')
r['existing_investor'] = srid in existing_by_row
last_activity, staleness = recency_by_row.get(srid, (None, ''))
r['last_activity_at'] = last_activity
r['staleness'] = staleness
return self.send_json({
"data": {
@@ -5955,12 +6024,12 @@ def seed_demo_data():
# Create opportunities
opp_data = [
(contacts[6][0], orgs[6][0], "Cascade Wealth - Fund II", "meeting", 10000000, 10000000, 40, user2_id),
(contacts[7][0], orgs[7][0], "Blue Harbor - Fund II", "due_diligence", 5000000, 5000000, 60, user2_id),
(contacts[8][0], None, "William Johnson - Direct", "outreach", 0, 2000000, 20, admin_id),
(contacts[6][0], orgs[6][0], "Cascade Wealth - Fund II", "engaged", 10000000, 10000000, 40, user2_id),
(contacts[7][0], orgs[7][0], "Blue Harbor - Fund II", "diligence", 5000000, 5000000, 60, user2_id),
(contacts[8][0], None, "William Johnson - Direct", "lead", 0, 2000000, 20, admin_id),
(contacts[9][0], None, "Garcia Family Office - Fund II", "lead", 0, 15000000, 10, admin_id),
(contacts[10][0], None, "Thomas Brown - WM Referral", "meeting", 0, 3000000, 30, user2_id),
(contacts[11][0], None, "Linda Wilson - PM Intro", "outreach", 0, 5000000, 15, admin_id),
(contacts[10][0], None, "Thomas Brown - WM Referral", "engaged", 0, 3000000, 30, user2_id),
(contacts[11][0], None, "Linda Wilson - PM Intro", "lead", 0, 5000000, 15, admin_id),
]
for opp in opp_data:
conn.execute("""
+18 -11
View File
@@ -121,11 +121,11 @@ def main():
print("\n[link: creates one linked opportunity with seeds]")
st, d = _req(port, "POST", "/api/fundraising/pipeline/link", token, {
"source_row_id": "rowAcme", "fund_name": "Fund III",
"expected_amount": 250000, "probability": 40, "stage": "outreach",
"expected_amount": 250000, "probability": 40, "stage": "engaged",
})
opp = (d or {}).get("data") or {}
check(st == 201 and (d or {}).get("already_linked") is False, f"link -> 201 new (got {st}, {d})")
check(opp.get("stage") == "outreach" and opp.get("expected_amount") == 250000
check(opp.get("stage") == "engaged" and opp.get("expected_amount") == 250000
and opp.get("probability") == 40 and opp.get("fund_name") == "Fund III",
f"seeds applied (got {{stage:{opp.get('stage')}, amt:{opp.get('expected_amount')}, "
f"prob:{opp.get('probability')}, fund:{opp.get('fund_name')}}})")
@@ -138,14 +138,14 @@ def main():
# ── idempotent re-link: returns existing, board-owned stage NOT reseeded ──
print("\n[idempotent: re-link returns existing opp without reseeding funnel fields]")
st, _ = _req(port, "PATCH", f"/api/opportunities/{opp_id}/stage", token, {"stage": "meeting"})
check(st == 200, f"advance stage on the board -> meeting (got {st})")
st, _ = _req(port, "PATCH", f"/api/opportunities/{opp_id}/stage", token, {"stage": "diligence"})
check(st == 200, f"advance stage on the board -> diligence (got {st})")
st, d = _req(port, "POST", "/api/fundraising/pipeline/link", token, {
"source_row_id": "rowAcme", "stage": "lead", "expected_amount": 999, "probability": 5,
})
opp2 = (d or {}).get("data") or {}
check(st == 200 and (d or {}).get("already_linked") is True, f"re-link -> already_linked (got {st}, {d})")
check(opp2.get("stage") == "meeting" and opp2.get("expected_amount") == 250000,
check(opp2.get("stage") == "diligence" and opp2.get("expected_amount") == 250000,
f"funnel fields preserved, not reseeded (got stage={opp2.get('stage')}, amt={opp2.get('expected_amount')})")
check(_opp_count_live(fr_id) == 1, "still exactly one live opp (no duplicate)")
@@ -154,9 +154,13 @@ def main():
st, d = _req(port, "GET", "/api/fundraising/state", token)
rows = {r["id"]: r for r in (d or {}).get("data", {}).get("grid", {}).get("rows", [])}
check(rows.get("rowAcme", {}).get("pipeline") is True
and rows.get("rowAcme", {}).get("pipeline_stage") == "meeting",
f"rowAcme pipeline true @meeting (got {rows.get('rowAcme', {}).get('pipeline')}, "
and rows.get("rowAcme", {}).get("pipeline_stage") == "diligence",
f"rowAcme pipeline true @diligence (got {rows.get('rowAcme', {}).get('pipeline')}, "
f"{rows.get('rowAcme', {}).get('pipeline_stage')})")
# Phase 0 derived signals are injected read-only on every row (values depend on seed;
# assert the keys are present so the strip/inject round-trip below is meaningful).
check(all(k in rows.get("rowAcme", {}) for k in ("existing_investor", "staleness", "last_activity_at")),
f"rowAcme carries derived existing_investor/staleness/last_activity (keys: {sorted(rows.get('rowAcme', {}).keys())})")
check(rows.get("rowBeta", {}).get("pipeline") is False
and rows.get("rowBeta", {}).get("pipeline_stage") == "",
f"rowBeta not in pipeline (got {rows.get('rowBeta', {}).get('pipeline')})")
@@ -172,12 +176,15 @@ def main():
blob = json.loads(c.execute("SELECT grid_json FROM fundraising_state WHERE id='main'").fetchone()[0])
c.close()
stored_acme = {r["id"]: r for r in blob.get("rows", [])}.get("rowAcme", {})
check("pipeline" not in stored_acme and "pipeline_stage" not in stored_acme,
"computed keys are NOT persisted into the grid blob")
check(not any(k in stored_acme for k in ("pipeline", "pipeline_stage", "existing_investor",
"staleness", "last_activity_at")),
"computed keys (pipeline + existing_investor/staleness/last_activity) NOT persisted into the grid blob")
st, d = _req(port, "GET", "/api/fundraising/state", token)
rt = {r["id"]: r for r in (d or {}).get("data", {}).get("grid", {}).get("rows", [])}.get("rowAcme", {})
check(rt.get("pipeline") is True and rt.get("pipeline_stage") == "meeting",
check(rt.get("pipeline") is True and rt.get("pipeline_stage") == "diligence",
f"pipeline values re-injected after round-trip (got {rt.get('pipeline')}, {rt.get('pipeline_stage')})")
check(all(k in rt for k in ("existing_investor", "staleness", "last_activity_at")),
"derived signals re-injected after round-trip")
# ── guards ──
print("\n[guard: a contactless row cannot be added to the pipeline]")
@@ -234,7 +241,7 @@ def main():
# ── re-link after unlink: a fresh opp is created (the archived one stays archived) ──
print("\n[re-link after unlink: creates a new opp, flag reappears]")
st, d = _req(port, "POST", "/api/fundraising/pipeline/link", token, {
"source_row_id": "rowAcme", "stage": "outreach", "expected_amount": 50000,
"source_row_id": "rowAcme", "stage": "engaged", "expected_amount": 50000,
})
relinked = (d or {}).get("data") or {}
check(st == 201 and (d or {}).get("already_linked") is False and relinked.get("id") != opp_id,
+145
View File
@@ -0,0 +1,145 @@
#!/usr/bin/env python3
"""Regression test for Phase 0 — the 4-stage pipeline funnel + the derived grid signals
(ROADMAP "Pipeline stages + investor flags/labels -- LOCKED SPEC", 2026-06-19).
Covers the parts the round-trip test in test_grid_pipeline_link.py only checks structurally:
* migration 0007 stage remap — outreach/meeting -> engaged, due_diligence -> diligence,
committed/funded -> commitment, and the stray 'lost' value archived (soft-deleted), with
'lead' left untouched;
* existing_investor_by_source_row — total_invested > 0 is the auto-derived "Existing Investor";
* staleness_by_source_row — last-contact age maps to '' / 'aging' (>= 30d) / 'stale' (>= 60d),
boundaries inclusive, and NO recorded activity -> '' (no false "stale" on a fresh lead).
Synthetic only (guardrail #9). Run: cd backend && python3 test_pipeline_stages_v2.py
"""
import os
import sqlite3
import sys
import tempfile
from datetime import datetime, timedelta
_DATA = tempfile.mkdtemp()
os.environ["CRM_DATA_DIR"] = _DATA
os.environ["CRM_DB_PATH"] = os.path.join(_DATA, "crm.db")
sys.path.insert(0, os.path.dirname(os.path.abspath(__file__)))
import server # noqa: E402
FAILS = []
_HERE = os.path.dirname(os.path.abspath(__file__))
def check(cond, msg):
print((" PASS " if cond else " FAIL ") + msg)
if not cond:
FAILS.append(msg)
def _days_ago(n):
return (datetime.utcnow() - timedelta(days=n)).strftime("%Y-%m-%d")
def test_migration_remap(conn):
print("\n[migration 0007: stage remap + lost archived]")
c = conn.cursor()
olds = [("m_out", "outreach"), ("m_meet", "meeting"), ("m_dd", "due_diligence"),
("m_comm", "committed"), ("m_fund", "funded"), ("m_lost", "lost"),
("m_lead", "lead")]
for oid, stage in olds:
c.execute("INSERT INTO opportunities (id, name, stage, contact_id, owner_id) VALUES (?,?,?,?,?)",
(oid, oid, stage, "c0", "u1"))
conn.commit()
# init_db already applied 0007; re-run its SQL against the rows we just inserted with
# legacy values (the UPDATEs target by old stage value, so this is exactly the remap).
with open(os.path.join(_HERE, "migrations", "0007_pipeline_stages_v2.sql")) as fh:
conn.executescript(fh.read())
conn.commit()
def stage_of(oid):
return c.execute("SELECT stage FROM opportunities WHERE id=?", (oid,)).fetchone()[0]
def archived(oid):
return c.execute("SELECT deleted_at FROM opportunities WHERE id=?", (oid,)).fetchone()[0] is not None
check(stage_of("m_out") == "engaged" and stage_of("m_meet") == "engaged",
"outreach + meeting -> engaged")
check(stage_of("m_dd") == "diligence", "due_diligence -> diligence")
check(stage_of("m_comm") == "commitment" and stage_of("m_fund") == "commitment",
"committed + funded -> commitment")
check(stage_of("m_lead") == "lead", "lead unchanged")
check(archived("m_lost"), "lost opp archived (deleted_at set)")
check(not archived("m_lead") and not archived("m_comm"),
"non-lost opps NOT archived by the migration")
# Cleanup so these rows don't perturb the derivation seed below.
c.execute("DELETE FROM opportunities WHERE id IN ('m_out','m_meet','m_dd','m_comm','m_fund','m_lost','m_lead')")
conn.commit()
def _investor(conn, srid, total, contact_id=None, comm_days_ago=None):
"""Seed a grid investor (+ optional linked contact & dated communication)."""
iid = "i_" + srid
conn.execute("INSERT INTO fundraising_investors (id, investor_name, source_row_id, total_invested) "
"VALUES (?,?,?,?)", (iid, srid, srid, total))
if contact_id:
conn.execute("INSERT INTO contacts (id, first_name, last_name) VALUES (?,?,?)",
(contact_id, srid, "Person"))
conn.execute("INSERT INTO fundraising_contacts (id, investor_id, full_name, contact_id) "
"VALUES (?,?,?,?)", ("fc_" + srid, iid, srid + " Person", contact_id))
if comm_days_ago is not None:
conn.execute("INSERT INTO communications (id, contact_id, communication_date, created_by, subject) "
"VALUES (?,?,?,?,?)",
("cm_" + srid, contact_id, _days_ago(comm_days_ago), "u1", "note"))
def test_derivations(conn):
print("\n[existing_investor + staleness derivations]")
# Existing flag: only total_invested > 0.
_investor(conn, "rowExist", 5_000_000, contact_id="c_exist", comm_days_ago=100)
_investor(conn, "rowProspect", 0)
# Staleness ramp + boundaries (>=30 aging, >=60 stale; inclusive).
_investor(conn, "rowStale", 0, contact_id="c_stale", comm_days_ago=70)
_investor(conn, "rowAging", 0, contact_id="c_aging", comm_days_ago=45)
_investor(conn, "rowFresh", 0, contact_id="c_fresh", comm_days_ago=5)
_investor(conn, "rowNoAct", 0)
_investor(conn, "rowB60", 0, contact_id="c_b60", comm_days_ago=60) # boundary -> stale
_investor(conn, "rowB59", 0, contact_id="c_b59", comm_days_ago=59) # -> aging
_investor(conn, "rowB30", 0, contact_id="c_b30", comm_days_ago=30) # boundary -> aging
_investor(conn, "rowB29", 0, contact_id="c_b29", comm_days_ago=29) # -> fresh
conn.commit()
existing = server.existing_investor_by_source_row(conn)
check(existing == {"rowExist"}, f"existing_investor = total_invested>0 only (got {sorted(existing)})")
st = server.staleness_by_source_row(conn)
level = lambda srid: st.get(srid, (None, "MISSING"))[1]
check(level("rowStale") == "stale", f"70d -> stale (got {level('rowStale')})")
check(level("rowAging") == "aging", f"45d -> aging (got {level('rowAging')})")
check(level("rowFresh") == "", f"5d -> fresh/'' (got {level('rowFresh')!r})")
check(level("rowNoAct") == "", f"no activity -> '' (got {level('rowNoAct')!r})")
check(level("rowExist") == "stale", "existing + stale coexist (orthogonal axes)")
check(level("rowB60") == "stale" and level("rowB59") == "aging",
f"60d boundary inclusive -> stale; 59d -> aging (got {level('rowB60')}, {level('rowB59')})")
check(level("rowB30") == "aging" and level("rowB29") == "",
f"30d boundary inclusive -> aging; 29d -> '' (got {level('rowB30')}, {level('rowB29')!r})")
# last_activity_at is carried through alongside the level for the recency display.
check(st.get("rowStale", (None, ""))[0] is not None, "stale row carries a last_activity_at value")
check(st.get("rowNoAct", ("X", ""))[0] is None, "no-activity row has last_activity_at None")
def main():
server.init_db()
conn = server.get_db()
conn.execute("INSERT INTO users (id,username,email,password_hash,full_name,role,is_active) "
"VALUES ('u1','grant','grant@ten31.example','x','Grant','admin',1)")
conn.execute("INSERT INTO contacts (id, first_name, last_name) VALUES ('c0','Seed','Contact')")
conn.commit()
test_migration_remap(conn)
test_derivations(conn)
conn.close()
print("\n" + ("ALL PASS (pipeline stages v2)" if not FAILS else f"{len(FAILS)} FAILED"))
sys.exit(1 if FAILS else 0)
if __name__ == "__main__":
main()
+4 -3
View File
@@ -77,9 +77,10 @@ def seed():
# opportunities on cLive (also tied to orgA so they appear in the org detail too)
c.execute("INSERT INTO opportunities (id,name,contact_id,organization_id,owner_id) VALUES ('opLive','Live Opp','cLive','orgA','u1')")
c.execute("INSERT INTO opportunities (id,name,contact_id,organization_id,owner_id,deleted_at) VALUES ('opDead','Dead Opp','cLive','orgA','u1',?)", (DEL,))
# funded opportunities on orgA — one live, one soft-deleted (for the org-list total_funded aggregate)
c.execute("INSERT INTO opportunities (id,name,contact_id,organization_id,owner_id,stage,commitment_amount) VALUES ('opFundLive','Funded Live','cLive','orgA','u1','funded',1000000)")
c.execute("INSERT INTO opportunities (id,name,contact_id,organization_id,owner_id,stage,commitment_amount,deleted_at) VALUES ('opFundDead','Funded Dead','cLive','orgA','u1','funded',500000,?)", (DEL,))
# committed-stage opportunities on orgA — one live, one soft-deleted (for the org-list
# total_funded aggregate, which now sums stage='commitment' after the 4-stage migration)
c.execute("INSERT INTO opportunities (id,name,contact_id,organization_id,owner_id,stage,commitment_amount) VALUES ('opFundLive','Funded Live','cLive','orgA','u1','commitment',1000000)")
c.execute("INSERT INTO opportunities (id,name,contact_id,organization_id,owner_id,stage,commitment_amount,deleted_at) VALUES ('opFundDead','Funded Dead','cLive','orgA','u1','commitment',500000,?)", (DEL,))
# communications on cLive
c.execute("INSERT INTO communications (id,contact_id,communication_date,created_by,subject) VALUES ('cmLive','cLive','2026-05-01','u1','Live note')")
c.execute("INSERT INTO communications (id,contact_id,communication_date,created_by,subject,deleted_at) VALUES ('cmDead','cLive','2026-05-02','u1','Dead note',?)", (DEL,))