From f9ef1a854caedd529f7c21c4faad9228ee620ecc Mon Sep 17 00:00:00 2001 From: Grant Date: Fri, 8 May 2026 09:38:58 -0500 Subject: [PATCH] =?UTF-8?q?Webhook=20DLQ=20=E2=80=94=20list=20failed=20del?= =?UTF-8?q?iveries=20and=20manually=20retry?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Closes the silent-loss hole in outbound webhook delivery. The worker in src/webhooks.rs retries failed deliveries with exponential backoff up to 10 attempts, then sets next_attempt_at = NULL and walks away. Pre-this-commit, those "dead-lettered" rows sat in webhook_deliveries forever with no surface for the operator to discover, inspect, or recover from them — a subscriber that was down for >6h during a license-issuance burst would silently lose those events forever. What's new: - repo::DeliveryStatusFilter — enum with parse() so query strings map cleanly to SQL predicates. - repo::list_deliveries — endpoint_id + status + limit, newest first. - repo::requeue_delivery — resets attempt_count=0, clears delivered_at and last_error, sets next_attempt_at=now. The worker picks it up on the next 5s tick. - src/api/webhook_deliveries.rs — admin module with two handlers: - GET /v1/admin/webhook-deliveries?endpoint_id=…&status=…&limit=… - POST /v1/admin/webhook-deliveries/:id/retry (audit-logged as webhook_delivery.retry; 404 on missing id) - Routes registered in src/api/mod.rs alongside the existing webhook_endpoints CRUD. - tests/api.rs gains webhook_dlq_lists_failed_and_retry_requeues: seeds three deliveries directly via SQL (one each: delivered, pending, dead-lettered), exercises the list filter, runs the retry, asserts the row migrates from failed→pending, audit row is written, 404 on bad id, 400 on bad status filter. Worker code is unchanged. The DLQ is operator-actionable infrastructure on top of the existing retry semantics. Test count: 23 (9 unit + 4 migration + 10 API), up from 22. --- licensing-service/src/api/mod.rs | 11 ++ .../src/api/webhook_deliveries.rs | 104 +++++++++++ licensing-service/src/db/repo.rs | 108 ++++++++++++ licensing-service/tests/api.rs | 164 ++++++++++++++++++ 4 files changed, 387 insertions(+) create mode 100644 licensing-service/src/api/webhook_deliveries.rs diff --git a/licensing-service/src/api/mod.rs b/licensing-service/src/api/mod.rs index 6aaf4aa..4f21969 100644 --- a/licensing-service/src/api/mod.rs +++ b/licensing-service/src/api/mod.rs @@ -70,6 +70,7 @@ pub mod session_layer; pub mod tier; pub mod validate; pub mod webhook; +pub mod webhook_deliveries; pub mod webhook_endpoints; use crate::btcpay::client::BtcpayClient; @@ -304,6 +305,16 @@ pub fn router(state: AppState) -> Router { "/v1/admin/webhook-endpoints/:id", axum::routing::delete(webhook_endpoints::delete), ) + // Webhook delivery history (the dead-letter inspection + + // manual-retry surface; see webhook_deliveries.rs for why). + .route( + "/v1/admin/webhook-deliveries", + get(webhook_deliveries::list), + ) + .route( + "/v1/admin/webhook-deliveries/:id/retry", + post(webhook_deliveries::retry), + ) // Discount / referral codes. .route( "/v1/admin/discount-codes", diff --git a/licensing-service/src/api/webhook_deliveries.rs b/licensing-service/src/api/webhook_deliveries.rs new file mode 100644 index 0000000..5c318b8 --- /dev/null +++ b/licensing-service/src/api/webhook_deliveries.rs @@ -0,0 +1,104 @@ +//! Admin views over the outbound webhook delivery queue. +//! +//! Companion to `webhook_endpoints.rs`: that module manages the +//! configured subscriber URLs; this one exposes the row-level history +//! of attempts (success, in-flight retries, dead-lettered failures) +//! and lets operators manually re-queue a dead delivery for another +//! pass through the worker. +//! +//! Why this exists: the worker in `crate::webhooks` retries failed +//! deliveries with exponential backoff up to 10 attempts, then sets +//! `next_attempt_at = NULL` and walks away. Pre-this-module, those +//! "dead-lettered" rows were invisible — operators had no surface to +//! discover, inspect, or recover from them. A subscriber endpoint +//! that was down for >6h during a license-issuance burst would +//! silently lose those events forever. + +use crate::api::admin::{request_context, require_admin}; +use crate::api::AppState; +use crate::db::repo::{self, DeliveryStatusFilter}; +use crate::error::{AppError, AppResult}; +use axum::{ + extract::{Path, Query, State}, + http::HeaderMap, + Json, +}; +use serde::Deserialize; +use serde_json::{json, Value}; + +const DEFAULT_LIMIT: i64 = 100; +const MAX_LIMIT: i64 = 500; + +#[derive(Debug, Deserialize)] +pub struct ListDeliveriesQuery { + /// Filter by configured endpoint id. Omit for all endpoints. + pub endpoint_id: Option, + /// One of `pending` | `delivered` | `failed` | `all`. Defaults to + /// `all`. The `failed` filter is the dead-letter queue — rows + /// where the worker exhausted retries. + pub status: Option, + /// Cap on rows returned. Defaults to 100; max 500. + pub limit: Option, +} + +pub async fn list( + State(state): State, + headers: HeaderMap, + Query(q): Query, +) -> AppResult> { + require_admin(&state, &headers)?; + let status = match q.status.as_deref() { + Some(s) => DeliveryStatusFilter::parse(s).ok_or_else(|| { + AppError::BadRequest(format!( + "invalid status filter '{s}'; expected pending|delivered|failed|all" + )) + })?, + None => DeliveryStatusFilter::All, + }; + let limit = q + .limit + .unwrap_or(DEFAULT_LIMIT) + .clamp(1, MAX_LIMIT); + let rows = repo::list_deliveries( + &state.db, + q.endpoint_id.as_deref(), + status, + limit, + ) + .await?; + Ok(Json(json!({ "deliveries": rows }))) +} + +/// Manual re-queue for a dead-lettered (or otherwise stuck) +/// delivery. The worker will pick it up on the next 5s tick. +/// +/// 404 if the delivery id doesn't exist; 200 on success with the +/// updated row in the body so the SPA can re-render the list with +/// the new state immediately. +pub async fn retry( + State(state): State, + headers: HeaderMap, + Path(id): Path, +) -> AppResult> { + let actor_hash = require_admin(&state, &headers)?; + let (ip, ua) = request_context(&headers); + let delivery = repo::requeue_delivery(&state.db, &id) + .await? + .ok_or_else(|| AppError::NotFound(format!("webhook delivery '{id}'")))?; + let _ = repo::insert_audit( + &state.db, + "admin_api_key", + Some(&actor_hash), + "webhook_delivery.retry", + Some("webhook_delivery"), + Some(&id), + ip.as_deref(), + ua.as_deref(), + &json!({ + "endpoint_id": delivery.endpoint_id, + "event_type": delivery.event_type, + }), + ) + .await; + Ok(Json(json!(delivery))) +} diff --git a/licensing-service/src/db/repo.rs b/licensing-service/src/db/repo.rs index 087aec2..e124c75 100644 --- a/licensing-service/src/db/repo.rs +++ b/licensing-service/src/db/repo.rs @@ -1379,6 +1379,114 @@ pub async fn mark_delivery_failure( Ok(()) } +/// Filter modes for `list_deliveries`. Strings match the values +/// accepted by the admin endpoint's `?status=...` query param. +pub enum DeliveryStatusFilter { + /// `delivered_at IS NULL AND next_attempt_at IS NOT NULL` — in + /// the retry queue, will be picked up by the worker on the next + /// tick that's past `next_attempt_at`. + Pending, + /// `delivered_at IS NOT NULL` — successfully delivered. + Delivered, + /// `delivered_at IS NULL AND next_attempt_at IS NULL AND + /// attempt_count > 0` — the dead-letter case. Worker exhausted + /// retries (or hit a hard error like a deleted endpoint) and + /// won't re-pick it. Operators see these via the admin list and + /// can manually re-queue via `requeue_delivery`. + Failed, + /// All deliveries. + All, +} + +impl DeliveryStatusFilter { + pub fn parse(s: &str) -> Option { + match s { + "pending" => Some(Self::Pending), + "delivered" => Some(Self::Delivered), + "failed" => Some(Self::Failed), + "all" => Some(Self::All), + _ => None, + } + } +} + +/// List webhook deliveries with optional filtering. Newest first +/// (orders by `created_at DESC`) so the admin UI shows recent +/// activity at the top. +pub async fn list_deliveries( + pool: &SqlitePool, + endpoint_id: Option<&str>, + status: DeliveryStatusFilter, + limit: i64, +) -> AppResult> { + let mut sql = String::from( + "SELECT id, endpoint_id, event_type, payload_json, attempt_count, + next_attempt_at, last_status_code, last_error, delivered_at, created_at + FROM webhook_deliveries WHERE 1=1", + ); + if endpoint_id.is_some() { + sql.push_str(" AND endpoint_id = ?"); + } + match status { + DeliveryStatusFilter::Pending => { + sql.push_str(" AND delivered_at IS NULL AND next_attempt_at IS NOT NULL") + } + DeliveryStatusFilter::Delivered => sql.push_str(" AND delivered_at IS NOT NULL"), + DeliveryStatusFilter::Failed => sql.push_str( + " AND delivered_at IS NULL AND next_attempt_at IS NULL AND attempt_count > 0", + ), + DeliveryStatusFilter::All => {} + } + sql.push_str(" ORDER BY created_at DESC LIMIT ?"); + + let mut q = sqlx::query(&sql); + if let Some(eid) = endpoint_id { + q = q.bind(eid); + } + q = q.bind(limit); + let rows = q.fetch_all(pool).await?; + Ok(rows.into_iter().map(row_to_delivery).collect()) +} + +/// Re-queue a previously-failed (or even successfully-delivered) +/// delivery for another attempt. Resets `attempt_count` to 0, clears +/// `delivered_at` and `last_error`, and sets `next_attempt_at` to +/// now so the worker picks it up on the next tick. +/// +/// Returns the affected row, or `Ok(None)` if no row with the given +/// id exists. +pub async fn requeue_delivery( + pool: &SqlitePool, + id: &str, +) -> AppResult> { + let now = Utc::now().to_rfc3339(); + let res = sqlx::query( + "UPDATE webhook_deliveries + SET attempt_count = 0, + delivered_at = NULL, + last_error = NULL, + last_status_code = NULL, + next_attempt_at = ? + WHERE id = ?", + ) + .bind(&now) + .bind(id) + .execute(pool) + .await?; + if res.rows_affected() == 0 { + return Ok(None); + } + let row = sqlx::query( + "SELECT id, endpoint_id, event_type, payload_json, attempt_count, + next_attempt_at, last_status_code, last_error, delivered_at, created_at + FROM webhook_deliveries WHERE id = ?", + ) + .bind(id) + .fetch_one(pool) + .await?; + Ok(Some(row_to_delivery(row))) +} + fn row_to_delivery(row: sqlx::sqlite::SqliteRow) -> WebhookDelivery { WebhookDelivery { id: row.get("id"), diff --git a/licensing-service/tests/api.rs b/licensing-service/tests/api.rs index 67fbdfa..d227771 100644 --- a/licensing-service/tests/api.rs +++ b/licensing-service/tests/api.rs @@ -827,3 +827,167 @@ async fn tier_caps_block_at_creator_limit_and_unlock_after_upgrade() { .unwrap(); assert_eq!(count, 6, "the previously-blocked product should now exist"); } + +/// Webhook DLQ (dead-letter queue) — list + retry round trip. +/// +/// The delivery worker retries failed deliveries with exponential +/// backoff up to 10 attempts, then sets `next_attempt_at = NULL` and +/// walks away. Pre-this-feature, those rows were invisible to the +/// operator. Now `GET /v1/admin/webhook-deliveries?status=failed` +/// surfaces them and `POST /v1/admin/webhook-deliveries/:id/retry` +/// puts them back in the queue. +/// +/// We seed a "dead-lettered" row directly via SQL — the worker isn't +/// spawned in tests, so we don't need to drive 10 real failures to +/// reach the dead state. This tests the admin surface, not the +/// worker. +#[tokio::test] +async fn webhook_dlq_lists_failed_and_retry_requeues() { + let (state, _tmp) = make_test_state().await; + let auth = format!("Bearer {}", TEST_ADMIN_KEY); + let now = Utc::now().to_rfc3339(); + + // Configure a webhook endpoint to own the deliveries. + let endpoint_id = "ep1"; + sqlx::query( + "INSERT INTO webhook_endpoints(id, url, secret, event_types, active, \ + description, created_at, updated_at) \ + VALUES(?, 'https://operator.example/keysat-hook', \ + '0123456789abcdef0123456789abcdef', '[\"*\"]', 1, '', ?, ?)", + ) + .bind(endpoint_id) + .bind(&now) + .bind(&now) + .execute(&state.db) + .await + .unwrap(); + + // One delivery in each state: delivered (success), pending + // (in-queue), and failed (DLQ — what we mostly care about). + let mk = |id: &str, attempts: i64, next: Option<&str>, delivered: Option<&str>| { + let id = id.to_string(); + let attempts = attempts; + let next = next.map(|s| s.to_string()); + let delivered = delivered.map(|s| s.to_string()); + let pool = state.db.clone(); + let now = now.clone(); + async move { + sqlx::query( + "INSERT INTO webhook_deliveries(id, endpoint_id, event_type, \ + payload_json, attempt_count, next_attempt_at, delivered_at, created_at) \ + VALUES(?, ?, 'license.issued', '{}', ?, ?, ?, ?)", + ) + .bind(&id) + .bind(endpoint_id) + .bind(attempts) + .bind(next.as_deref()) + .bind(delivered.as_deref()) + .bind(&now) + .execute(&pool) + .await + .unwrap(); + } + }; + mk("d-delivered", 1, None, Some(&now)).await; + mk("d-pending", 2, Some(&now), None).await; + // The dead-lettered case: 10 attempts, next_attempt_at NULL, never delivered. + mk("d-failed", 10, None, None).await; + + // List with status=failed should return ONLY the dead-lettered row. + let req = build_request( + "GET", + "/v1/admin/webhook-deliveries?status=failed", + &[("authorization", &auth)], + None, + ); + let resp = send(&state, req).await; + assert_eq!(resp.status(), StatusCode::OK); + let body = body_json(resp).await; + let deliveries = body["deliveries"].as_array().expect("deliveries array"); + assert_eq!( + deliveries.len(), + 1, + "status=failed should return the one DLQ row, got {deliveries:?}" + ); + assert_eq!(deliveries[0]["id"], "d-failed"); + assert_eq!(deliveries[0]["attempt_count"], 10); + + // Retry the dead-lettered delivery. + let req = build_request( + "POST", + "/v1/admin/webhook-deliveries/d-failed/retry", + &[("authorization", &auth)], + None, + ); + let resp = send(&state, req).await; + assert_eq!(resp.status(), StatusCode::OK, "retry should succeed"); + let body = body_json(resp).await; + assert_eq!( + body["attempt_count"], 0, + "retry should reset attempt_count to 0" + ); + assert!( + body["next_attempt_at"].is_string(), + "retry should set next_attempt_at: {body:?}" + ); + + // After retry: status=failed should be empty (the row left the + // DLQ); status=pending should now contain it. + let req = build_request( + "GET", + "/v1/admin/webhook-deliveries?status=failed", + &[("authorization", &auth)], + None, + ); + let resp = send(&state, req).await; + let body = body_json(resp).await; + assert_eq!( + body["deliveries"].as_array().unwrap().len(), + 0, + "after retry, the row should no longer be 'failed'" + ); + + let req = build_request( + "GET", + "/v1/admin/webhook-deliveries?status=pending", + &[("authorization", &auth)], + None, + ); + let resp = send(&state, req).await; + let body = body_json(resp).await; + let pending = body["deliveries"].as_array().unwrap(); + assert!( + pending.iter().any(|d| d["id"] == "d-failed"), + "after retry, the previously-failed row should appear in 'pending'" + ); + + // Audit log captured the retry. + let audit_count: i64 = sqlx::query_scalar( + "SELECT COUNT(*) FROM audit_log WHERE action = 'webhook_delivery.retry' AND target_id = 'd-failed'", + ) + .fetch_one(&state.db) + .await + .unwrap(); + assert_eq!(audit_count, 1, "retry must write an audit log entry"); + + // Retry on a non-existent id is 404. + let req = build_request( + "POST", + "/v1/admin/webhook-deliveries/never-existed/retry", + &[("authorization", &auth)], + None, + ); + let resp = send(&state, req).await; + assert_eq!(resp.status(), StatusCode::NOT_FOUND); + + // Bad status filter is 400 (a typo'd query string shouldn't + // silently succeed; that's a UI footgun). + let req = build_request( + "GET", + "/v1/admin/webhook-deliveries?status=garbage", + &[("authorization", &auth)], + None, + ); + let resp = send(&state, req).await; + assert_eq!(resp.status(), StatusCode::BAD_REQUEST); +}