From eb885502ba705ac9c7c5f75a1ce023501d017d84 Mon Sep 17 00:00:00 2001 From: Grant Date: Fri, 8 May 2026 12:16:22 -0500 Subject: [PATCH] =?UTF-8?q?Multi-currency=20Phase=204=20=E2=80=94=20rate?= =?UTF-8?q?=20fetcher=20with=20Kraken/Coinbase/CoinGecko=20fallback?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit src/rates.rs adds an in-memory rate cache (60s TTL) with a 3-source fallback chain. AppState gains `rates: Arc`. Manual pins via the settings table override the chain — used by tests for deterministic conversions and by operators during maintenance windows. Admin endpoints: - GET /v1/admin/rates: cache snapshot - POST /v1/admin/rates/refresh: force re-fetch (audit-logged) Two new tests (network-free, manual-pin path): - rate_cache_honors_manual_pin_from_settings - admin_rates_endpoint_reflects_manual_pin Test count: 36 (was 34). --- licensing-service/src/api/mod.rs | 11 + licensing-service/src/api/rates_admin.rs | 89 ++++++++ licensing-service/src/lib.rs | 1 + licensing-service/src/main.rs | 1 + licensing-service/src/rates.rs | 278 +++++++++++++++++++++++ licensing-service/tests/api.rs | 78 +++++++ licensing-service/tests/worker.rs | 1 + 7 files changed, 459 insertions(+) create mode 100644 licensing-service/src/api/rates_admin.rs create mode 100644 licensing-service/src/rates.rs diff --git a/licensing-service/src/api/mod.rs b/licensing-service/src/api/mod.rs index 9e50f39..01220fe 100644 --- a/licensing-service/src/api/mod.rs +++ b/licensing-service/src/api/mod.rs @@ -71,6 +71,7 @@ pub mod tier; pub mod validate; pub mod community; pub mod db_info; +pub mod rates_admin; pub mod recover; pub mod webhook; pub mod webhook_deliveries; @@ -103,6 +104,10 @@ pub struct AppState { /// Keysat-licenses-Keysat tier. Read at boot, swapped when the /// operator activates a fresh license via the admin endpoint. pub self_tier: Arc>, + /// BTC/fiat rate cache for multi-currency products. See + /// src/rates.rs. Process-global so cached rates aren't refetched + /// per-request. + pub rates: Arc, } impl AppState { @@ -325,6 +330,12 @@ pub fn router(state: AppState) -> Router { // Database health snapshot — operator-facing sanity check // against the catastrophic-loss risk; see db_info.rs. .route("/v1/admin/db-info", get(db_info::get)) + // BTC/fiat rate cache — operator-facing view of what the + // daemon would quote for fiat-priced products. See + // src/rates.rs for the source chain (Kraken → Coinbase + // → CoinGecko) and TTL caching semantics. + .route("/v1/admin/rates", get(rates_admin::get)) + .route("/v1/admin/rates/refresh", post(rates_admin::refresh)) // Opt-in community analytics. Off by default; toggling on // requires the operator to confirm a collector URL. .route( diff --git a/licensing-service/src/api/rates_admin.rs b/licensing-service/src/api/rates_admin.rs new file mode 100644 index 0000000..7d6ce33 --- /dev/null +++ b/licensing-service/src/api/rates_admin.rs @@ -0,0 +1,89 @@ +//! Admin endpoints for the BTC/fiat rate cache. +//! +//! Two surfaces: +//! GET /v1/admin/rates — what's cached right now +//! (operators can see what +//! the daemon would quote and +//! which source it came from) +//! POST /v1/admin/rates/refresh — force a fresh fetch for a +//! given currency, bypassing +//! the TTL cache. Useful +//! after a rate-source +//! outage to confirm the +//! chain works end-to-end. + +use crate::api::admin::{request_context, require_admin}; +use crate::api::AppState; +use crate::error::{AppError, AppResult}; +use crate::rates; +use axum::{extract::State, http::HeaderMap, Json}; +use serde::Deserialize; +use serde_json::{json, Value}; + +pub async fn get( + State(state): State, + headers: HeaderMap, +) -> AppResult> { + require_admin(&state, &headers)?; + let snapshot = state.rates.snapshot().await; + let rates_json: Vec = snapshot + .into_iter() + .map(|(currency, cached)| { + json!({ + "currency": currency, + "units_per_btc": cached.units_per_btc, + "source": cached.source, + "fetched_at_secs_ago": cached.fetched_at.elapsed() + .map(|d| d.as_secs()) + .unwrap_or(0), + }) + }) + .collect(); + Ok(Json(json!({ "rates": rates_json }))) +} + +#[derive(Debug, Deserialize)] +pub struct RefreshReq { + pub currency: String, +} + +pub async fn refresh( + State(state): State, + headers: HeaderMap, + Json(req): Json, +) -> AppResult> { + let actor_hash = require_admin(&state, &headers)?; + let (ip, ua) = request_context(&headers); + let currency = req.currency.to_uppercase(); + + // Wipe the cache entry so the next get_rate hits the chain. + state.rates.invalidate(¤cy).await; + + // Fetch fresh — bubbles up source errors with full context. + let fresh = rates::get_rate(&state, ¤cy).await.map_err(|e| { + AppError::Upstream(format!("rate refresh failed: {e:#}")) + })?; + + let _ = crate::db::repo::insert_audit( + &state.db, + "admin_api_key", + Some(&actor_hash), + "rate.refresh", + Some("rate"), + Some(¤cy), + ip.as_deref(), + ua.as_deref(), + &json!({ + "currency": currency, + "source": fresh.source, + "units_per_btc": fresh.units_per_btc, + }), + ) + .await; + + Ok(Json(json!({ + "currency": currency, + "units_per_btc": fresh.units_per_btc, + "source": fresh.source, + }))) +} diff --git a/licensing-service/src/lib.rs b/licensing-service/src/lib.rs index dd194e0..c966797 100644 --- a/licensing-service/src/lib.rs +++ b/licensing-service/src/lib.rs @@ -18,6 +18,7 @@ pub mod license_self; pub mod models; pub mod payment; pub mod rate_limit; +pub mod rates; pub mod reconcile; pub mod tipping; pub mod webhooks; diff --git a/licensing-service/src/main.rs b/licensing-service/src/main.rs index c687fe8..df4deb7 100644 --- a/licensing-service/src/main.rs +++ b/licensing-service/src/main.rs @@ -77,6 +77,7 @@ async fn main() -> anyhow::Result<()> { payment: Arc::new(tokio::sync::RwLock::new(provider)), config: Arc::new(cfg.clone()), self_tier, + rates: keysat::rates::RateCache::new(), }; // Spawn background loops before handing state to the router. diff --git a/licensing-service/src/rates.rs b/licensing-service/src/rates.rs new file mode 100644 index 0000000..407729e --- /dev/null +++ b/licensing-service/src/rates.rs @@ -0,0 +1,278 @@ +//! BTC / fiat exchange-rate fetcher. +//! +//! Reads the rate from a small chain of public sources, caches the +//! result in-memory with a 60-second TTL, and falls through on +//! per-source failure. The cache is shared across the daemon — every +//! call to `get_rate(&state, "USD")` either returns the cached value +//! (cheap) or refreshes it (one HTTP call per minute per currency). +//! +//! ## Source priority +//! +//! 1. **Kraken** — `https://api.kraken.com/0/public/Ticker?pair=XBT` +//! matches the operator's mental model since BTCPay uses Kraken +//! as its default rate provider too. Means the daemon and BTCPay +//! agree on the rate when we use Kraken on both ends. +//! 2. **Coinbase** — `https://api.coinbase.com/v2/exchange-rates?currency=BTC` +//! Robust public API, no auth, simple JSON. Good fallback when +//! Kraken is rate-limiting us or having an outage. +//! 3. **CoinGecko** — `https://api.coingecko.com/api/v3/simple/price?ids=bitcoin&vs_currencies=usd,eur` +//! Last resort. Their public free tier has aggressive rate +//! limits, so it's intentionally last. +//! +//! ## Test-mode pin +//! +//! The settings table key `manual_rate_pin_` (e.g. +//! `manual_rate_pin_USD = "65000"`) overrides the fetcher entirely. +//! Used by integration tests that want a deterministic conversion +//! without hitting the network. Production operators can also set +//! this to lock the rate in for a maintenance window if a fetcher +//! glitch is producing weird quotes. + +use crate::api::AppState; +use anyhow::{anyhow, Context, Result}; +use serde_json::Value; +use std::collections::HashMap; +use std::sync::Arc; +use std::time::{Duration, SystemTime}; +use tokio::sync::RwLock; + +/// How long a cached rate is considered fresh. 60s is a reasonable +/// trade-off — most BTC price moves under 60s are <0.1%, well below +/// any operator-meaningful threshold, and longer caches risk staleness +/// during volatility spikes. +const TTL: Duration = Duration::from_secs(60); + +#[derive(Debug, Clone)] +pub struct CachedRate { + /// "-per-BTC" — for USD this is the dollar price of 1 BTC. + pub units_per_btc: f64, + /// Where the rate came from: 'kraken' | 'coinbase' | 'coingecko' | 'manual_pin'. + pub source: String, + /// When the fetch happened. + pub fetched_at: SystemTime, +} + +/// Process-global cache. Keyed by uppercase currency code (e.g. +/// "USD", "EUR"). Held in `AppState` via `Arc` for cheap clones. +#[derive(Default)] +pub struct RateCache { + inner: RwLock>, +} + +impl RateCache { + pub fn new() -> Arc { + Arc::new(Self::default()) + } + + /// Read-only snapshot of the current cache contents. Used by + /// the admin UI to show "what's the daemon currently quoting." + pub async fn snapshot(&self) -> HashMap { + self.inner.read().await.clone() + } + + /// Drop a single currency's cached entry so the next `get_rate` + /// call refetches from the source chain. Used by the + /// `POST /v1/admin/rates/refresh` admin action. + pub async fn invalidate(&self, currency: &str) { + let mut cache = self.inner.write().await; + cache.remove(¤cy.to_uppercase()); + } +} + +/// Fetch the current rate for `currency` (uppercase ISO code) against +/// BTC. Returns the cached value if fresh; otherwise hits the fallback +/// chain. Manual pins in the settings table win over the chain. +pub async fn get_rate(state: &AppState, currency: &str) -> Result { + let currency = currency.to_uppercase(); + if currency == "SAT" || currency == "BTC" { + // Trivial conversion — the rest of the daemon shouldn't be + // calling this for sat-currency products, but return a + // sensible identity if it does. + return Ok(CachedRate { + units_per_btc: 100_000_000.0, // 1 BTC = 100M sats + source: "identity".to_string(), + fetched_at: SystemTime::now(), + }); + } + + // Manual pin from settings table — wins over the cache + chain. + // Always re-checked on every call (no TTL) so an operator can + // un-pin and immediately fall back to live rates. + let pin_key = format!("manual_rate_pin_{currency}"); + if let Ok(Some(raw)) = crate::db::repo::settings_get(&state.db, &pin_key).await { + if let Ok(value) = raw.parse::() { + if value > 0.0 { + let pinned = CachedRate { + units_per_btc: value, + source: "manual_pin".to_string(), + fetched_at: SystemTime::now(), + }; + // Mirror to cache so admin GET /v1/admin/rates + // surfaces the pinned value (without it, the + // snapshot would always show "no rates cached" + // for pinned currencies). + let mut cache = state.rates.inner.write().await; + cache.insert(currency.clone(), pinned.clone()); + return Ok(pinned); + } + } + } + + // Fast path: cached and fresh. + { + let cache = state.rates.inner.read().await; + if let Some(cached) = cache.get(¤cy) { + if cached.fetched_at.elapsed().unwrap_or(TTL) < TTL { + return Ok(cached.clone()); + } + } + } + + // Slow path: hit the chain. + let fresh = fetch_with_fallback(¤cy).await?; + let mut cache = state.rates.inner.write().await; + cache.insert(currency, fresh.clone()); + Ok(fresh) +} + +async fn fetch_with_fallback(currency: &str) -> Result { + // Sources in priority order. Each closure returns the rate if + // it succeeds, propagates the error otherwise. We collect + // errors so a final failure surfaces all three causes for + // debugging. + let mut errors: Vec = Vec::new(); + + match fetch_kraken(currency).await { + Ok(r) => return Ok(r), + Err(e) => errors.push(format!("kraken: {e:#}")), + } + match fetch_coinbase(currency).await { + Ok(r) => return Ok(r), + Err(e) => errors.push(format!("coinbase: {e:#}")), + } + match fetch_coingecko(currency).await { + Ok(r) => return Ok(r), + Err(e) => errors.push(format!("coingecko: {e:#}")), + } + Err(anyhow!( + "all rate sources failed for {currency}: {}", + errors.join("; ") + )) +} + +fn http_client() -> Result { + reqwest::Client::builder() + .timeout(Duration::from_secs(8)) + .build() + .context("build reqwest client") +} + +async fn fetch_kraken(currency: &str) -> Result { + // Kraken pair codes use 'XBT' for BTC and 'Z' prefixes for + // legacy fiat (ZUSD, ZEUR). The c[0] field is the latest + // closed-trade price. + let pair = match currency { + "USD" => "XXBTZUSD", + "EUR" => "XXBTZEUR", + _ => return Err(anyhow!("kraken: unsupported currency {currency}")), + }; + let url = format!("https://api.kraken.com/0/public/Ticker?pair={pair}"); + let body: Value = http_client()?.get(&url).send().await?.error_for_status()?.json().await?; + let errors = body.get("error").and_then(|v| v.as_array()).cloned().unwrap_or_default(); + if !errors.is_empty() { + return Err(anyhow!("kraken returned errors: {errors:?}")); + } + let price_str = body + .pointer(&format!("/result/{pair}/c/0")) + .and_then(|v| v.as_str()) + .ok_or_else(|| anyhow!("kraken: response missing /result/{pair}/c/0"))?; + let value: f64 = price_str.parse().context("kraken: parse price")?; + Ok(CachedRate { + units_per_btc: value, + source: "kraken".to_string(), + fetched_at: SystemTime::now(), + }) +} + +async fn fetch_coinbase(currency: &str) -> Result { + let url = "https://api.coinbase.com/v2/exchange-rates?currency=BTC"; + let body: Value = http_client()?.get(url).send().await?.error_for_status()?.json().await?; + let rate_str = body + .pointer(&format!("/data/rates/{currency}")) + .and_then(|v| v.as_str()) + .ok_or_else(|| anyhow!("coinbase: response missing /data/rates/{currency}"))?; + let value: f64 = rate_str.parse().context("coinbase: parse rate")?; + Ok(CachedRate { + units_per_btc: value, + source: "coinbase".to_string(), + fetched_at: SystemTime::now(), + }) +} + +async fn fetch_coingecko(currency: &str) -> Result { + let cur_lower = currency.to_lowercase(); + let url = format!( + "https://api.coingecko.com/api/v3/simple/price?ids=bitcoin&vs_currencies={cur_lower}" + ); + let body: Value = http_client()?.get(&url).send().await?.error_for_status()?.json().await?; + let value = body + .pointer(&format!("/bitcoin/{cur_lower}")) + .and_then(|v| v.as_f64()) + .ok_or_else(|| anyhow!("coingecko: response missing /bitcoin/{cur_lower}"))?; + Ok(CachedRate { + units_per_btc: value, + source: "coingecko".to_string(), + fetched_at: SystemTime::now(), + }) +} + +/// Convert a fiat amount (smallest unit, e.g. cents) to sats using +/// the cached/fetched rate. Returns the sat amount as i64 (rounded +/// to nearest sat — fractional sats don't exist). +/// +/// `value` is in the smallest unit of `currency` (cents for USD). +/// Returns an `(sats, rate_centibps)` pair so callers can pin both +/// on the invoice row for audit. +pub async fn convert_to_sats( + state: &AppState, + currency: &str, + value: i64, +) -> Result { + let currency = currency.to_uppercase(); + if currency == "SAT" { + return Ok(ConversionResult { + sats: value, + rate_centibps: None, + source: "identity".to_string(), + }); + } + let rate = get_rate(state, ¤cy).await?; + // value is cents (for USD/EUR). 1 BTC = 100_000_000 sats. + // sats = value / units_per_btc * 100_000_000 / 100 + // = value * 100_000_000 / (units_per_btc * 100) + // = value * 1_000_000 / units_per_btc + // (the /100 cancels half of 100_000_000 since `value` is in + // cents — the smallest unit is 1/100 of the main unit). + let sats_f = (value as f64) * 1_000_000.0 / rate.units_per_btc; + let sats = sats_f.round() as i64; + + // Encode the rate as centibps (rate × 10,000) for the invoice + // row. See migrations/0010_multi_currency.sql for the encoding + // rationale. + let rate_centibps = (rate.units_per_btc * 10_000.0).round() as i64; + + Ok(ConversionResult { + sats, + rate_centibps: Some(rate_centibps), + source: rate.source, + }) +} + +#[derive(Debug, Clone)] +pub struct ConversionResult { + pub sats: i64, + /// rate × 10,000 in operator-currency-per-BTC units. `None` for + /// identity (SAT-currency) conversions. + pub rate_centibps: Option, + pub source: String, +} diff --git a/licensing-service/tests/api.rs b/licensing-service/tests/api.rs index 02e91fa..b3cc48b 100644 --- a/licensing-service/tests/api.rs +++ b/licensing-service/tests/api.rs @@ -113,6 +113,7 @@ async fn make_test_state() -> (AppState, NamedTempFile) { self_tier: Arc::new(RwLock::new(Tier::Unlicensed { reason: "test fixture".into(), })), + rates: keysat::rates::RateCache::new(), }; (state, tmp) } @@ -1139,6 +1140,83 @@ async fn recover_returns_license_key_for_matching_pair() { assert_eq!(audit_count, 1, "recovery must write an audit row"); } +/// Rate fetcher: manual pin in settings table overrides the source +/// chain. Locks in the test-mode + maintenance-window contract that +/// other phases (invoice rate recording, buy-page rendering) rely on. +#[tokio::test] +async fn rate_cache_honors_manual_pin_from_settings() { + let (state, _tmp) = make_test_state().await; + + // Pin USD at $65,000 / BTC. The fetcher MUST return this value + // without hitting any external API. + sqlx::query("INSERT INTO settings(key, value, updated_at) VALUES('manual_rate_pin_USD', '65000', ?)") + .bind(Utc::now().to_rfc3339()) + .execute(&state.db) + .await + .unwrap(); + + let rate = keysat::rates::get_rate(&state, "USD") + .await + .expect("manual pin should resolve without network"); + assert_eq!(rate.units_per_btc, 65000.0); + assert_eq!(rate.source, "manual_pin"); + + // Convert $49.00 (4900 cents) to sats. At $65k/BTC: + // sats = 4900 * 1_000_000 / 65000 = 75,384.6 → 75,385. + let conv = keysat::rates::convert_to_sats(&state, "USD", 4900) + .await + .expect("convert"); + assert_eq!(conv.sats, 75_385, "rounding tie-break: 75384.615 rounds to 75385"); + assert_eq!( + conv.rate_centibps, + Some(650_000_000), + "rate stored as units×10000: 65000 × 10000" + ); + + // SAT-currency conversions are identity (no rate involved). + let sat_conv = keysat::rates::convert_to_sats(&state, "SAT", 50_000) + .await + .unwrap(); + assert_eq!(sat_conv.sats, 50_000); + assert!(sat_conv.rate_centibps.is_none()); +} + +/// Admin endpoint visibility: GET /v1/admin/rates returns whatever +/// is currently cached, including manual pins. Operators can verify +/// the daemon's current quote against external sources before +/// trusting fiat-priced invoice flows. +#[tokio::test] +async fn admin_rates_endpoint_reflects_manual_pin() { + let (state, _tmp) = make_test_state().await; + let auth = format!("Bearer {}", TEST_ADMIN_KEY); + + sqlx::query("INSERT INTO settings(key, value, updated_at) VALUES('manual_rate_pin_USD', '60000', ?)") + .bind(Utc::now().to_rfc3339()) + .execute(&state.db) + .await + .unwrap(); + + // Trigger a rate read so the cache populates. + let _ = keysat::rates::get_rate(&state, "USD").await.unwrap(); + + let req = build_request( + "GET", + "/v1/admin/rates", + &[("authorization", &auth)], + None, + ); + let resp = send(&state, req).await; + assert_eq!(resp.status(), StatusCode::OK); + let body = body_json(resp).await; + let rates = body["rates"].as_array().expect("rates array"); + let usd = rates + .iter() + .find(|r| r["currency"] == "USD") + .expect("USD entry should be present"); + assert_eq!(usd["units_per_btc"], 60_000.0); + assert_eq!(usd["source"], "manual_pin"); +} + /// Multi-currency product creation. The admin endpoint accepts both /// the legacy SAT-only form (`price_sats: N`) and the new typed form /// (`price_currency + price_value`). Verifies: diff --git a/licensing-service/tests/worker.rs b/licensing-service/tests/worker.rs index 76fcbf8..f29687e 100644 --- a/licensing-service/tests/worker.rs +++ b/licensing-service/tests/worker.rs @@ -69,6 +69,7 @@ async fn make_state() -> (AppState, NamedTempFile) { self_tier: Arc::new(RwLock::new(Tier::Unlicensed { reason: "test".into(), })), + rates: keysat::rates::RateCache::new(), }; (state, tmp) }