Webhook DLQ — list failed deliveries and manually retry
Closes the silent-loss hole in outbound webhook delivery. The worker
in src/webhooks.rs retries failed deliveries with exponential backoff
up to 10 attempts, then sets next_attempt_at = NULL and walks away.
Before this commit, those "dead-lettered" rows sat in webhook_deliveries
forever with no surface for the operator to discover, inspect, or
recover from them — a subscriber that was down for >6h during a
license-issuance burst would silently lose those events forever.
What's new:
- repo::DeliveryStatusFilter — enum with parse() so query strings
map cleanly to SQL predicates.
- repo::list_deliveries — endpoint_id + status + limit, newest first.
- repo::requeue_delivery — resets attempt_count=0, clears delivered_at,
last_error and last_status_code, and sets next_attempt_at=now. The worker
picks it up on the next 5s tick.
- src/api/webhook_deliveries.rs — admin module with two handlers:
- GET /v1/admin/webhook-deliveries?endpoint_id=…&status=…&limit=…
- POST /v1/admin/webhook-deliveries/:id/retry (audit-logged as
webhook_delivery.retry; 404 on missing id)
- Routes registered in src/api/mod.rs alongside the existing
webhook_endpoints CRUD.
- tests/api.rs gains webhook_dlq_lists_failed_and_retry_requeues:
seeds three deliveries directly via SQL (one each: delivered,
pending, dead-lettered), exercises the list filter, runs the retry,
asserts the row migrates from failed→pending, audit row is written,
404 on bad id, 400 on bad status filter.
Worker code is unchanged. The DLQ is operator-actionable infrastructure
on top of the existing retry semantics.
Test count: 23 (9 unit + 4 migration + 10 API), up from 22.
This commit is contained in:
@@ -70,6 +70,7 @@ pub mod session_layer;
|
|||||||
pub mod tier;
|
pub mod tier;
|
||||||
pub mod validate;
|
pub mod validate;
|
||||||
pub mod webhook;
|
pub mod webhook;
|
||||||
|
pub mod webhook_deliveries;
|
||||||
pub mod webhook_endpoints;
|
pub mod webhook_endpoints;
|
||||||
|
|
||||||
use crate::btcpay::client::BtcpayClient;
|
use crate::btcpay::client::BtcpayClient;
|
||||||
@@ -304,6 +305,16 @@ pub fn router(state: AppState) -> Router {
|
|||||||
"/v1/admin/webhook-endpoints/:id",
|
"/v1/admin/webhook-endpoints/:id",
|
||||||
axum::routing::delete(webhook_endpoints::delete),
|
axum::routing::delete(webhook_endpoints::delete),
|
||||||
)
|
)
|
||||||
|
// Webhook delivery history (the dead-letter inspection +
|
||||||
|
// manual-retry surface; see webhook_deliveries.rs for why).
|
||||||
|
.route(
|
||||||
|
"/v1/admin/webhook-deliveries",
|
||||||
|
get(webhook_deliveries::list),
|
||||||
|
)
|
||||||
|
.route(
|
||||||
|
"/v1/admin/webhook-deliveries/:id/retry",
|
||||||
|
post(webhook_deliveries::retry),
|
||||||
|
)
|
||||||
// Discount / referral codes.
|
// Discount / referral codes.
|
||||||
.route(
|
.route(
|
||||||
"/v1/admin/discount-codes",
|
"/v1/admin/discount-codes",
|
||||||
|
|||||||
@@ -0,0 +1,104 @@
|
|||||||
|
//! Admin views over the outbound webhook delivery queue.
|
||||||
|
//!
|
||||||
|
//! Companion to `webhook_endpoints.rs`: that module manages the
|
||||||
|
//! configured subscriber URLs; this one exposes the row-level history
|
||||||
|
//! of attempts (success, in-flight retries, dead-lettered failures)
|
||||||
|
//! and lets operators manually re-queue a dead delivery for another
|
||||||
|
//! pass through the worker.
|
||||||
|
//!
|
||||||
|
//! Why this exists: the worker in `crate::webhooks` retries failed
|
||||||
|
//! deliveries with exponential backoff up to 10 attempts, then sets
|
||||||
|
//! `next_attempt_at = NULL` and walks away. Pre-this-module, those
|
||||||
|
//! "dead-lettered" rows were invisible — operators had no surface to
|
||||||
|
//! discover, inspect, or recover from them. A subscriber endpoint
|
||||||
|
//! that was down for >6h during a license-issuance burst would
|
||||||
|
//! silently lose those events forever.
|
||||||
|
|
||||||
|
use crate::api::admin::{request_context, require_admin};
|
||||||
|
use crate::api::AppState;
|
||||||
|
use crate::db::repo::{self, DeliveryStatusFilter};
|
||||||
|
use crate::error::{AppError, AppResult};
|
||||||
|
use axum::{
|
||||||
|
extract::{Path, Query, State},
|
||||||
|
http::HeaderMap,
|
||||||
|
Json,
|
||||||
|
};
|
||||||
|
use serde::Deserialize;
|
||||||
|
use serde_json::{json, Value};
|
||||||
|
|
||||||
|
/// Row cap the `list` handler applies when the request omits `limit`.
const DEFAULT_LIMIT: i64 = 100;
|
||||||
|
/// Upper bound for `limit`; the `list` handler clamps larger values down to this.
const MAX_LIMIT: i64 = 500;
|
||||||
|
|
||||||
|
/// Query-string parameters accepted by the `list` handler
/// (`GET /v1/admin/webhook-deliveries`). Every field is optional.
#[derive(Debug, Deserialize)]
|
||||||
|
pub struct ListDeliveriesQuery {
|
||||||
|
/// Filter by configured endpoint id. Omit for all endpoints.
|
||||||
|
pub endpoint_id: Option<String>,
|
||||||
|
/// One of `pending` | `delivered` | `failed` | `all`. Defaults to
|
||||||
|
/// `all`. The `failed` filter is the dead-letter queue — rows
|
||||||
|
/// where the worker exhausted retries.
|
||||||
|
pub status: Option<String>,
|
||||||
|
/// Cap on rows returned. Defaults to 100; the handler clamps it to 1..=500.
|
||||||
|
pub limit: Option<i64>,
|
||||||
|
}
|
||||||
|
|
||||||
|
pub async fn list(
|
||||||
|
State(state): State<AppState>,
|
||||||
|
headers: HeaderMap,
|
||||||
|
Query(q): Query<ListDeliveriesQuery>,
|
||||||
|
) -> AppResult<Json<Value>> {
|
||||||
|
require_admin(&state, &headers)?;
|
||||||
|
let status = match q.status.as_deref() {
|
||||||
|
Some(s) => DeliveryStatusFilter::parse(s).ok_or_else(|| {
|
||||||
|
AppError::BadRequest(format!(
|
||||||
|
"invalid status filter '{s}'; expected pending|delivered|failed|all"
|
||||||
|
))
|
||||||
|
})?,
|
||||||
|
None => DeliveryStatusFilter::All,
|
||||||
|
};
|
||||||
|
let limit = q
|
||||||
|
.limit
|
||||||
|
.unwrap_or(DEFAULT_LIMIT)
|
||||||
|
.clamp(1, MAX_LIMIT);
|
||||||
|
let rows = repo::list_deliveries(
|
||||||
|
&state.db,
|
||||||
|
q.endpoint_id.as_deref(),
|
||||||
|
status,
|
||||||
|
limit,
|
||||||
|
)
|
||||||
|
.await?;
|
||||||
|
Ok(Json(json!({ "deliveries": rows })))
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Manual re-queue for a dead-lettered (or otherwise stuck)
|
||||||
|
/// delivery. The worker will pick it up on the next 5s tick.
|
||||||
|
///
|
||||||
|
/// 404 if the delivery id doesn't exist; 200 on success with the
|
||||||
|
/// updated row in the body so the SPA can re-render the list with
|
||||||
|
/// the new state immediately.
|
||||||
|
pub async fn retry(
|
||||||
|
State(state): State<AppState>,
|
||||||
|
headers: HeaderMap,
|
||||||
|
Path(id): Path<String>,
|
||||||
|
) -> AppResult<Json<Value>> {
|
||||||
|
let actor_hash = require_admin(&state, &headers)?;
|
||||||
|
let (ip, ua) = request_context(&headers);
|
||||||
|
let delivery = repo::requeue_delivery(&state.db, &id)
|
||||||
|
.await?
|
||||||
|
.ok_or_else(|| AppError::NotFound(format!("webhook delivery '{id}'")))?;
|
||||||
|
let _ = repo::insert_audit(
|
||||||
|
&state.db,
|
||||||
|
"admin_api_key",
|
||||||
|
Some(&actor_hash),
|
||||||
|
"webhook_delivery.retry",
|
||||||
|
Some("webhook_delivery"),
|
||||||
|
Some(&id),
|
||||||
|
ip.as_deref(),
|
||||||
|
ua.as_deref(),
|
||||||
|
&json!({
|
||||||
|
"endpoint_id": delivery.endpoint_id,
|
||||||
|
"event_type": delivery.event_type,
|
||||||
|
}),
|
||||||
|
)
|
||||||
|
.await;
|
||||||
|
Ok(Json(json!(delivery)))
|
||||||
|
}
|
||||||
@@ -1379,6 +1379,114 @@ pub async fn mark_delivery_failure(
|
|||||||
Ok(())
|
Ok(())
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/// Filter modes for `list_deliveries`. Strings match the values
/// accepted by the admin endpoint's `?status=...` query param.
///
/// The type is a plain tag, so it is `Copy` and comparable; `Debug` is
/// derived so it can appear in logs and assertion messages.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum DeliveryStatusFilter {
    /// `delivered_at IS NULL AND next_attempt_at IS NOT NULL` — in
    /// the retry queue, will be picked up by the worker on the next
    /// tick that's past `next_attempt_at`.
    Pending,
    /// `delivered_at IS NOT NULL` — successfully delivered.
    Delivered,
    /// `delivered_at IS NULL AND next_attempt_at IS NULL AND
    /// attempt_count > 0` — the dead-letter case. Worker exhausted
    /// retries (or hit a hard error like a deleted endpoint) and
    /// won't re-pick it. Operators see these via the admin list and
    /// can manually re-queue via `requeue_delivery`.
    Failed,
    /// All deliveries.
    All,
}

impl DeliveryStatusFilter {
    /// Map a query-string value to a filter.
    ///
    /// Accepts exactly `"pending"`, `"delivered"`, `"failed"` and `"all"`
    /// (case-sensitive, no trimming). Anything else yields `None` so the
    /// caller can reject the request instead of guessing.
    pub fn parse(s: &str) -> Option<Self> {
        match s {
            "pending" => Some(Self::Pending),
            "delivered" => Some(Self::Delivered),
            "failed" => Some(Self::Failed),
            "all" => Some(Self::All),
            _ => None,
        }
    }
}
|
||||||
|
|
||||||
|
/// List webhook deliveries with optional filtering. Newest first
|
||||||
|
/// (orders by `created_at DESC`) so the admin UI shows recent
|
||||||
|
/// activity at the top.
|
||||||
|
pub async fn list_deliveries(
|
||||||
|
pool: &SqlitePool,
|
||||||
|
endpoint_id: Option<&str>,
|
||||||
|
status: DeliveryStatusFilter,
|
||||||
|
limit: i64,
|
||||||
|
) -> AppResult<Vec<WebhookDelivery>> {
|
||||||
|
let mut sql = String::from(
|
||||||
|
"SELECT id, endpoint_id, event_type, payload_json, attempt_count,
|
||||||
|
next_attempt_at, last_status_code, last_error, delivered_at, created_at
|
||||||
|
FROM webhook_deliveries WHERE 1=1",
|
||||||
|
);
|
||||||
|
if endpoint_id.is_some() {
|
||||||
|
sql.push_str(" AND endpoint_id = ?");
|
||||||
|
}
|
||||||
|
match status {
|
||||||
|
DeliveryStatusFilter::Pending => {
|
||||||
|
sql.push_str(" AND delivered_at IS NULL AND next_attempt_at IS NOT NULL")
|
||||||
|
}
|
||||||
|
DeliveryStatusFilter::Delivered => sql.push_str(" AND delivered_at IS NOT NULL"),
|
||||||
|
DeliveryStatusFilter::Failed => sql.push_str(
|
||||||
|
" AND delivered_at IS NULL AND next_attempt_at IS NULL AND attempt_count > 0",
|
||||||
|
),
|
||||||
|
DeliveryStatusFilter::All => {}
|
||||||
|
}
|
||||||
|
sql.push_str(" ORDER BY created_at DESC LIMIT ?");
|
||||||
|
|
||||||
|
let mut q = sqlx::query(&sql);
|
||||||
|
if let Some(eid) = endpoint_id {
|
||||||
|
q = q.bind(eid);
|
||||||
|
}
|
||||||
|
q = q.bind(limit);
|
||||||
|
let rows = q.fetch_all(pool).await?;
|
||||||
|
Ok(rows.into_iter().map(row_to_delivery).collect())
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Re-queue a previously-failed (or even successfully-delivered)
|
||||||
|
/// delivery for another attempt. Resets `attempt_count` to 0, clears
|
||||||
|
/// `delivered_at` and `last_error`, and sets `next_attempt_at` to
|
||||||
|
/// now so the worker picks it up on the next tick.
|
||||||
|
///
|
||||||
|
/// Returns the affected row, or `Ok(None)` if no row with the given
|
||||||
|
/// id exists.
|
||||||
|
pub async fn requeue_delivery(
|
||||||
|
pool: &SqlitePool,
|
||||||
|
id: &str,
|
||||||
|
) -> AppResult<Option<WebhookDelivery>> {
|
||||||
|
let now = Utc::now().to_rfc3339();
|
||||||
|
let res = sqlx::query(
|
||||||
|
"UPDATE webhook_deliveries
|
||||||
|
SET attempt_count = 0,
|
||||||
|
delivered_at = NULL,
|
||||||
|
last_error = NULL,
|
||||||
|
last_status_code = NULL,
|
||||||
|
next_attempt_at = ?
|
||||||
|
WHERE id = ?",
|
||||||
|
)
|
||||||
|
.bind(&now)
|
||||||
|
.bind(id)
|
||||||
|
.execute(pool)
|
||||||
|
.await?;
|
||||||
|
if res.rows_affected() == 0 {
|
||||||
|
return Ok(None);
|
||||||
|
}
|
||||||
|
let row = sqlx::query(
|
||||||
|
"SELECT id, endpoint_id, event_type, payload_json, attempt_count,
|
||||||
|
next_attempt_at, last_status_code, last_error, delivered_at, created_at
|
||||||
|
FROM webhook_deliveries WHERE id = ?",
|
||||||
|
)
|
||||||
|
.bind(id)
|
||||||
|
.fetch_one(pool)
|
||||||
|
.await?;
|
||||||
|
Ok(Some(row_to_delivery(row)))
|
||||||
|
}
|
||||||
|
|
||||||
fn row_to_delivery(row: sqlx::sqlite::SqliteRow) -> WebhookDelivery {
|
fn row_to_delivery(row: sqlx::sqlite::SqliteRow) -> WebhookDelivery {
|
||||||
WebhookDelivery {
|
WebhookDelivery {
|
||||||
id: row.get("id"),
|
id: row.get("id"),
|
||||||
|
|||||||
@@ -827,3 +827,167 @@ async fn tier_caps_block_at_creator_limit_and_unlock_after_upgrade() {
|
|||||||
.unwrap();
|
.unwrap();
|
||||||
assert_eq!(count, 6, "the previously-blocked product should now exist");
|
assert_eq!(count, 6, "the previously-blocked product should now exist");
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/// Webhook DLQ (dead-letter queue) — list + retry round trip.
|
||||||
|
///
|
||||||
|
/// The delivery worker retries failed deliveries with exponential
|
||||||
|
/// backoff up to 10 attempts, then sets `next_attempt_at = NULL` and
|
||||||
|
/// walks away. Pre-this-feature, those rows were invisible to the
|
||||||
|
/// operator. Now `GET /v1/admin/webhook-deliveries?status=failed`
|
||||||
|
/// surfaces them and `POST /v1/admin/webhook-deliveries/:id/retry`
|
||||||
|
/// puts them back in the queue.
|
||||||
|
///
|
||||||
|
/// We seed a "dead-lettered" row directly via SQL — the worker isn't
|
||||||
|
/// spawned in tests, so we don't need to drive 10 real failures to
|
||||||
|
/// reach the dead state. This tests the admin surface, not the
|
||||||
|
/// worker.
|
||||||
|
#[tokio::test]
|
||||||
|
async fn webhook_dlq_lists_failed_and_retry_requeues() {
|
||||||
|
let (state, _tmp) = make_test_state().await;
|
||||||
|
let auth = format!("Bearer {}", TEST_ADMIN_KEY);
|
||||||
|
let now = Utc::now().to_rfc3339();
|
||||||
|
|
||||||
|
// Configure a webhook endpoint to own the deliveries.
|
||||||
|
let endpoint_id = "ep1";
|
||||||
|
sqlx::query(
|
||||||
|
"INSERT INTO webhook_endpoints(id, url, secret, event_types, active, \
|
||||||
|
description, created_at, updated_at) \
|
||||||
|
VALUES(?, 'https://operator.example/keysat-hook', \
|
||||||
|
'0123456789abcdef0123456789abcdef', '[\"*\"]', 1, '', ?, ?)",
|
||||||
|
)
|
||||||
|
.bind(endpoint_id)
|
||||||
|
.bind(&now)
|
||||||
|
.bind(&now)
|
||||||
|
.execute(&state.db)
|
||||||
|
.await
|
||||||
|
.unwrap();
|
||||||
|
|
||||||
|
// One delivery in each state: delivered (success), pending
|
||||||
|
// (in-queue), and failed (DLQ — what we mostly care about).
|
||||||
|
let mk = |id: &str, attempts: i64, next: Option<&str>, delivered: Option<&str>| {
|
||||||
|
let id = id.to_string();
|
||||||
|
let attempts = attempts;
|
||||||
|
let next = next.map(|s| s.to_string());
|
||||||
|
let delivered = delivered.map(|s| s.to_string());
|
||||||
|
let pool = state.db.clone();
|
||||||
|
let now = now.clone();
|
||||||
|
async move {
|
||||||
|
sqlx::query(
|
||||||
|
"INSERT INTO webhook_deliveries(id, endpoint_id, event_type, \
|
||||||
|
payload_json, attempt_count, next_attempt_at, delivered_at, created_at) \
|
||||||
|
VALUES(?, ?, 'license.issued', '{}', ?, ?, ?, ?)",
|
||||||
|
)
|
||||||
|
.bind(&id)
|
||||||
|
.bind(endpoint_id)
|
||||||
|
.bind(attempts)
|
||||||
|
.bind(next.as_deref())
|
||||||
|
.bind(delivered.as_deref())
|
||||||
|
.bind(&now)
|
||||||
|
.execute(&pool)
|
||||||
|
.await
|
||||||
|
.unwrap();
|
||||||
|
}
|
||||||
|
};
|
||||||
|
mk("d-delivered", 1, None, Some(&now)).await;
|
||||||
|
mk("d-pending", 2, Some(&now), None).await;
|
||||||
|
// The dead-lettered case: 10 attempts, next_attempt_at NULL, never delivered.
|
||||||
|
mk("d-failed", 10, None, None).await;
|
||||||
|
|
||||||
|
// List with status=failed should return ONLY the dead-lettered row.
|
||||||
|
let req = build_request(
|
||||||
|
"GET",
|
||||||
|
"/v1/admin/webhook-deliveries?status=failed",
|
||||||
|
&[("authorization", &auth)],
|
||||||
|
None,
|
||||||
|
);
|
||||||
|
let resp = send(&state, req).await;
|
||||||
|
assert_eq!(resp.status(), StatusCode::OK);
|
||||||
|
let body = body_json(resp).await;
|
||||||
|
let deliveries = body["deliveries"].as_array().expect("deliveries array");
|
||||||
|
assert_eq!(
|
||||||
|
deliveries.len(),
|
||||||
|
1,
|
||||||
|
"status=failed should return the one DLQ row, got {deliveries:?}"
|
||||||
|
);
|
||||||
|
assert_eq!(deliveries[0]["id"], "d-failed");
|
||||||
|
assert_eq!(deliveries[0]["attempt_count"], 10);
|
||||||
|
|
||||||
|
// Retry the dead-lettered delivery.
|
||||||
|
let req = build_request(
|
||||||
|
"POST",
|
||||||
|
"/v1/admin/webhook-deliveries/d-failed/retry",
|
||||||
|
&[("authorization", &auth)],
|
||||||
|
None,
|
||||||
|
);
|
||||||
|
let resp = send(&state, req).await;
|
||||||
|
assert_eq!(resp.status(), StatusCode::OK, "retry should succeed");
|
||||||
|
let body = body_json(resp).await;
|
||||||
|
assert_eq!(
|
||||||
|
body["attempt_count"], 0,
|
||||||
|
"retry should reset attempt_count to 0"
|
||||||
|
);
|
||||||
|
assert!(
|
||||||
|
body["next_attempt_at"].is_string(),
|
||||||
|
"retry should set next_attempt_at: {body:?}"
|
||||||
|
);
|
||||||
|
|
||||||
|
// After retry: status=failed should be empty (the row left the
|
||||||
|
// DLQ); status=pending should now contain it.
|
||||||
|
let req = build_request(
|
||||||
|
"GET",
|
||||||
|
"/v1/admin/webhook-deliveries?status=failed",
|
||||||
|
&[("authorization", &auth)],
|
||||||
|
None,
|
||||||
|
);
|
||||||
|
let resp = send(&state, req).await;
|
||||||
|
let body = body_json(resp).await;
|
||||||
|
assert_eq!(
|
||||||
|
body["deliveries"].as_array().unwrap().len(),
|
||||||
|
0,
|
||||||
|
"after retry, the row should no longer be 'failed'"
|
||||||
|
);
|
||||||
|
|
||||||
|
let req = build_request(
|
||||||
|
"GET",
|
||||||
|
"/v1/admin/webhook-deliveries?status=pending",
|
||||||
|
&[("authorization", &auth)],
|
||||||
|
None,
|
||||||
|
);
|
||||||
|
let resp = send(&state, req).await;
|
||||||
|
let body = body_json(resp).await;
|
||||||
|
let pending = body["deliveries"].as_array().unwrap();
|
||||||
|
assert!(
|
||||||
|
pending.iter().any(|d| d["id"] == "d-failed"),
|
||||||
|
"after retry, the previously-failed row should appear in 'pending'"
|
||||||
|
);
|
||||||
|
|
||||||
|
// Audit log captured the retry.
|
||||||
|
let audit_count: i64 = sqlx::query_scalar(
|
||||||
|
"SELECT COUNT(*) FROM audit_log WHERE action = 'webhook_delivery.retry' AND target_id = 'd-failed'",
|
||||||
|
)
|
||||||
|
.fetch_one(&state.db)
|
||||||
|
.await
|
||||||
|
.unwrap();
|
||||||
|
assert_eq!(audit_count, 1, "retry must write an audit log entry");
|
||||||
|
|
||||||
|
// Retry on a non-existent id is 404.
|
||||||
|
let req = build_request(
|
||||||
|
"POST",
|
||||||
|
"/v1/admin/webhook-deliveries/never-existed/retry",
|
||||||
|
&[("authorization", &auth)],
|
||||||
|
None,
|
||||||
|
);
|
||||||
|
let resp = send(&state, req).await;
|
||||||
|
assert_eq!(resp.status(), StatusCode::NOT_FOUND);
|
||||||
|
|
||||||
|
// Bad status filter is 400 (a typo'd query string shouldn't
|
||||||
|
// silently succeed; that's a UI footgun).
|
||||||
|
let req = build_request(
|
||||||
|
"GET",
|
||||||
|
"/v1/admin/webhook-deliveries?status=garbage",
|
||||||
|
&[("authorization", &auth)],
|
||||||
|
None,
|
||||||
|
);
|
||||||
|
let resp = send(&state, req).await;
|
||||||
|
assert_eq!(resp.status(), StatusCode::BAD_REQUEST);
|
||||||
|
}
|
||||||
|
|||||||
Reference in New Issue
Block a user