From 5ec9a6e8c035740dade16a1570f31c3c9bca34dd Mon Sep 17 00:00:00 2001 From: Grant Date: Fri, 8 May 2026 10:40:11 -0500 Subject: [PATCH] Migrate reconcile + tipping onto PaymentProvider trait; add worker tests MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Two compat-path holdovers migrated: - src/reconcile.rs: was state.btcpay_client().get_invoice() with manual JSON parsing of BTCPay-specific status strings ("Settled", "Complete", "Expired", "Invalid"). Now state.payment_provider() .get_invoice_status() returning the typed ProviderInvoiceStatus enum. The string normalization moves into BtcpayProvider's impl where it belongs. - src/tipping.rs: was state.btcpay_client().pay_lightning_invoice() returning raw JSON, then manual paymentHash extraction. Now provider.pay_lightning_invoice() returning a typed PaymentReceipt { payment_hash, raw }. The audit message now records the active provider's kind() rather than hardcoding "BTCPay LN node". Combined with v0.1.0:43's purchase migration, the daemon's non-test code now contains zero calls to state.btcpay_client() or .btcpay_webhook_secret(). Those compat accessors stay on AppState for v0.2 (no need to break things gratuitously) but they're dead code in the production path. Zaprite's drop-in only needs to implement the trait. Worker integration tests (tests/worker.rs): - worker_marks_failure_and_schedules_retry_on_500: spins up a tiny axum receiver that 500s, calls webhooks::tick(), verifies attempt count and next-attempt scheduling. - worker_dead_letters_after_max_attempts: seeds a row at attempt count 9, ticks once, verifies attempt_count → 10 and next_attempt_at → NULL. Confirms the row also satisfies the admin DLQ predicate (the contract :43's webhook_deliveries.rs depends on). - worker_marks_success_on_2xx: pins the happy path. webhooks::tick is now `pub` so integration tests can drive it synchronously. Test count: 26 (9 unit + 4 migration + 10 API + 3 worker). --- licensing-service/src/reconcile.rs | 32 ++-- licensing-service/src/tipping.rs | 30 ++-- licensing-service/src/webhooks.rs | 6 +- licensing-service/tests/worker.rs | 253 +++++++++++++++++++++++++++++ 4 files changed, 291 insertions(+), 30 deletions(-) create mode 100644 licensing-service/tests/worker.rs diff --git a/licensing-service/src/reconcile.rs b/licensing-service/src/reconcile.rs index f9f6450..ce8859f 100644 --- a/licensing-service/src/reconcile.rs +++ b/licensing-service/src/reconcile.rs @@ -41,8 +41,12 @@ pub fn spawn(state: AppState) { } async fn tick(state: &AppState) -> anyhow::Result<()> { - let btcpay = match state.btcpay_client().await { - Ok(c) => c, + // Provider-agnostic. Each provider's impl handles the + // provider-specific status-string normalization (BTCPay's + // "Settled"/"Complete"/"Expired"/"Invalid" → ProviderInvoiceStatus + // enum); this loop just operates on the typed result. + let provider = match state.payment_provider().await { + Ok(p) => p, Err(_) => return Ok(()), // not configured yet — skip silently }; @@ -56,21 +60,17 @@ async fn tick(state: &AppState) -> anyhow::Result<()> { tracing::debug!(count = pending.len(), "reconciling pending invoices"); for inv in pending { - match btcpay.get_invoice(&inv.btcpay_invoice_id).await { - Ok(raw) => { - let status_str = raw - .get("status") - .and_then(|v| v.as_str()) - .unwrap_or("") - .to_string(); - let normalized = match status_str.as_str() { - "Settled" | "Complete" => Some("settled"), - "Expired" => Some("expired"), - "Invalid" => Some("invalid"), - // still in flight - _ => None, + match provider.get_invoice_status(&inv.btcpay_invoice_id).await { + Ok(status) => { + use crate::payment::ProviderInvoiceStatus::*; + let new_status = match status { + Settled => "settled", + Expired => "expired", + Invalid => "invalid", + // Pending stays pending; Refunded is a v0.3 surface + // that the webhook handler also short-circuits on. + Pending | Refunded => continue, }; - let Some(new_status) = normalized else { continue }; if new_status == inv.status.as_str() { continue; // no-op diff --git a/licensing-service/src/tipping.rs b/licensing-service/src/tipping.rs index ae358dc..f0370f7 100644 --- a/licensing-service/src/tipping.rs +++ b/licensing-service/src/tipping.rs @@ -199,11 +199,15 @@ async fn run_tip( } }; - // Pay it via the operator's BTCPay Lightning node. - let btcpay = match state.btcpay_client().await { - Ok(c) => c, + // Pay it via the active provider's LN node. Provider-agnostic; + // BTCPay implements `pay_lightning_invoice` today, future + // providers either implement it (Zaprite via Strike?) or fall + // through to the trait default which returns a "not supported" + // error that we record as a failed tip. + let provider = match state.payment_provider().await { + Ok(p) => p, Err(e) => { - let detail = format!("BTCPay client unavailable: {e:?}"); + let detail = format!("payment provider unavailable: {e:?}"); repo::record_tip_attempt( &state.db, license_id, @@ -222,17 +226,13 @@ async fn run_tip( } }; - match btcpay.pay_lightning_invoice(&invoice).await { - Ok(payment) => { - let payment_hash = payment - .get("paymentHash") - .and_then(|v| v.as_str()) - .map(|s| s.to_string()); + match provider.pay_lightning_invoice(&invoice).await { + Ok(receipt) => { tracing::info!( license = %license_id, recipient = %recipient, amount_sats = tip_sats, - payment_hash = ?payment_hash, + payment_hash = ?receipt.payment_hash, "tip sent" ); repo::record_tip_attempt( @@ -244,8 +244,12 @@ async fn run_tip( pct, label.as_deref(), "sent", - Some(&format!("paid via BTCPay LN node ({} sats)", tip_sats)), - payment_hash.as_deref(), + Some(&format!( + "paid via {} LN node ({} sats)", + provider.kind().as_str(), + tip_sats + )), + receipt.payment_hash.as_deref(), ) .await .ok(); diff --git a/licensing-service/src/webhooks.rs b/licensing-service/src/webhooks.rs index 78ba81c..d8ec45a 100644 --- a/licensing-service/src/webhooks.rs +++ b/licensing-service/src/webhooks.rs @@ -91,7 +91,11 @@ pub fn spawn_delivery_worker(state: AppState) { }); } -async fn tick(state: &AppState) -> anyhow::Result<()> { +/// Process up to 25 due deliveries: HMAC-sign, POST, mark each +/// success/failure. Public so integration tests can drive the worker +/// synchronously (the spawned background task in +/// `spawn_delivery_worker` simply calls this every 5s). +pub async fn tick(state: &AppState) -> anyhow::Result<()> { let due = repo::list_ready_deliveries(&state.db, 25) .await .map_err(|e| anyhow::anyhow!("{e:?}"))?; diff --git a/licensing-service/tests/worker.rs b/licensing-service/tests/worker.rs new file mode 100644 index 0000000..76fcbf8 --- /dev/null +++ b/licensing-service/tests/worker.rs @@ -0,0 +1,253 @@ +//! Integration tests for the outbound-webhook delivery worker. +//! +//! Companion to `tests/api.rs`'s DLQ test, which only exercised the +//! admin surface against an SQL fixture. This file drives the worker +//! itself (`webhooks::tick`) against a real HTTP receiver, watching +//! the retry-then-dead-letter behavior empirically rather than +//! trusting the SQL fixture. + +use axum::{http::StatusCode, routing::any, Router}; +use chrono::Utc; +use keysat::api::AppState; +use keysat::config::Config; +use keysat::license_self::Tier; +use keysat::{crypto, webhooks}; +use sqlx::sqlite::{ + SqliteConnectOptions, SqliteJournalMode, SqlitePool, SqlitePoolOptions, SqliteSynchronous, +}; +use std::path::PathBuf; +use std::str::FromStr; +use std::sync::Arc; +use std::time::Duration; +use tempfile::NamedTempFile; +use tokio::net::TcpListener; +use tokio::sync::RwLock; + +/// Minimum-viable AppState for a worker test. The worker only touches +/// `state.db` for queue queries — nothing else matters here. +async fn make_state() -> (AppState, NamedTempFile) { + let tmp = NamedTempFile::new().expect("tempfile"); + let url = format!("sqlite://{}", tmp.path().display()); + let opts = SqliteConnectOptions::from_str(&url) + .expect("parse sqlite url") + .create_if_missing(true) + .journal_mode(SqliteJournalMode::Wal) + .synchronous(SqliteSynchronous::Normal) + .foreign_keys(true) + .busy_timeout(Duration::from_secs(5)); + let pool = SqlitePoolOptions::new() + .max_connections(2) + .connect_with(opts) + .await + .expect("connect sqlite"); + sqlx::migrate!("./migrations") + .run(&pool) + .await + .expect("apply migrations"); + let keypair = crypto::keys::load_or_generate(&pool) + .await + .expect("load_or_generate keypair"); + + let cfg = Config { + bind: "127.0.0.1:0".parse().unwrap(), + db_path: PathBuf::from(":memory:"), + admin_api_key: "x".repeat(32), + btcpay_url: "http://btcpay.test".to_string(), + btcpay_browser_url: None, + btcpay_public_url: None, + btcpay_api_key: None, + btcpay_store_id: None, + btcpay_webhook_secret: None, + public_base_url: "http://keysat.test".to_string(), + operator_name: None, + }; + let state = AppState { + db: pool, + keypair: Arc::new(keypair), + payment: Arc::new(RwLock::new(None)), + config: Arc::new(cfg), + self_tier: Arc::new(RwLock::new(Tier::Unlicensed { + reason: "test".into(), + })), + }; + (state, tmp) +} + +/// Spawn a tiny axum server on a random port that returns 500 for every +/// request. Returns the URL the webhook endpoint should be configured +/// with. Server runs for the lifetime of the test process; tokio +/// reclaims it on test completion. +async fn spawn_500_receiver() -> String { + let app = Router::new().route( + "/", + any(|| async { StatusCode::INTERNAL_SERVER_ERROR }), + ); + let listener = TcpListener::bind("127.0.0.1:0").await.expect("bind"); + let addr = listener.local_addr().expect("local_addr"); + tokio::spawn(async move { + axum::serve(listener, app).await.ok(); + }); + format!("http://{addr}/") +} + +/// Insert a webhook endpoint + a single ready-to-deliver row. +async fn seed_endpoint_and_delivery( + pool: &SqlitePool, + url: &str, + initial_attempts: i64, +) -> String { + let now = Utc::now().to_rfc3339(); + let endpoint_id = "ep-test"; + sqlx::query( + "INSERT INTO webhook_endpoints(id, url, secret, event_types, active, \ + description, created_at, updated_at) \ + VALUES(?, ?, '0123456789abcdef0123456789abcdef', '[\"*\"]', 1, '', ?, ?)", + ) + .bind(endpoint_id) + .bind(url) + .bind(&now) + .bind(&now) + .execute(pool) + .await + .unwrap(); + + let delivery_id = "del-test"; + sqlx::query( + "INSERT INTO webhook_deliveries(id, endpoint_id, event_type, \ + payload_json, attempt_count, next_attempt_at, created_at) \ + VALUES(?, ?, 'license.issued', '{\"data\":\"x\"}', ?, ?, ?)", + ) + .bind(delivery_id) + .bind(endpoint_id) + .bind(initial_attempts) + .bind(&now) // due now + .bind(&now) + .execute(pool) + .await + .unwrap(); + + delivery_id.to_string() +} + +/// First-attempt failure: worker POSTs, receiver 500s, worker marks +/// the row as a failure, schedules a retry. Verifies attempt_count +/// went 0→1, next_attempt_at is in the future, last_status_code is +/// the 500, last_error is populated. +#[tokio::test] +async fn worker_marks_failure_and_schedules_retry_on_500() { + let (state, _tmp) = make_state().await; + let url = spawn_500_receiver().await; + let delivery_id = seed_endpoint_and_delivery(&state.db, &url, 0).await; + + webhooks::tick(&state).await.expect("tick"); + + let row: (i64, Option, Option, Option, Option) = + sqlx::query_as( + "SELECT attempt_count, next_attempt_at, last_status_code, \ + last_error, delivered_at FROM webhook_deliveries WHERE id = ?", + ) + .bind(&delivery_id) + .fetch_one(&state.db) + .await + .unwrap(); + + assert_eq!(row.0, 1, "attempt_count should be 1 after one failed tick"); + assert!( + row.1.is_some(), + "next_attempt_at should be scheduled for retry" + ); + assert_eq!( + row.2, + Some(500), + "last_status_code should record the receiver's 500" + ); + assert!( + row.3.as_deref().unwrap_or("").contains("non-2xx"), + "last_error should describe the failure: {:?}", + row.3 + ); + assert!(row.4.is_none(), "delivered_at must remain NULL on failure"); +} + +/// Crossing the dead-letter boundary: with attempt_count already at 9, +/// one more failed tick takes it to 10, and the worker MUST NOT +/// schedule another retry — it sets next_attempt_at = NULL. This is +/// the row that the new admin DLQ surface (`?status=failed`) picks up. +#[tokio::test] +async fn worker_dead_letters_after_max_attempts() { + let (state, _tmp) = make_state().await; + let url = spawn_500_receiver().await; + let delivery_id = seed_endpoint_and_delivery(&state.db, &url, 9).await; + + webhooks::tick(&state).await.expect("tick"); + + let row: (i64, Option, Option) = sqlx::query_as( + "SELECT attempt_count, next_attempt_at, delivered_at \ + FROM webhook_deliveries WHERE id = ?", + ) + .bind(&delivery_id) + .fetch_one(&state.db) + .await + .unwrap(); + + assert_eq!(row.0, 10, "attempt_count should reach the cap"); + assert!( + row.1.is_none(), + "next_attempt_at MUST be NULL — this is the DLQ signal: {:?}", + row.1 + ); + assert!(row.2.is_none(), "delivered_at must remain NULL"); + + // Confirm the dead-lettered row also shows up in the admin DLQ + // filter — the SQL predicate the admin endpoint uses + // (delivered_at IS NULL AND next_attempt_at IS NULL AND + // attempt_count > 0) must match this row. + let dlq_count: i64 = sqlx::query_scalar( + "SELECT COUNT(*) FROM webhook_deliveries \ + WHERE delivered_at IS NULL AND next_attempt_at IS NULL AND attempt_count > 0", + ) + .fetch_one(&state.db) + .await + .unwrap(); + assert_eq!( + dlq_count, 1, + "the dead-lettered row must satisfy the admin DLQ predicate" + ); +} + +/// 2xx response → success. The worker stamps `delivered_at` with the +/// current time, leaves `next_attempt_at` NULL, and records the status +/// code. This is the happy path — implicitly tested already via +/// production usage but pinned here for completeness alongside the +/// failure cases above. +#[tokio::test] +async fn worker_marks_success_on_2xx() { + let (state, _tmp) = make_state().await; + + // Receiver that always returns 200. + let app = Router::new().route("/", any(|| async { StatusCode::OK })); + let listener = TcpListener::bind("127.0.0.1:0").await.unwrap(); + let addr = listener.local_addr().unwrap(); + tokio::spawn(async move { + axum::serve(listener, app).await.ok(); + }); + let url = format!("http://{addr}/"); + + let delivery_id = seed_endpoint_and_delivery(&state.db, &url, 0).await; + + webhooks::tick(&state).await.expect("tick"); + + let row: (i64, Option, Option, Option) = sqlx::query_as( + "SELECT attempt_count, next_attempt_at, last_status_code, delivered_at \ + FROM webhook_deliveries WHERE id = ?", + ) + .bind(&delivery_id) + .fetch_one(&state.db) + .await + .unwrap(); + + assert_eq!(row.0, 1); + assert!(row.1.is_none(), "next_attempt_at should be NULL on success"); + assert_eq!(row.2, Some(200)); + assert!(row.3.is_some(), "delivered_at should be stamped on success"); +}