Migrate reconcile + tipping onto PaymentProvider trait; add worker tests

Two compat-path holdovers migrated:

- src/reconcile.rs: was state.btcpay_client().get_invoice() with
  manual JSON parsing of BTCPay-specific status strings ("Settled",
  "Complete", "Expired", "Invalid"). Now state.payment_provider()
  .get_invoice_status() returning the typed ProviderInvoiceStatus
  enum. The string normalization moves into BtcpayProvider's impl
  where it belongs.

- src/tipping.rs: was state.btcpay_client().pay_lightning_invoice()
  returning raw JSON, then manual paymentHash extraction. Now
  provider.pay_lightning_invoice() returning a typed PaymentReceipt
  { payment_hash, raw }. The audit message now records the active
  provider's kind() rather than hardcoding "BTCPay LN node".

Combined with v0.1.0:43's purchase migration, the daemon's
non-test code now contains zero calls to state.btcpay_client() or
.btcpay_webhook_secret(). Those compat accessors stay on AppState
for v0.2 (no need to break things gratuitously) but they're dead
code in the production path. Zaprite's drop-in only needs to
implement the trait.

Worker integration tests (tests/worker.rs):

- worker_marks_failure_and_schedules_retry_on_500: spins up a tiny
  axum receiver that 500s, calls webhooks::tick(), verifies attempt
  count and next-attempt scheduling.
- worker_dead_letters_after_max_attempts: seeds a row at attempt
  count 9, ticks once, verifies attempt_count → 10 and
  next_attempt_at → NULL. Confirms the row also satisfies the admin
  DLQ predicate (the contract :43's webhook_deliveries.rs depends
  on).
- worker_marks_success_on_2xx: pins the happy path.

webhooks::tick is now `pub` so integration tests can drive it
synchronously.

