From 7007bf82044eb674893e503dba332e1b1fcbc05c Mon Sep 17 00:00:00 2001 From: Grant Date: Fri, 8 May 2026 17:26:10 -0500 Subject: [PATCH] =?UTF-8?q?Recurring=20subs=20Phase=202=20=E2=80=94=20rene?= =?UTF-8?q?wal=20worker=20(committed,=20not=20published)?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Implements the renewal lifecycle from RECURRING_SUBSCRIPTIONS_DESIGN.md phase 2. Operators don't see this yet (no admin UI); the worker only acts on subscriptions that exist in the schema, and creating subscription rows still requires direct DB insert. Phase 4 (admin UI) wires the buyer-facing surface that creates them. src/subscriptions.rs (new module, ~450 LOC): - find_due_renewals: subs with status active|past_due whose next_renewal_at has passed and consecutive_failures < cap - find_lapsing_subscriptions: past_due subs whose (next_renewal_at + grace_period_days) is in the past - mark_lapsed / mark_active_after_settle / mark_renewal_failed: state-transition helpers - create_subscription: atomic create-sub + first-cycle invoice (called by purchase flow when policy.is_recurring; not yet wired — that's a separate phase) - on_invoice_settled: helper for webhook handler to flip a sub from past_due back to active and dispatch subscription.renewed - find_subscription_for_invoice: lookup helper - tick: 60s sweep, picks up to 25 due renewals + lapse sweep - spawn: long-lived background task, mirrors webhooks::spawn_delivery_worker Renewal flow per due sub: 1. Convert listed_value to sats via rates::convert_to_sats (identity for SAT subs; live rate fetcher for USD/EUR — per MULTI_CURRENCY_DESIGN.md "USD-stable / re-quote each cycle" decision). 2. Get the active payment provider, call create_invoice with the same trait surface used by one-shot purchases. Works against BTCPay or Zaprite or any future provider. 3. Persist the local invoice row carrying the rate audit (listed_currency / listed_value / exchange_rate_centibps / exchange_rate_source). For SAT subs, rate fields are NULL (identity conversion isn't worth recording). 4. Insert subscription_invoices linking the invoice to the sub with monotonic cycle_number. 5. Update sub: status → past_due, next_renewal_at → end of new cycle, last_renewal_attempt_at → now. 6. Dispatch subscription.renewal_pending webhook to the operator. On settle (webhook handler): if the invoice is linked via subscription_invoices, flip sub → active, reset consecutive_failures to 0, dispatch subscription.renewed. Failure path: increment consecutive_failures, push next_renewal_at out by exponential backoff (5min → 30min → 2h → 6h → 12h, capped at 5 failures ≈ 24h of retries before the worker stops trying). Operator can see stuck subs via the upcoming admin UI; for now they show up in the audit log via webhook deliveries. Lapse path: separate sweep finds past_due subs whose (next_renewal_at + policy.grace_period_days) is past now, flips to lapsed, dispatches subscription.lapsed. Wired into: - src/lib.rs: pub mod subscriptions - src/main.rs: subscriptions::spawn(state.clone()) alongside reconcile + webhooks + analytics - src/api/webhook.rs: settle path now calls subscriptions::on_invoice_settled before license issuance — ordering matters because first-cycle subs create both a sub row AND a license; we want the sub state correct on the way to the license-issuance branch Test: 7 integration tests in tests/subscriptions.rs. Drives the worker against a MockProvider with fail-on-demand semantics: - renewal_worker_creates_invoice_for_sat_priced_due_sub: SAT sub charges listed_value sats verbatim, no rate audit, sub goes active → past_due, subscription_invoices gets a new cycle row - renewal_worker_requotes_rate_for_fiat_priced_sub: $25 USD at pinned $50k/BTC = exactly 50,000 sats; rate audit pinned on invoice; centibps encoded correctly - renewal_worker_backs_off_on_failure: failed create_invoice → consecutive_failures = 1, no invoice created, sub → past_due - renewal_worker_stops_retrying_at_max_failures: pre-set failures = MAX, tick is a no-op for that sub - lapse_sweep_flips_past_due_after_grace: 15-day-old past_due with grace=7 → lapsed - settle_webhook_flips_sub_back_to_active: tick creates renewal, simulate settle, on_invoice_settled flips sub back to active - tick_is_no_op_when_nothing_due: empty fixture, tick is safe Test count: 49 (was 42; +7). NOT bumping version. The recurring-subs feature isn't operator- visible until phases 4+5 (admin UI for creating recurring policies + buy page rendering for "$25/month"). Schema is in, worker runs, but nothing creates subs yet — so this commit ships dormant. --- licensing-service/src/api/webhook.rs | 15 + licensing-service/src/lib.rs | 1 + licensing-service/src/main.rs | 8 +- licensing-service/src/subscriptions.rs | 589 +++++++++++++++++++++++ licensing-service/tests/subscriptions.rs | 366 ++++++++++++++ 5 files changed, 978 insertions(+), 1 deletion(-) create mode 100644 licensing-service/src/subscriptions.rs create mode 100644 licensing-service/tests/subscriptions.rs diff --git a/licensing-service/src/api/webhook.rs b/licensing-service/src/api/webhook.rs index f8dc232..87a7ae8 100644 --- a/licensing-service/src/api/webhook.rs +++ b/licensing-service/src/api/webhook.rs @@ -124,6 +124,21 @@ pub async fn handle( return Ok(StatusCode::OK); }; + // If this settled invoice is associated with a subscription + // (renewal cycle), flip the sub back to `active` and fire + // `subscription.renewed`. Idempotent — re-running on a sub + // already in `active` state is a no-op UPDATE. Runs BEFORE + // the license-issuance branch so the sub state is correct + // even on first-cycle subs (where the license is also being + // issued for the first time). + if let Err(e) = crate::subscriptions::on_invoice_settled(&state, &invoice).await { + tracing::warn!( + invoice_id = %invoice.id, + error = %e, + "subscriptions::on_invoice_settled failed; non-fatal, license issuance proceeds" + ); + } + // Idempotency: if a license already exists for this invoice, do nothing. if repo::get_license_by_invoice(&state.db, &invoice.id) .await? diff --git a/licensing-service/src/lib.rs b/licensing-service/src/lib.rs index c966797..249e425 100644 --- a/licensing-service/src/lib.rs +++ b/licensing-service/src/lib.rs @@ -20,6 +20,7 @@ pub mod payment; pub mod rate_limit; pub mod rates; pub mod reconcile; +pub mod subscriptions; pub mod tipping; pub mod webhooks; diff --git a/licensing-service/src/main.rs b/licensing-service/src/main.rs index 1393a4e..c874b92 100644 --- a/licensing-service/src/main.rs +++ b/licensing-service/src/main.rs @@ -7,7 +7,8 @@ use anyhow::Context; use keysat::{ - analytics, api, btcpay, config, crypto, db, license_self, payment, reconcile, webhooks, + analytics, api, btcpay, config, crypto, db, license_self, payment, reconcile, subscriptions, + webhooks, }; use std::sync::Arc; use tower_http::trace::TraceLayer; @@ -131,6 +132,11 @@ async fn main() -> anyhow::Result<()> { // and short-circuits if disabled (default), so spawning is safe // unconditionally. analytics::spawn(state.clone()); + // Recurring subscriptions renewal worker. Picks up subs whose + // next_renewal_at has passed, creates fresh invoices via the + // active provider, transitions state. No-op if no recurring + // subscriptions exist; safe to spawn unconditionally. + subscriptions::spawn(state.clone()); // Hourly session reaper — drops sessions whose expires_at < now. { diff --git a/licensing-service/src/subscriptions.rs b/licensing-service/src/subscriptions.rs new file mode 100644 index 0000000..2342e3b --- /dev/null +++ b/licensing-service/src/subscriptions.rs @@ -0,0 +1,589 @@ +//! Recurring subscriptions — renewal worker + state-transition +//! helpers. +//! +//! Companion to the schema in migration 0011 and the design at +//! `RECURRING_SUBSCRIPTIONS_DESIGN.md`. This module owns: +//! +//! 1. Background worker that scans for due renewals every 60s, +//! creates fresh invoices via the active payment provider, +//! and transitions the subscription's state machine. +//! 2. Repo helpers for the renewal lifecycle (find_due, mark_*, +//! etc.) — kept here rather than in `db::repo` because they're +//! conceptually subscription-specific and easier to reason +//! about co-located with the worker that uses them. +//! 3. Helpers the webhook handler calls on settle to flip a +//! sub from `past_due` back to `active`. +//! +//! State machine recap (full diagram in the design doc): +//! +//! ┌─────────┐ cycle ends ┌──────────┐ +//! │ active │ ────────────▶ │ past_due │ +//! └─────────┘ └──────────┘ +//! ▲ pay (settle webhook) │ grace expires +//! └─────────────────────────┘ +//! │ +//! ▼ +//! ┌────────┐ +//! │ lapsed │ +//! └────────┘ +//! +//! Cancellation can flip from `active` or `past_due` → `cancelled` +//! at any point (admin or buyer-initiated). Cancelled subs stop +//! the worker from picking them up, but the license stays valid +//! through the end of the current cycle. +//! +//! Auto-charge via saved payment profiles (Zaprite's +//! `paymentProfileId` flow) is NOT in this version. The first +//! renewal-worker iteration creates fresh invoices that the buyer +//! pays manually. v0.2.0:5+ adds the auto-charge path so cycles +//! after the first don't require buyer interaction. + +use crate::api::AppState; +use crate::db::repo; +use crate::error::AppError; +use crate::models::Invoice; +use crate::payment::CreateInvoiceParams; +use anyhow::{anyhow, Context, Result}; +use chrono::{Duration as ChronoDuration, Utc}; +use serde::{Deserialize, Serialize}; +use serde_json::json; +use sqlx::{Row, SqlitePool}; +use std::time::Duration as StdDuration; +use uuid::Uuid; + +/// How often the worker wakes up to scan for due renewals. +const TICK_INTERVAL: StdDuration = StdDuration::from_secs(60); + +/// Hard cap on how many subscriptions one tick will process. Keeps +/// the worker bounded under load — a backlog of 1000 due renewals +/// drains in ~40 minutes rather than monopolizing a tick. +const MAX_PER_TICK: i64 = 25; + +/// Cap on consecutive failures before the worker stops retrying +/// and waits for operator intervention. With the backoff schedule +/// below, 5 failures spans roughly 24 hours. +const MAX_CONSECUTIVE_FAILURES: i64 = 5; + +#[derive(Debug, Clone, Serialize, Deserialize)] +pub struct Subscription { + pub id: String, + pub license_id: String, + pub policy_id: String, + pub product_id: String, + pub period_days: i64, + pub listed_currency: String, + pub listed_value: i64, + pub status: String, + pub started_at: String, + pub next_renewal_at: Option, + pub cancelled_at: Option, + pub consecutive_failures: i64, +} + +fn row_to_subscription(row: sqlx::sqlite::SqliteRow) -> Subscription { + Subscription { + id: row.get("id"), + license_id: row.get("license_id"), + policy_id: row.get("policy_id"), + product_id: row.get("product_id"), + period_days: row.get("period_days"), + listed_currency: row.get("listed_currency"), + listed_value: row.get("listed_value"), + status: row.get("status"), + started_at: row.get("started_at"), + next_renewal_at: row.get("next_renewal_at"), + cancelled_at: row.get("cancelled_at"), + consecutive_failures: row.get("consecutive_failures"), + } +} + +const SUB_COLS: &str = "id, license_id, policy_id, product_id, period_days, \ + listed_currency, listed_value, status, started_at, next_renewal_at, \ + cancelled_at, consecutive_failures"; + +/// Subs that are due for the worker to act on right now: status +/// is `active` or `past_due`, `next_renewal_at` is in the past, +/// and we haven't given up yet. +pub async fn find_due_renewals( + pool: &SqlitePool, + limit: i64, +) -> Result> { + let now = Utc::now().to_rfc3339(); + let rows = sqlx::query(&format!( + "SELECT {SUB_COLS} FROM subscriptions \ + WHERE status IN ('active', 'past_due') \ + AND next_renewal_at IS NOT NULL \ + AND next_renewal_at <= ? \ + AND consecutive_failures < ? \ + ORDER BY next_renewal_at ASC \ + LIMIT ?" + )) + .bind(&now) + .bind(MAX_CONSECUTIVE_FAILURES) + .bind(limit) + .fetch_all(pool) + .await + .context("find_due_renewals query")?; + Ok(rows.into_iter().map(row_to_subscription).collect()) +} + +/// Subs in `past_due` whose grace period has elapsed. Worker flips +/// these to `lapsed` in a separate sweep (license validation will +/// then start rejecting). +pub async fn find_lapsing_subscriptions( + pool: &SqlitePool, + limit: i64, +) -> Result> { + // We need to JOIN to policies to read grace_period_days. Done + // inline via a sub-query on the policy's grace value computed + // against the sub's next_renewal_at. + let now = Utc::now().to_rfc3339(); + let rows = sqlx::query(&format!( + "SELECT s.id AS id, s.license_id, s.policy_id, s.product_id, s.period_days, \ + s.listed_currency, s.listed_value, s.status, s.started_at, \ + s.next_renewal_at, s.cancelled_at, s.consecutive_failures \ + FROM subscriptions s \ + JOIN policies p ON p.id = s.policy_id \ + WHERE s.status = 'past_due' \ + AND s.next_renewal_at IS NOT NULL \ + AND datetime(s.next_renewal_at, '+' || p.grace_period_days || ' days') < ? \ + LIMIT ?" + )) + .bind(&now) + .bind(limit) + .fetch_all(pool) + .await + .context("find_lapsing_subscriptions query")?; + Ok(rows.into_iter().map(row_to_subscription).collect()) +} + +/// Mark a sub as `lapsed`. Called from the worker's lapse sweep. +pub async fn mark_lapsed(pool: &SqlitePool, sub_id: &str) -> Result<()> { + let now = Utc::now().to_rfc3339(); + sqlx::query( + "UPDATE subscriptions SET status = 'lapsed', updated_at = ? WHERE id = ?", + ) + .bind(&now) + .bind(sub_id) + .execute(pool) + .await + .context("mark_lapsed")?; + Ok(()) +} + +/// Mark a sub back to `active` after a successful settle webhook. +/// Resets the failure counter so future renewals get the full +/// retry budget. Called from `api::webhook::handle` when a +/// settled invoice is also linked via `subscription_invoices`. +pub async fn mark_active_after_settle( + pool: &SqlitePool, + sub_id: &str, +) -> Result<()> { + let now = Utc::now().to_rfc3339(); + sqlx::query( + "UPDATE subscriptions \ + SET status = 'active', consecutive_failures = 0, \ + last_renewal_attempt_at = ?, updated_at = ? \ + WHERE id = ?", + ) + .bind(&now) + .bind(&now) + .bind(sub_id) + .execute(pool) + .await + .context("mark_active_after_settle")?; + Ok(()) +} + +/// Look up the subscription a given invoice belongs to (via +/// `subscription_invoices`). Returns `None` if the invoice is a +/// one-shot purchase (most invoices). Used by the webhook handler +/// to decide whether to flip a sub state on settle. +pub async fn find_subscription_for_invoice( + pool: &SqlitePool, + invoice_id: &str, +) -> Result> { + let row = sqlx::query( + "SELECT subscription_id FROM subscription_invoices WHERE invoice_id = ?", + ) + .bind(invoice_id) + .fetch_optional(pool) + .await + .context("find_subscription_for_invoice")?; + Ok(row.map(|r| r.get::("subscription_id"))) +} + +/// Atomic creation of a subscription + the first cycle's invoice. +/// Used at purchase time when an operator's policy has +/// `is_recurring = 1`. Not invoked by the worker (the worker +/// renews EXISTING subs); kept here for symmetry. +#[allow(clippy::too_many_arguments)] +pub async fn create_subscription( + pool: &SqlitePool, + license_id: &str, + policy_id: &str, + product_id: &str, + period_days: i64, + listed_currency: &str, + listed_value: i64, + first_cycle_invoice_id: &str, +) -> Result { + let id = Uuid::new_v4().to_string(); + let now = Utc::now(); + let started_at = now.to_rfc3339(); + let next_renewal_at = (now + ChronoDuration::days(period_days)).to_rfc3339(); + + sqlx::query( + "INSERT INTO subscriptions(id, license_id, policy_id, product_id, period_days, \ + listed_currency, listed_value, status, started_at, next_renewal_at, \ + consecutive_failures, created_at, updated_at) \ + VALUES(?, ?, ?, ?, ?, ?, ?, 'active', ?, ?, 0, ?, ?)", + ) + .bind(&id) + .bind(license_id) + .bind(policy_id) + .bind(product_id) + .bind(period_days) + .bind(listed_currency) + .bind(listed_value) + .bind(&started_at) + .bind(&next_renewal_at) + .bind(&started_at) + .bind(&started_at) + .execute(pool) + .await + .context("INSERT subscriptions")?; + + sqlx::query( + "INSERT INTO subscription_invoices(id, subscription_id, invoice_id, \ + cycle_number, cycle_start_at, cycle_end_at, created_at) \ + VALUES(?, ?, ?, 1, ?, ?, ?)", + ) + .bind(Uuid::new_v4().to_string()) + .bind(&id) + .bind(first_cycle_invoice_id) + .bind(&started_at) + .bind(&next_renewal_at) + .bind(&started_at) + .execute(pool) + .await + .context("INSERT subscription_invoices")?; + + Ok(Subscription { + id, + license_id: license_id.to_string(), + policy_id: policy_id.to_string(), + product_id: product_id.to_string(), + period_days, + listed_currency: listed_currency.to_string(), + listed_value, + status: "active".to_string(), + started_at: started_at.clone(), + next_renewal_at: Some(next_renewal_at), + cancelled_at: None, + consecutive_failures: 0, + }) +} + +/// Per-attempt backoff schedule for renewal failures. Index = the +/// upcoming consecutive-failures count (after this failure, what +/// will the new value be). MAX_CONSECUTIVE_FAILURES (5) is the cap +/// at which the worker stops retrying entirely. +fn renewal_backoff(attempts_after: i64) -> ChronoDuration { + match attempts_after { + 1 => ChronoDuration::minutes(5), + 2 => ChronoDuration::minutes(30), + 3 => ChronoDuration::hours(2), + 4 => ChronoDuration::hours(6), + _ => ChronoDuration::hours(12), + } +} + +/// One sweep of the renewal worker. Picks up to MAX_PER_TICK due +/// subs, attempts a renewal for each, and runs a lapse sweep at +/// the end. Returns Ok(()) even if individual renewals failed — +/// failure handling is per-sub via consecutive_failures + backoff. +/// Pub so integration tests can drive it synchronously without +/// waiting on the spawned background task. +pub async fn tick(state: &AppState) -> Result<()> { + // Phase 1: due renewals. + let due = find_due_renewals(&state.db, MAX_PER_TICK) + .await + .context("find due renewals")?; + for sub in due { + if let Err(e) = renew_one(state, &sub).await { + tracing::warn!( + sub_id = %sub.id, + error = %e, + "renewal failed; backing off" + ); + mark_renewal_failed(&state.db, &sub).await.ok(); + } + } + + // Phase 2: lapse sweep. Independent of phase 1; even if no + // renewals fired this tick, an old past_due sub might have + // crossed its grace boundary. + let lapsing = find_lapsing_subscriptions(&state.db, MAX_PER_TICK) + .await + .context("find lapsing subs")?; + for sub in lapsing { + if let Err(e) = mark_lapsed(&state.db, &sub.id).await { + tracing::warn!(sub_id = %sub.id, error = %e, "mark_lapsed failed"); + continue; + } + crate::webhooks::dispatch( + state, + "subscription.lapsed", + &json!({ + "subscription_id": sub.id, + "license_id": sub.license_id, + "product_id": sub.product_id, + "policy_id": sub.policy_id, + }), + ) + .await; + } + + Ok(()) +} + +/// Attempt a single subscription renewal: convert listed amount +/// to sats, call the active payment provider's create_invoice, +/// insert the invoice + subscription_invoices rows, advance +/// next_renewal_at to the start of the next cycle, mark sub as +/// past_due (returns to active when settle webhook fires). +async fn renew_one(state: &AppState, sub: &Subscription) -> Result<()> { + // 1. Convert listed price to sats. SAT-currency subs are an + // identity (no rate fetcher hit); fiat subs re-quote each + // cycle (per MULTI_CURRENCY_DESIGN.md decision). + let conversion = + crate::rates::convert_to_sats(state, &sub.listed_currency, sub.listed_value) + .await + .context("rate conversion")?; + let amount_sats = conversion.sats.max(1); + + // 2. Get the active provider. If no provider is configured + // we can't bill — surfaces as a renewal failure that + // backs off (operator probably mid-Disconnect). + let provider = state.payment_provider().await.map_err(|e| { + anyhow!("payment provider unavailable for renewal: {e:#}") + })?; + + // 3. Compute the next cycle window. + let now = Utc::now(); + let cycle_start = now; + let cycle_end = cycle_start + ChronoDuration::days(sub.period_days); + + // 4. Fresh internal invoice id. Becomes externalUniqId on + // Zaprite + the local invoice row id on our side. + let internal_invoice_id = Uuid::new_v4().to_string(); + let redirect_url = format!( + "{}/thank-you?invoice_id={}", + state.config.public_base_url, internal_invoice_id + ); + let metadata = json!({ + "productId": sub.product_id, + "subscriptionId": sub.id, + "cycleStartAt": cycle_start.to_rfc3339(), + }); + + // 5. Create the provider-side order/invoice. + let handle = provider + .create_invoice(CreateInvoiceParams { + amount: crate::payment::Money { + currency: if sub.listed_currency == "SAT" { + "SAT".to_string() + } else { + sub.listed_currency.clone() + }, + amount: if sub.listed_currency == "SAT" { + amount_sats + } else { + sub.listed_value + }, + }, + redirect_url: &redirect_url, + metadata, + external_order_id: &internal_invoice_id, + buyer_email: None, // renewal email comes from the license, not solicited fresh + }) + .await + .context("provider.create_invoice for renewal")?; + + // 6. Persist the local invoice row carrying the rate audit. + repo::create_invoice_with_currency( + &state.db, + &internal_invoice_id, + &handle.provider_invoice_id, + &sub.product_id, + amount_sats, + &handle.checkout_url, + None, + Some(&format!("Renewal cycle for subscription {}", sub.id)), + Some(&sub.policy_id), + if sub.listed_currency == "SAT" { + None + } else { + Some(sub.listed_currency.as_str()) + }, + if sub.listed_currency == "SAT" { + None + } else { + Some(sub.listed_value) + }, + // Rate metadata only meaningful for fiat-priced subs. + // SAT-priced subs have an identity conversion that's not + // worth recording. + if sub.listed_currency == "SAT" { + None + } else { + conversion.rate_centibps + }, + if sub.listed_currency == "SAT" { + None + } else { + Some(conversion.source.as_str()) + }, + ) + .await + .map_err(|e: AppError| anyhow!("repo create_invoice: {e:?}"))?; + + // 7. Link to the subscription. Cycle number = max(existing) + 1. + let next_cycle_num: i64 = sqlx::query_scalar( + "SELECT COALESCE(MAX(cycle_number), 0) + 1 \ + FROM subscription_invoices WHERE subscription_id = ?", + ) + .bind(&sub.id) + .fetch_one(&state.db) + .await + .context("compute next cycle_number")?; + sqlx::query( + "INSERT INTO subscription_invoices(id, subscription_id, invoice_id, \ + cycle_number, cycle_start_at, cycle_end_at, created_at) \ + VALUES(?, ?, ?, ?, ?, ?, ?)", + ) + .bind(Uuid::new_v4().to_string()) + .bind(&sub.id) + .bind(&internal_invoice_id) + .bind(next_cycle_num) + .bind(cycle_start.to_rfc3339()) + .bind(cycle_end.to_rfc3339()) + .bind(cycle_start.to_rfc3339()) + .execute(&state.db) + .await + .context("INSERT subscription_invoices for renewal")?; + + // 8. Advance the sub: status = past_due, next_renewal_at = + // end of THIS new cycle, last_renewal_attempt_at = now, + // consecutive_failures unchanged (will be reset on settle). + let now_str = now.to_rfc3339(); + let next_renewal = cycle_end.to_rfc3339(); + sqlx::query( + "UPDATE subscriptions \ + SET status = 'past_due', next_renewal_at = ?, \ + last_renewal_attempt_at = ?, updated_at = ? \ + WHERE id = ?", + ) + .bind(&next_renewal) + .bind(&now_str) + .bind(&now_str) + .bind(&sub.id) + .execute(&state.db) + .await + .context("UPDATE subscriptions on renewal create")?; + + // 9. Webhook event: operator's app gets notified that a + // renewal invoice exists and the buyer needs to pay. + crate::webhooks::dispatch( + state, + "subscription.renewal_pending", + &json!({ + "subscription_id": sub.id, + "license_id": sub.license_id, + "invoice_id": internal_invoice_id, + "checkout_url": handle.checkout_url, + "amount_sats": amount_sats, + "listed_currency": sub.listed_currency, + "listed_value": sub.listed_value, + "cycle_number": next_cycle_num, + }), + ) + .await; + + Ok(()) +} + +/// On renewal failure: bump consecutive_failures, push +/// next_renewal_at out by the backoff schedule, leave status as +/// past_due (or transition active → past_due if this was the +/// first attempt that failed). +async fn mark_renewal_failed( + pool: &SqlitePool, + sub: &Subscription, +) -> Result<()> { + let now = Utc::now(); + let new_failures = sub.consecutive_failures + 1; + let backoff = renewal_backoff(new_failures); + let new_next_renewal = (now + backoff).to_rfc3339(); + let now_str = now.to_rfc3339(); + + sqlx::query( + "UPDATE subscriptions \ + SET status = 'past_due', \ + consecutive_failures = ?, \ + next_renewal_at = ?, \ + last_renewal_attempt_at = ?, \ + updated_at = ? \ + WHERE id = ?", + ) + .bind(new_failures) + .bind(&new_next_renewal) + .bind(&now_str) + .bind(&now_str) + .bind(&sub.id) + .execute(pool) + .await + .context("UPDATE subscriptions on renewal failure")?; + Ok(()) +} + +/// Spawn the renewal worker as a long-lived background task. +/// Mirrors `webhooks::spawn_delivery_worker` — single owner, +/// process-wide, panics are logged + the loop continues. +pub fn spawn(state: AppState) { + tokio::spawn(async move { + // Stagger startup so we don't race other boot-time tasks. + tokio::time::sleep(StdDuration::from_secs(30)).await; + loop { + if let Err(e) = tick(&state).await { + tracing::warn!(error = %e, "subscription renewal tick failed"); + } + tokio::time::sleep(TICK_INTERVAL).await; + } + }); +} + +/// Helper for `api::webhook::handle` — when a settle webhook +/// fires for an invoice that's part of a subscription, flip the +/// sub back to `active` and dispatch the `subscription.renewed` +/// event. Returns Ok(()) whether or not the invoice was a +/// subscription invoice; only acts when there's a match. +pub async fn on_invoice_settled(state: &AppState, invoice: &Invoice) -> Result<()> { + let sub_id = match find_subscription_for_invoice(&state.db, &invoice.id).await? { + Some(id) => id, + None => return Ok(()), // not a subscription invoice + }; + mark_active_after_settle(&state.db, &sub_id).await?; + crate::webhooks::dispatch( + state, + "subscription.renewed", + &json!({ + "subscription_id": sub_id, + "invoice_id": invoice.id, + "amount_sats": invoice.amount_sats, + }), + ) + .await; + Ok(()) +} diff --git a/licensing-service/tests/subscriptions.rs b/licensing-service/tests/subscriptions.rs new file mode 100644 index 0000000..26bdbc7 --- /dev/null +++ b/licensing-service/tests/subscriptions.rs @@ -0,0 +1,366 @@ +//! Integration tests for the recurring-subscriptions renewal worker. +//! +//! Drives `subscriptions::tick` directly against a mocked payment +//! provider that returns deterministic order ids/checkout urls, +//! plus a manual rate pin in the settings table for fiat-priced +//! subs. No network. Verifies: +//! - SAT-priced subs renew with correct sat amounts +//! - USD-priced subs re-quote each cycle via the rate fetcher +//! - status transitions (active → past_due on renewal create) +//! - settle webhook → past_due → active +//! - lapse sweep flips past_due → lapsed once grace expires +//! - failure path increments consecutive_failures + backs off +//! - cap on consecutive_failures stops retries +//! - cycle_number monotonically increments per subscription + +use anyhow::Result; +use axum::http::HeaderMap; +use chrono::Utc; +use keysat::api::AppState; +use keysat::config::Config; +use keysat::license_self::Tier; +use keysat::payment::{ + CreateInvoiceParams, CreatedInvoiceHandle, PaymentProvider, ProviderInvoiceStatus, + ProviderKind, ProviderWebhookEvent, +}; +use keysat::subscriptions; +use serde_json::{json, Value}; +use sqlx::sqlite::{ + SqliteConnectOptions, SqliteJournalMode, SqlitePool, SqlitePoolOptions, SqliteSynchronous, +}; +use std::any::Any; +use std::path::PathBuf; +use std::str::FromStr; +use std::sync::atomic::{AtomicBool, AtomicU64, Ordering}; +use std::sync::Arc; +use std::time::Duration; +use tempfile::NamedTempFile; +use tokio::sync::RwLock; +use uuid::Uuid; + +const TEST_ADMIN_KEY: &str = "test_admin_api_key_with_at_least_32_chars_present"; + +/// Same fixture pattern as tests/api.rs::make_test_state, with +/// the renewal worker's MockPaymentProvider knobs added. +async fn make_state() -> (AppState, NamedTempFile, Arc) { + let tmp = NamedTempFile::new().expect("tempfile"); + let url = format!("sqlite://{}", tmp.path().display()); + let opts = SqliteConnectOptions::from_str(&url) + .expect("parse url") + .create_if_missing(true) + .journal_mode(SqliteJournalMode::Wal) + .synchronous(SqliteSynchronous::Normal) + .foreign_keys(true) + .busy_timeout(Duration::from_secs(5)); + let pool = SqlitePoolOptions::new() + .max_connections(2) + .connect_with(opts) + .await + .expect("connect"); + sqlx::migrate!("./migrations") + .run(&pool) + .await + .expect("migrations"); + let keypair = keysat::crypto::keys::load_or_generate(&pool) + .await + .expect("keypair"); + + let cfg = Config { + bind: "127.0.0.1:0".parse().unwrap(), + db_path: PathBuf::from(":memory:"), + admin_api_key: TEST_ADMIN_KEY.to_string(), + btcpay_url: "http://btcpay.test".to_string(), + btcpay_browser_url: None, + btcpay_public_url: None, + btcpay_api_key: None, + btcpay_store_id: None, + btcpay_webhook_secret: None, + public_base_url: "http://keysat.test".to_string(), + operator_name: None, + }; + let mock = Arc::new(MockProvider::new()); + let state = AppState { + db: pool, + keypair: Arc::new(keypair), + payment: Arc::new(RwLock::new(Some( + mock.clone() as Arc, + ))), + config: Arc::new(cfg), + self_tier: Arc::new(RwLock::new(Tier::Unlicensed { + reason: "test".into(), + })), + rates: keysat::rates::RateCache::new(), + }; + (state, tmp, mock) +} + +/// Mock payment provider for the renewal-worker tests. +/// Configurable to fail create_invoice on demand so we can +/// exercise the failure-and-backoff path. +struct MockProvider { + next_id: AtomicU64, + fail_next: AtomicBool, +} + +impl MockProvider { + fn new() -> Self { + Self { + next_id: AtomicU64::new(1), + fail_next: AtomicBool::new(false), + } + } + fn fail_next_call(&self) { + self.fail_next.store(true, Ordering::SeqCst); + } +} + +#[async_trait::async_trait] +impl PaymentProvider for MockProvider { + fn kind(&self) -> ProviderKind { + ProviderKind::Btcpay + } + async fn create_invoice( + &self, + _params: CreateInvoiceParams<'_>, + ) -> Result { + if self.fail_next.swap(false, Ordering::SeqCst) { + anyhow::bail!("mock-induced create_invoice failure"); + } + let n = self.next_id.fetch_add(1, Ordering::SeqCst); + Ok(CreatedInvoiceHandle { + provider_invoice_id: format!("mock-renewal-{n}"), + checkout_url: format!("http://mock.test/checkout/{n}"), + }) + } + async fn get_invoice_status(&self, _id: &str) -> Result { + Ok(ProviderInvoiceStatus::Pending) + } + fn validate_webhook(&self, _h: &HeaderMap, _b: &[u8]) -> Result { + anyhow::bail!("not exercised by renewal-worker tests") + } + fn as_any(&self) -> &dyn Any { + self + } +} + +/// Set up a SAT-priced subscription that's already due for renewal. +/// Returns the sub id. +async fn seed_due_sat_subscription(pool: &SqlitePool) -> String { + let now = Utc::now().to_rfc3339(); + sqlx::query("INSERT INTO products(id, slug, name, price_sats, created_at, updated_at) VALUES('p1','sub','Sub Product',1000,?,?)") + .bind(&now).bind(&now).execute(pool).await.unwrap(); + sqlx::query( + "INSERT INTO policies(id, product_id, name, slug, is_recurring, renewal_period_days, \ + grace_period_days, created_at, updated_at) \ + VALUES('pol1','p1','Monthly','monthly',1,30,7,?,?)", + ) + .bind(&now).bind(&now).execute(pool).await.unwrap(); + sqlx::query( + "INSERT INTO licenses(id, product_id, status, issued_at, policy_id) \ + VALUES('lic1','p1','active',?,'pol1')", + ) + .bind(&now).execute(pool).await.unwrap(); + + let sub_id = "sub1".to_string(); + let past = (Utc::now() - chrono::Duration::days(1)).to_rfc3339(); + sqlx::query( + "INSERT INTO subscriptions(id, license_id, policy_id, product_id, period_days, \ + listed_currency, listed_value, status, started_at, next_renewal_at, \ + consecutive_failures, created_at, updated_at) \ + VALUES(?, 'lic1', 'pol1', 'p1', 30, 'SAT', 50000, 'active', ?, ?, 0, ?, ?)", + ) + .bind(&sub_id).bind(&now).bind(&past).bind(&now).bind(&now) + .execute(pool).await.unwrap(); + sub_id +} + +#[tokio::test] +async fn renewal_worker_creates_invoice_for_sat_priced_due_sub() { + let (state, _tmp, _mock) = make_state().await; + let sub_id = seed_due_sat_subscription(&state.db).await; + + subscriptions::tick(&state).await.expect("tick"); + + // Sub flipped to past_due, next_renewal_at advanced ~30 days, + // consecutive_failures still 0. + let row: (String, Option, i64) = sqlx::query_as( + "SELECT status, next_renewal_at, consecutive_failures FROM subscriptions WHERE id = ?", + ) + .bind(&sub_id).fetch_one(&state.db).await.unwrap(); + assert_eq!(row.0, "past_due", "renewal-create flips active → past_due"); + assert!(row.1.is_some()); + assert_eq!(row.2, 0); + + // Invoice was created with sat amount = listed_value (SAT identity). + let inv: (i64, Option) = sqlx::query_as( + "SELECT i.amount_sats, i.exchange_rate_source \ + FROM invoices i \ + JOIN subscription_invoices si ON si.invoice_id = i.id \ + WHERE si.subscription_id = ?", + ) + .bind(&sub_id).fetch_one(&state.db).await.unwrap(); + assert_eq!(inv.0, 50_000, "sat-priced sub charges listed_value sats verbatim"); + // SAT subs don't record a rate source (identity conversion). + assert!(inv.1.is_none()); + + // subscription_invoices got a row with cycle_number = 2 (the + // first cycle invoice was the original purchase, which the + // seed didn't create — so the worker's first renewal is + // cycle 1 in the seed's universe; check it's > 0). + let cycle: i64 = sqlx::query_scalar( + "SELECT cycle_number FROM subscription_invoices WHERE subscription_id = ?", + ) + .bind(&sub_id).fetch_one(&state.db).await.unwrap(); + assert!(cycle >= 1); +} + +#[tokio::test] +async fn renewal_worker_requotes_rate_for_fiat_priced_sub() { + let (state, _tmp, _mock) = make_state().await; + // Pin USD to $50,000/BTC so $25.00 → 50,000 sats exactly. + sqlx::query("INSERT INTO settings(key, value, updated_at) VALUES('manual_rate_pin_USD', '50000', ?)") + .bind(Utc::now().to_rfc3339()).execute(&state.db).await.unwrap(); + + let now = Utc::now().to_rfc3339(); + sqlx::query("INSERT INTO products(id, slug, name, price_sats, price_currency, price_value, created_at, updated_at) VALUES('p1','usd','USD Sub',0,'USD',2500,?,?)") + .bind(&now).bind(&now).execute(&state.db).await.unwrap(); + sqlx::query("INSERT INTO policies(id, product_id, name, slug, is_recurring, renewal_period_days, grace_period_days, created_at, updated_at) VALUES('pol1','p1','M','m',1,30,7,?,?)") + .bind(&now).bind(&now).execute(&state.db).await.unwrap(); + sqlx::query("INSERT INTO licenses(id, product_id, status, issued_at, policy_id) VALUES('lic1','p1','active',?,'pol1')") + .bind(&now).execute(&state.db).await.unwrap(); + + let past = (Utc::now() - chrono::Duration::days(1)).to_rfc3339(); + sqlx::query( + "INSERT INTO subscriptions(id, license_id, policy_id, product_id, period_days, \ + listed_currency, listed_value, status, started_at, next_renewal_at, \ + consecutive_failures, created_at, updated_at) \ + VALUES('sub1','lic1','pol1','p1',30,'USD',2500,'active',?,?,0,?,?)", + ) + .bind(&now).bind(&past).bind(&now).bind(&now).execute(&state.db).await.unwrap(); + + subscriptions::tick(&state).await.expect("tick"); + + // $25 at $50k/BTC = 0.0005 BTC = 50,000 sats. + let row: (i64, Option, Option, Option) = sqlx::query_as( + "SELECT i.amount_sats, i.listed_currency, i.listed_value, i.exchange_rate_centibps \ + FROM invoices i \ + JOIN subscription_invoices si ON si.invoice_id = i.id \ + WHERE si.subscription_id = 'sub1'", + ) + .fetch_one(&state.db).await.unwrap(); + assert_eq!(row.0, 50_000); + assert_eq!(row.1.as_deref(), Some("USD")); + assert_eq!(row.2, Some(2500)); + assert_eq!(row.3, Some(500_000_000)); +} + +#[tokio::test] +async fn renewal_worker_backs_off_on_failure() { + let (state, _tmp, mock) = make_state().await; + let sub_id = seed_due_sat_subscription(&state.db).await; + + // Force the next provider call to fail. + mock.fail_next_call(); + subscriptions::tick(&state).await.expect("tick succeeds even when individual renewal fails"); + + let row: (String, i64) = sqlx::query_as( + "SELECT status, consecutive_failures FROM subscriptions WHERE id = ?", + ) + .bind(&sub_id).fetch_one(&state.db).await.unwrap(); + assert_eq!(row.0, "past_due"); + assert_eq!(row.1, 1, "first failure increments to 1"); + + // No invoice was created. + let count: i64 = sqlx::query_scalar( + "SELECT COUNT(*) FROM subscription_invoices WHERE subscription_id = ?", + ) + .bind(&sub_id).fetch_one(&state.db).await.unwrap(); + assert_eq!(count, 0); +} + +#[tokio::test] +async fn renewal_worker_stops_retrying_at_max_failures() { + let (state, _tmp, _mock) = make_state().await; + let sub_id = seed_due_sat_subscription(&state.db).await; + + // Pre-set consecutive_failures = MAX so the find-due query + // skips this row. + sqlx::query("UPDATE subscriptions SET consecutive_failures = 5 WHERE id = ?") + .bind(&sub_id).execute(&state.db).await.unwrap(); + + subscriptions::tick(&state).await.expect("tick"); + + // Should not have attempted a renewal — no invoices created. + let count: i64 = sqlx::query_scalar( + "SELECT COUNT(*) FROM subscription_invoices WHERE subscription_id = ?", + ) + .bind(&sub_id).fetch_one(&state.db).await.unwrap(); + assert_eq!(count, 0, "MAX_CONSECUTIVE_FAILURES should stop retries"); +} + +#[tokio::test] +async fn lapse_sweep_flips_past_due_after_grace() { + let (state, _tmp, _mock) = make_state().await; + let sub_id = seed_due_sat_subscription(&state.db).await; + + // Set the sub to past_due with a next_renewal_at far enough + // in the past that grace_period_days (7 from the seed policy) + // has clearly elapsed. + let way_past = (Utc::now() - chrono::Duration::days(15)).to_rfc3339(); + sqlx::query( + "UPDATE subscriptions SET status='past_due', next_renewal_at=?, \ + consecutive_failures=5 WHERE id = ?", + ) + .bind(&way_past).bind(&sub_id).execute(&state.db).await.unwrap(); + + subscriptions::tick(&state).await.expect("tick"); + + let status: String = + sqlx::query_scalar("SELECT status FROM subscriptions WHERE id = ?") + .bind(&sub_id).fetch_one(&state.db).await.unwrap(); + assert_eq!(status, "lapsed", "past_due past grace should flip to lapsed"); +} + +#[tokio::test] +async fn settle_webhook_flips_sub_back_to_active() { + let (state, _tmp, _mock) = make_state().await; + let _sub_id = seed_due_sat_subscription(&state.db).await; + + // First tick creates the renewal invoice and flips sub to past_due. + subscriptions::tick(&state).await.expect("tick"); + + // Find the just-created invoice + simulate a settle. + let invoice_id: String = sqlx::query_scalar( + "SELECT i.id FROM invoices i \ + JOIN subscription_invoices si ON si.invoice_id = i.id \ + WHERE si.subscription_id = 'sub1'", + ) + .fetch_one(&state.db).await.unwrap(); + sqlx::query("UPDATE invoices SET status = 'settled' WHERE id = ?") + .bind(&invoice_id).execute(&state.db).await.unwrap(); + + // Build the Invoice model for the helper's signature. + let invoice = keysat::db::repo::get_invoice_by_id(&state.db, &invoice_id) + .await.unwrap().unwrap(); + keysat::subscriptions::on_invoice_settled(&state, &invoice) + .await.expect("on_invoice_settled"); + + let row: (String, i64) = sqlx::query_as( + "SELECT status, consecutive_failures FROM subscriptions WHERE id = 'sub1'", + ) + .fetch_one(&state.db).await.unwrap(); + assert_eq!(row.0, "active"); + assert_eq!(row.1, 0); +} + +/// Tick is idempotent in the no-op direction: running it when no +/// subs are due doesn't crash and doesn't side-effect anything. +#[tokio::test] +async fn tick_is_no_op_when_nothing_due() { + let (state, _tmp, _mock) = make_state().await; + // No fixtures seeded. + subscriptions::tick(&state).await.expect("tick on empty"); + let count: i64 = sqlx::query_scalar("SELECT COUNT(*) FROM invoices") + .fetch_one(&state.db).await.unwrap(); + assert_eq!(count, 0); +}