keysat/licensing-service/src/reconcile.rs
Grant 5ec9a6e8c0 Migrate reconcile + tipping onto PaymentProvider trait; add worker tests
Two compat-path holdovers migrated:

- src/reconcile.rs: was state.btcpay_client().get_invoice() with
  manual JSON parsing of BTCPay-specific status strings ("Settled",
  "Complete", "Expired", "Invalid"). Now state.payment_provider()
  .get_invoice_status() returning the typed ProviderInvoiceStatus
  enum. The string normalization moves into BtcpayProvider's impl
  where it belongs.

- src/tipping.rs: was state.btcpay_client().pay_lightning_invoice()
  returning raw JSON, then manual paymentHash extraction. Now
  provider.pay_lightning_invoice() returning a typed PaymentReceipt
  { payment_hash, raw }. The audit message now records the active
  provider's kind() rather than hardcoding "BTCPay LN node".
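
As a rough illustration of the string normalization that now sits behind the trait, the sketch below maps BTCPay-style status strings onto a typed enum. The variant names follow those matched in reconcile.rs. The helper name `normalize_btcpay_status`, and the choice to treat every unrecognized string (including "New" and "Processing") as Pending, are assumptions for illustration and not the actual BtcpayProvider code.

```rust
/// Typed invoice status, mirroring the variants reconcile.rs matches on.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum ProviderInvoiceStatus {
    Pending,
    Settled,
    Expired,
    Invalid,
    Refunded,
}

/// Hypothetical helper: map a raw BTCPay status string to the typed enum.
/// Assumption: anything unrecognized is treated as Pending, so the
/// reconciler simply looks at the invoice again on a later tick.
/// Refunded is never produced here; it is listed only for completeness.
pub fn normalize_btcpay_status(raw: &str) -> ProviderInvoiceStatus {
    match raw {
        "Settled" | "Complete" => ProviderInvoiceStatus::Settled,
        "Expired" => ProviderInvoiceStatus::Expired,
        "Invalid" => ProviderInvoiceStatus::Invalid,
        _ => ProviderInvoiceStatus::Pending,
    }
}
```

With a mapping like this inside the provider, callers never compare status strings themselves.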

Combined with v0.1.0:43's purchase migration, the daemon's
non-test code now contains zero calls to state.btcpay_client() or
.btcpay_webhook_secret(). Those compat accessors stay on AppState
for v0.2 (no need to break things gratuitously) but they're dead
code in the production path. Zaprite's drop-in only needs to
implement the trait.
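
To make the "only needs to implement the trait" point concrete, here is a simplified sketch of what a drop-in looks like from the caller's side. It is synchronous for brevity, whereas the real trait is async (reconcile.rs awaits it). The trait shape, the `FixedStatusProvider` stub, and `local_status_for` are hypothetical; only `kind()` and `get_invoice_status()` are names taken from this commit.

```rust
/// Typed invoice status, mirroring the variants reconcile.rs matches on.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum ProviderInvoiceStatus {
    Pending,
    Settled,
    Expired,
    Invalid,
    Refunded,
}

/// Simplified, synchronous stand-in for the PaymentProvider trait.
pub trait PaymentProvider {
    /// Short label used in logs and audit messages.
    fn kind(&self) -> &'static str;
    /// Look up the provider's current view of an invoice.
    fn get_invoice_status(&self, invoice_id: &str) -> Result<ProviderInvoiceStatus, String>;
}

/// Hypothetical provider that always reports one fixed status.
pub struct FixedStatusProvider {
    pub label: &'static str,
    pub status: ProviderInvoiceStatus,
}

impl PaymentProvider for FixedStatusProvider {
    fn kind(&self) -> &'static str {
        self.label
    }

    fn get_invoice_status(&self, _invoice_id: &str) -> Result<ProviderInvoiceStatus, String> {
        Ok(self.status)
    }
}

/// Caller-side logic sees only the trait. Returns the local status string
/// to store, or None when the invoice should be left untouched.
pub fn local_status_for(provider: &dyn PaymentProvider, invoice_id: &str) -> Option<&'static str> {
    match provider.get_invoice_status(invoice_id).ok()? {
        ProviderInvoiceStatus::Settled => Some("settled"),
        ProviderInvoiceStatus::Expired => Some("expired"),
        ProviderInvoiceStatus::Invalid => Some("invalid"),
        ProviderInvoiceStatus::Pending | ProviderInvoiceStatus::Refunded => None,
    }
}
```

Swapping providers then means constructing a different implementor; `local_status_for` stays unchanged.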

Worker integration tests (tests/worker.rs):

- worker_marks_failure_and_schedules_retry_on_500: spins up a tiny
  axum receiver that 500s, calls webhooks::tick(), verifies attempt
  count and next-attempt scheduling.
- worker_dead_letters_after_max_attempts: seeds a row at attempt
  count 9, ticks once, verifies attempt_count → 10 and
  next_attempt_at → NULL. Confirms the row also satisfies the admin
  DLQ predicate (the contract :43's webhook_deliveries.rs depends
  on).
- worker_marks_success_on_2xx: pins the happy path.
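
The three cases above can be summarized as one pure decision function. This is a sketch of the behaviour the tests pin, not the worker's actual code: the cap of 10 attempts is inferred from the dead-letter test, while the backoff schedule, the `AttemptOutcome` type, `record_attempt`, and the choice to count a successful delivery as an attempt are all assumptions.

```rust
use std::time::Duration;

/// Assumed cap, inferred from the dead-letter test (attempt 9 -> 10).
pub const MAX_ATTEMPTS: u32 = 10;

/// What the worker might record after one delivery attempt.
#[derive(Debug, PartialEq, Eq)]
pub struct AttemptOutcome {
    pub attempt_count: u32,
    /// `None` models `next_attempt_at = NULL` (delivered or dead-lettered).
    pub retry_after: Option<Duration>,
    pub delivered: bool,
}

/// Hypothetical helper: decide the row's next state from the previous
/// attempt count and the receiver's HTTP status code.
pub fn record_attempt(prev_attempts: u32, http_status: u16) -> AttemptOutcome {
    let attempt_count = prev_attempts.saturating_add(1);

    // Any 2xx response counts as delivered; nothing further is scheduled.
    if (200..300).contains(&http_status) {
        return AttemptOutcome { attempt_count, retry_after: None, delivered: true };
    }

    // Out of attempts: clear the schedule so the row lands in the DLQ.
    if attempt_count >= MAX_ATTEMPTS {
        return AttemptOutcome { attempt_count, retry_after: None, delivered: false };
    }

    // Assumed exponential backoff: 30s, 60s, 120s, ... capped at one hour.
    let shift = (attempt_count - 1).min(20);
    let secs = 30u64.saturating_mul(1u64 << shift).min(3600);
    AttemptOutcome {
        attempt_count,
        retry_after: Some(Duration::from_secs(secs)),
        delivered: false,
    }
}
```

Under these assumptions a dead-lettered row is one with `attempt_count >= MAX_ATTEMPTS`, no scheduled retry, and no successful delivery, which is the shape an admin DLQ predicate would select on.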

webhooks::tick is now `pub` so integration tests can drive it
synchronously.

Test count: 26 (9 unit + 4 migration + 10 API + 3 worker).
2026-05-08 10:40:11 -05:00

147 lines
5.4 KiB
Rust

//! Invoice reconciliation background task.
//!
//! Webhooks are the primary signal from BTCPay to us — fast, push-based, and
//! authenticated with HMAC. But webhooks can be dropped (network blips, our
//! service restarting during a burst, BTCPay retry-budget exhaustion on a
//! long outage). If we only ever reacted to webhooks, a dropped settle
//! notification would mean a buyer paid and never got their license.
//!
//! Reconciliation closes that gap. Every TICK (60 seconds) we scan our own
//! table for invoices still in `pending` status that were created within the
//! last MAX_AGE_HOURS, ask the payment provider directly what their real
//! state is, and reconcile:
//!
//! - Provider says `Settled` → mark settled AND issue a license if one
//!   doesn't exist yet (idempotency enforced by the UNIQUE index on
//!   `licenses.invoice_id`).
//! - Provider says `Expired` / `Invalid` → mark accordingly, don't issue,
//!   and cancel any pending discount-code redemption for the invoice.
//! - Provider says `Pending` (for BTCPay, `New` / `Processing`) or
//!   `Refunded` → leave it alone.
//!
//! The task is cheap (one DB query and at most one HTTP call per pending
//! invoice per tick) and bounded (we only look at invoices younger than
//! MAX_AGE_HOURS).
use crate::api::AppState;
use crate::db::repo;
use std::time::Duration;
use tokio::time::sleep;

const TICK: Duration = Duration::from_secs(60);
const MAX_AGE_HOURS: i64 = 72;

pub fn spawn(state: AppState) {
    tokio::spawn(async move {
        // Small initial delay so we don't race startup logs.
        sleep(Duration::from_secs(15)).await;
        loop {
            if let Err(e) = tick(&state).await {
                tracing::warn!(error = %e, "reconciliation tick failed");
            }
            sleep(TICK).await;
        }
    });
}

async fn tick(state: &AppState) -> anyhow::Result<()> {
    // Provider-agnostic. Each provider's impl handles the
    // provider-specific status-string normalization (BTCPay's
    // "Settled"/"Complete"/"Expired"/"Invalid" → ProviderInvoiceStatus
    // enum); this loop just operates on the typed result.
    let provider = match state.payment_provider().await {
        Ok(p) => p,
        Err(_) => return Ok(()), // not configured yet; skip silently
    };
    let pending = repo::list_pending_invoices(&state.db, MAX_AGE_HOURS)
        .await
        .map_err(|e| anyhow::anyhow!("listing pending invoices: {e:?}"))?;
    if pending.is_empty() {
        return Ok(());
    }
    tracing::debug!(count = pending.len(), "reconciling pending invoices");
    for inv in pending {
        match provider.get_invoice_status(&inv.btcpay_invoice_id).await {
            Ok(status) => {
                use crate::payment::ProviderInvoiceStatus::*;
                let new_status = match status {
                    Settled => "settled",
                    Expired => "expired",
                    Invalid => "invalid",
                    // Pending stays pending; Refunded is a v0.3 surface
                    // that the webhook handler also short-circuits on.
                    Pending | Refunded => continue,
                };
                if new_status == inv.status.as_str() {
                    continue; // no-op
                }
                if let Err(e) = repo::update_invoice_status(
                    &state.db,
                    &inv.btcpay_invoice_id,
                    new_status,
                )
                .await
                {
                    tracing::warn!(
                        error = %e,
                        btcpay_invoice_id = %inv.btcpay_invoice_id,
                        "reconciler failed to update invoice status"
                    );
                    continue;
                }
                // Free any reserved discount-code slot if the invoice
                // entered a terminal failure state.
                if matches!(new_status, "expired" | "invalid") {
                    if let Ok(Some(redemption)) =
                        repo::get_pending_redemption_by_invoice(&state.db, &inv.id).await
                    {
                        let _ = repo::cancel_redemption(&state.db, &redemption.id).await;
                    }
                }
                if new_status == "settled" {
                    if let Err(e) = ensure_license(state, &inv).await {
                        tracing::warn!(
                            error = %e,
                            btcpay_invoice_id = %inv.btcpay_invoice_id,
                            "reconciler failed to issue license after recovered settle"
                        );
                    } else {
                        tracing::info!(
                            btcpay_invoice_id = %inv.btcpay_invoice_id,
                            "reconciler issued license for recovered settled invoice"
                        );
                    }
                }
            }
            Err(e) => {
                tracing::debug!(
                    error = %e,
                    btcpay_invoice_id = %inv.btcpay_invoice_id,
                    "reconciler failed to fetch invoice from BTCPay"
                );
            }
        }
    }
    Ok(())
}

async fn ensure_license(
    state: &AppState,
    invoice: &crate::models::Invoice,
) -> anyhow::Result<()> {
    // Skip issuance if a license already exists for this invoice.
    if repo::get_license_by_invoice(&state.db, &invoice.id)
        .await
        .map_err(|e| anyhow::anyhow!("{e:?}"))?
        .is_some()
    {
        return Ok(());
    }
    crate::api::webhook::issue_license_for_invoice(state, invoice)
        .await
        .map_err(|e| anyhow::anyhow!("{e:?}"))?;
    Ok(())
}