Test count: 26 (9 unit + 4 migration + 10 API + 3 worker).
This commit is contained in:
Grant
2026-05-08 10:40:11 -05:00
parent 96490bf3bf
commit 5ec9a6e8c0
4 changed files with 291 additions and 30 deletions
+16 -16
View File
@@ -41,8 +41,12 @@ pub fn spawn(state: AppState) {
}
async fn tick(state: &AppState) -> anyhow::Result<()> {
let btcpay = match state.btcpay_client().await {
Ok(c) => c,
// Provider-agnostic. Each provider's impl handles the
// provider-specific status-string normalization (BTCPay's
// "Settled"/"Complete"/"Expired"/"Invalid" → ProviderInvoiceStatus
// enum); this loop just operates on the typed result.
let provider = match state.payment_provider().await {
Ok(p) => p,
Err(_) => return Ok(()), // not configured yet — skip silently
};
@@ -56,21 +60,17 @@ async fn tick(state: &AppState) -> anyhow::Result<()> {
tracing::debug!(count = pending.len(), "reconciling pending invoices");
for inv in pending {
match btcpay.get_invoice(&inv.btcpay_invoice_id).await {
Ok(raw) => {
let status_str = raw
.get("status")
.and_then(|v| v.as_str())
.unwrap_or("")
.to_string();
let normalized = match status_str.as_str() {
"Settled" | "Complete" => Some("settled"),
"Expired" => Some("expired"),
"Invalid" => Some("invalid"),
// still in flight
_ => None,
match provider.get_invoice_status(&inv.btcpay_invoice_id).await {
Ok(status) => {
use crate::payment::ProviderInvoiceStatus::*;
let new_status = match status {
Settled => "settled",
Expired => "expired",
Invalid => "invalid",
// Pending stays pending; Refunded is a v0.3 surface
// that the webhook handler also short-circuits on.
Pending | Refunded => continue,
};
let Some(new_status) = normalized else { continue };
if new_status == inv.status.as_str() {
continue; // no-op
+17 -13
View File
@@ -199,11 +199,15 @@ async fn run_tip(
}
};
// Pay it via the operator's BTCPay Lightning node.
let btcpay = match state.btcpay_client().await {
Ok(c) => c,
// Pay it via the active provider's LN node. Provider-agnostic;
// BTCPay implements `pay_lightning_invoice` today, future
// providers either implement it (Zaprite via Strike?) or fall
// through to the trait default which returns a "not supported"
// error that we record as a failed tip.
let provider = match state.payment_provider().await {
Ok(p) => p,
Err(e) => {
let detail = format!("BTCPay client unavailable: {e:?}");
let detail = format!("payment provider unavailable: {e:?}");
repo::record_tip_attempt(
&state.db,
license_id,
@@ -222,17 +226,13 @@ async fn run_tip(
}
};
match btcpay.pay_lightning_invoice(&invoice).await {
Ok(payment) => {
let payment_hash = payment
.get("paymentHash")
.and_then(|v| v.as_str())
.map(|s| s.to_string());
match provider.pay_lightning_invoice(&invoice).await {
Ok(receipt) => {
tracing::info!(
license = %license_id,
recipient = %recipient,
amount_sats = tip_sats,
payment_hash = ?payment_hash,
payment_hash = ?receipt.payment_hash,
"tip sent"
);
repo::record_tip_attempt(
@@ -244,8 +244,12 @@ async fn run_tip(
pct,
label.as_deref(),
"sent",
Some(&format!("paid via BTCPay LN node ({} sats)", tip_sats)),
payment_hash.as_deref(),
Some(&format!(
"paid via {} LN node ({} sats)",
provider.kind().as_str(),
tip_sats
)),
receipt.payment_hash.as_deref(),
)
.await
.ok();
+5 -1
View File
@@ -91,7 +91,11 @@ pub fn spawn_delivery_worker(state: AppState) {
});
}
async fn tick(state: &AppState) -> anyhow::Result<()> {
/// Process up to 25 due deliveries: HMAC-sign, POST, mark each
/// success/failure. Public so integration tests can drive the worker
/// synchronously (the spawned background task in
/// `spawn_delivery_worker` simply calls this every 5s).
pub async fn tick(state: &AppState) -> anyhow::Result<()> {
let due = repo::list_ready_deliveries(&state.db, 25)
.await
.map_err(|e| anyhow::anyhow!("{e:?}"))?;
+253
View File
@@ -0,0 +1,253 @@
//! Integration tests for the outbound-webhook delivery worker.
//!
//! Companion to `tests/api.rs`'s DLQ test, which only exercised the
//! admin surface against an SQL fixture. This file drives the worker
//! itself (`webhooks::tick`) against a real HTTP receiver, watching
//! the retry-then-dead-letter behavior empirically rather than
//! trusting the SQL fixture.
use axum::{http::StatusCode, routing::any, Router};
use chrono::Utc;
use keysat::api::AppState;
use keysat::config::Config;
use keysat::license_self::Tier;
use keysat::{crypto, webhooks};
use sqlx::sqlite::{
SqliteConnectOptions, SqliteJournalMode, SqlitePool, SqlitePoolOptions, SqliteSynchronous,
};
use std::path::PathBuf;
use std::str::FromStr;
use std::sync::Arc;
use std::time::Duration;
use tempfile::NamedTempFile;
use tokio::net::TcpListener;
use tokio::sync::RwLock;
/// Minimum-viable AppState for a worker test. The worker only touches
/// `state.db` for queue queries — nothing else matters here.
async fn make_state() -> (AppState, NamedTempFile) {
let tmp = NamedTempFile::new().expect("tempfile");
let url = format!("sqlite://{}", tmp.path().display());
let opts = SqliteConnectOptions::from_str(&url)
.expect("parse sqlite url")
.create_if_missing(true)
.journal_mode(SqliteJournalMode::Wal)
.synchronous(SqliteSynchronous::Normal)
.foreign_keys(true)
.busy_timeout(Duration::from_secs(5));
let pool = SqlitePoolOptions::new()
.max_connections(2)
.connect_with(opts)
.await
.expect("connect sqlite");
sqlx::migrate!("./migrations")
.run(&pool)
.await
.expect("apply migrations");
let keypair = crypto::keys::load_or_generate(&pool)
.await
.expect("load_or_generate keypair");
let cfg = Config {
bind: "127.0.0.1:0".parse().unwrap(),
db_path: PathBuf::from(":memory:"),
admin_api_key: "x".repeat(32),
btcpay_url: "http://btcpay.test".to_string(),
btcpay_browser_url: None,
btcpay_public_url: None,
btcpay_api_key: None,
btcpay_store_id: None,
btcpay_webhook_secret: None,
public_base_url: "http://keysat.test".to_string(),
operator_name: None,
};
let state = AppState {
db: pool,
keypair: Arc::new(keypair),
payment: Arc::new(RwLock::new(None)),
config: Arc::new(cfg),
self_tier: Arc::new(RwLock::new(Tier::Unlicensed {
reason: "test".into(),
})),
};
(state, tmp)
}
/// Spawn a tiny axum server on a random port that returns 500 for every
/// request. Returns the URL the webhook endpoint should be configured
/// with. Server runs for the lifetime of the test process; tokio
/// reclaims it on test completion.
async fn spawn_500_receiver() -> String {
let app = Router::new().route(
"/",
any(|| async { StatusCode::INTERNAL_SERVER_ERROR }),
);
let listener = TcpListener::bind("127.0.0.1:0").await.expect("bind");
let addr = listener.local_addr().expect("local_addr");
tokio::spawn(async move {
axum::serve(listener, app).await.ok();
});
format!("http://{addr}/")
}
/// Insert a webhook endpoint + a single ready-to-deliver row.
async fn seed_endpoint_and_delivery(
pool: &SqlitePool,
url: &str,
initial_attempts: i64,
) -> String {
let now = Utc::now().to_rfc3339();
let endpoint_id = "ep-test";
sqlx::query(
"INSERT INTO webhook_endpoints(id, url, secret, event_types, active, \
description, created_at, updated_at) \
VALUES(?, ?, '0123456789abcdef0123456789abcdef', '[\"*\"]', 1, '', ?, ?)",
)
.bind(endpoint_id)
.bind(url)
.bind(&now)
.bind(&now)
.execute(pool)
.await
.unwrap();
let delivery_id = "del-test";
sqlx::query(
"INSERT INTO webhook_deliveries(id, endpoint_id, event_type, \
payload_json, attempt_count, next_attempt_at, created_at) \
VALUES(?, ?, 'license.issued', '{\"data\":\"x\"}', ?, ?, ?)",
)
.bind(delivery_id)
.bind(endpoint_id)
.bind(initial_attempts)
.bind(&now) // due now
.bind(&now)
.execute(pool)
.await
.unwrap();
delivery_id.to_string()
}
/// First-attempt failure: worker POSTs, receiver 500s, worker marks
/// the row as a failure, schedules a retry. Verifies attempt_count
/// went 0→1, next_attempt_at is in the future, last_status_code is
/// the 500, last_error is populated.
#[tokio::test]
async fn worker_marks_failure_and_schedules_retry_on_500() {
let (state, _tmp) = make_state().await;
let url = spawn_500_receiver().await;
let delivery_id = seed_endpoint_and_delivery(&state.db, &url, 0).await;
webhooks::tick(&state).await.expect("tick");
let row: (i64, Option<String>, Option<i64>, Option<String>, Option<String>) =
sqlx::query_as(
"SELECT attempt_count, next_attempt_at, last_status_code, \
last_error, delivered_at FROM webhook_deliveries WHERE id = ?",
)
.bind(&delivery_id)
.fetch_one(&state.db)
.await
.unwrap();
assert_eq!(row.0, 1, "attempt_count should be 1 after one failed tick");
assert!(
row.1.is_some(),
"next_attempt_at should be scheduled for retry"
);
assert_eq!(
row.2,
Some(500),
"last_status_code should record the receiver's 500"
);
assert!(
row.3.as_deref().unwrap_or("").contains("non-2xx"),
"last_error should describe the failure: {:?}",
row.3
);
assert!(row.4.is_none(), "delivered_at must remain NULL on failure");
}
/// Crossing the dead-letter boundary: with attempt_count already at 9,
/// one more failed tick takes it to 10, and the worker MUST NOT
/// schedule another retry — it sets next_attempt_at = NULL. This is
/// the row that the new admin DLQ surface (`?status=failed`) picks up.
#[tokio::test]
async fn worker_dead_letters_after_max_attempts() {
let (state, _tmp) = make_state().await;
let url = spawn_500_receiver().await;
let delivery_id = seed_endpoint_and_delivery(&state.db, &url, 9).await;
webhooks::tick(&state).await.expect("tick");
let row: (i64, Option<String>, Option<String>) = sqlx::query_as(
"SELECT attempt_count, next_attempt_at, delivered_at \
FROM webhook_deliveries WHERE id = ?",
)
.bind(&delivery_id)
.fetch_one(&state.db)
.await
.unwrap();
assert_eq!(row.0, 10, "attempt_count should reach the cap");
assert!(
row.1.is_none(),
"next_attempt_at MUST be NULL — this is the DLQ signal: {:?}",
row.1
);
assert!(row.2.is_none(), "delivered_at must remain NULL");
// Confirm the dead-lettered row also shows up in the admin DLQ
// filter — the SQL predicate the admin endpoint uses
// (delivered_at IS NULL AND next_attempt_at IS NULL AND
// attempt_count > 0) must match this row.
let dlq_count: i64 = sqlx::query_scalar(
"SELECT COUNT(*) FROM webhook_deliveries \
WHERE delivered_at IS NULL AND next_attempt_at IS NULL AND attempt_count > 0",
)
.fetch_one(&state.db)
.await
.unwrap();
assert_eq!(
dlq_count, 1,
"the dead-lettered row must satisfy the admin DLQ predicate"
);
}
/// 2xx response → success. The worker stamps `delivered_at` with the
/// current time, leaves `next_attempt_at` NULL, and records the status
/// code. This is the happy path — implicitly tested already via
/// production usage but pinned here for completeness alongside the
/// failure cases above.
#[tokio::test]
async fn worker_marks_success_on_2xx() {
let (state, _tmp) = make_state().await;
// Receiver that always returns 200.
let app = Router::new().route("/", any(|| async { StatusCode::OK }));
let listener = TcpListener::bind("127.0.0.1:0").await.unwrap();
let addr = listener.local_addr().unwrap();
tokio::spawn(async move {
axum::serve(listener, app).await.ok();
});
let url = format!("http://{addr}/");
let delivery_id = seed_endpoint_and_delivery(&state.db, &url, 0).await;
webhooks::tick(&state).await.expect("tick");
let row: (i64, Option<String>, Option<i64>, Option<String>) = sqlx::query_as(
"SELECT attempt_count, next_attempt_at, last_status_code, delivered_at \
FROM webhook_deliveries WHERE id = ?",
)
.bind(&delivery_id)
.fetch_one(&state.db)
.await
.unwrap();
assert_eq!(row.0, 1);
assert!(row.1.is_none(), "next_attempt_at should be NULL on success");
assert_eq!(row.2, Some(200));
assert!(row.3.is_some(), "delivered_at should be stamped on success");
}