Dedupe relay response-parsing + operator calls; add relay unit tests

Collapse three byte-identical response-parsing blocks (getRelay, postRelay,
and the tts error path) into one handleRelayResponse helper. pingBalance is
deliberately left out: it records a relay error even on a parsed envelope,
a different contract from the other three (updateRelayState clears lastError
while recordRelayError sets it), so folding it in would change observable
state.

Collapse the six operator-to-relay calls into operatorPost / operatorGet,
preserving their intentional split: writes (tier grant, invoice, order)
throw on misconfig or non-OK so the operator action surfaces the failure;
reads (tier, plans, expiring subs) return null so the caller falls back to
a default. Per-function signatures, body shapes, error messages, and the
throw-vs-null behavior are unchanged.

Add server/test/relay.test.js (first fetch-mock harness for relay.js):
14 tests covering the tts error-path control flow, handleRelayResponse's
envelope parsing and error-recording rule, and the operator throw-vs-null
contract including the missing-config branch. 158 tests pass.

ROADMAP gains the deferred refactor-survey items (subscription engine,
/api/process pipeline, sweep middleware, transcript coalescers) and notes
the relay-test coverage against the existing known-debt entries.
This commit is contained in:
Keysat
2026-06-20 06:07:21 -05:00
parent f2188fa797
commit 890d671bf2
3 changed files with 370 additions and 124 deletions
+87 -122
View File
@@ -111,19 +111,18 @@ export function createRelayProvider({
// Common error-handling wrapper. The relay's contract is that ANY
// response (success or failure) carries the standard envelope so
// Recap can keep its balance display accurate even on errors. We
// try to parse error bodies to harvest that.
// GET wrapper mirroring postRelay's envelope-aware error handling.
// Used by the transcribe-url poll loop to fetch job status.
async function getRelay({ path, headers, signal }) {
let res;
try {
res = await fetch(`${base}${path}`, { method: "GET", headers, signal });
} catch (err) {
recordRelayError(err?.message || String(err), creditKey);
throw err;
}
const text = await res.text();
// Recap can keep its balance display accurate even on errors — we
// always try to parse the body, harvest credit/tier state from it, and
// on a non-OK status throw an Error carrying the parsed envelope +
// status code. `label` is the call name baked into the thrown message
// (e.g. "Relay GET /relay/jobs/x"). recordRelayError fires only when
// there's NO envelope to parse — a clean 4xx from the relay isn't an
// "unreachable" error, and updateRelayState already cleared lastError
// for it. (pingBalance deliberately keeps its own copy: it records an
// error even on a parsed envelope, a different contract — don't fold it
// in here.) Synchronous: returns the parsed envelope or throws — keep it
// that way so call sites can `return` it without awaiting.
function handleRelayResponse({ res, text, label }) {
let parsed = null;
try {
parsed = text ? JSON.parse(text) : null;
@@ -137,7 +136,7 @@ export function createRelayProvider({
parsed?.message ||
text?.slice(0, 300) ||
`HTTP ${res.status}`;
const err = new Error(`Relay GET ${path} ${res.status}: ${msg}`);
const err = new Error(`${label} ${res.status}: ${msg}`);
err.status = res.status;
err.envelope = parsed;
if (!parsed) recordRelayError(msg, creditKey);
@@ -146,6 +145,20 @@ export function createRelayProvider({
return parsed;
}
// GET wrapper mirroring postRelay's envelope-aware error handling.
// Used by the transcribe-url poll loop to fetch job status.
async function getRelay({ path, headers, signal }) {
let res;
try {
res = await fetch(`${base}${path}`, { method: "GET", headers, signal });
} catch (err) {
recordRelayError(err?.message || String(err), creditKey);
throw err;
}
const text = await res.text();
return handleRelayResponse({ res, text, label: `Relay GET ${path}` });
}
async function postRelay({ path, body, headers, signal }) {
let res;
try {
@@ -160,26 +173,7 @@ export function createRelayProvider({
throw err;
}
const text = await res.text();
let parsed = null;
try {
parsed = text ? JSON.parse(text) : null;
} catch {}
if (parsed && (typeof parsed.credits_remaining === "number" || parsed.tier)) {
updateRelayState(parsed, creditKey);
}
if (!res.ok) {
const msg =
parsed?.error ||
parsed?.message ||
text?.slice(0, 300) ||
`HTTP ${res.status}`;
const err = new Error(`Relay ${path} ${res.status}: ${msg}`);
err.status = res.status;
err.envelope = parsed;
if (!parsed) recordRelayError(msg, creditKey);
throw err;
}
return parsed;
return handleRelayResponse({ res, text, label: `Relay ${path}` });
}
return {
@@ -941,21 +935,11 @@ export function createRelayProvider({
}
if (!res.ok) {
// Errors carry the standard JSON envelope — harvest balance + msg.
// Success returns binary, so tts can't fully share postRelay; only
// the error path delegates. handleRelayResponse always throws when
// !res.ok; `return` makes that explicit and mirrors getRelay/postRelay.
const errText = await res.text().catch(() => "");
let parsed = null;
try {
parsed = errText ? JSON.parse(errText) : null;
} catch {}
if (parsed && (typeof parsed.credits_remaining === "number" || parsed.tier)) {
updateRelayState(parsed, creditKey);
}
const msg =
parsed?.error || parsed?.message || errText?.slice(0, 300) || `HTTP ${res.status}`;
const err = new Error(`Relay /relay/tts ${res.status}: ${msg}`);
err.status = res.status;
err.envelope = parsed;
if (!parsed) recordRelayError(msg, creditKey);
throw err;
return handleRelayResponse({ res, text: errText, label: "Relay /relay/tts" });
}
const audio = Buffer.from(await res.arrayBuffer());
// Success path: credit state lives in headers. "unlimited" (Max) →
@@ -988,37 +972,51 @@ export function createRelayProvider({
// The relay is the source of truth for cloud Pro/Max tiers. The operator
// grant flow calls these server-to-server, authenticated by the shared
// operator key — no per-user license involved.
export async function setRelayUserTier({ userId, tier, expiresAt = null, timeoutMs = 10000 }) {
//
// Two contracts, deliberately different — don't collapse them into one:
// • operatorPost — writes/creates (tier grant, invoice, order): THROWS
// on missing config or a non-OK status, so the operator action that
// triggered it surfaces the failure instead of silently no-op'ing.
// • operatorGet — reads (current tier, plans, expiring subs): returns
// null on ANY misconfig/failure, so the caller falls back to a sane
// default rather than breaking.
async function operatorPost({ path, body, label, timeoutMs }) {
const base = getRelayBaseURL();
const operatorKey = getRelayOperatorKey();
if (!base) throw new Error("relay base URL not configured");
if (!operatorKey) {
throw new Error("operator key not configured (set RECAP_RELAY_OPERATOR_KEY)");
}
const res = await fetch(`${base.replace(/\/$/, "")}/relay/user-tier`, {
const res = await fetch(`${base.replace(/\/$/, "")}${path}`, {
method: "POST",
headers: { "Content-Type": "application/json", "X-Recap-Operator-Key": operatorKey },
body: JSON.stringify({ user_id: userId, tier, expires_at: expiresAt || undefined }),
body: JSON.stringify(body),
signal: AbortSignal.timeout(timeoutMs),
});
const data = await res.json().catch(() => ({}));
if (!res.ok) {
const err = new Error(data?.error || `relay user-tier ${res.status}`);
const err = new Error(data?.error || `${label} ${res.status}`);
err.status = res.status;
throw err;
}
return data;
}
export async function getRelayUserTier({ userId, timeoutMs = 8000 }) {
async function operatorGet({ path, searchParams = null, timeoutMs }) {
const base = getRelayBaseURL();
const operatorKey = getRelayOperatorKey();
if (!base || !operatorKey) return null;
try {
const res = await fetch(
`${base.replace(/\/$/, "")}/relay/user-tier/${encodeURIComponent(userId)}`,
{ headers: { "X-Recap-Operator-Key": operatorKey }, signal: AbortSignal.timeout(timeoutMs) }
);
const url = new URL(`${base.replace(/\/$/, "")}${path}`);
if (searchParams) {
for (const [k, v] of Object.entries(searchParams)) {
url.searchParams.set(k, String(v));
}
}
const res = await fetch(url, {
headers: { "X-Recap-Operator-Key": operatorKey },
signal: AbortSignal.timeout(timeoutMs),
});
if (!res.ok) return null;
return await res.json();
} catch {
@@ -1026,6 +1024,22 @@ export async function getRelayUserTier({ userId, timeoutMs = 8000 }) {
}
}
export async function setRelayUserTier({ userId, tier, expiresAt = null, timeoutMs = 10000 }) {
return operatorPost({
path: "/relay/user-tier",
body: { user_id: userId, tier, expires_at: expiresAt || undefined },
label: "relay user-tier",
timeoutMs,
});
}
export async function getRelayUserTier({ userId, timeoutMs = 8000 }) {
return operatorGet({
path: `/relay/user-tier/${encodeURIComponent(userId)}`,
timeoutMs,
});
}
// Ask the relay to create a BTCPay invoice for a prepaid Pro/Max period for
// `userId`. Operator-key authed (server-to-server). Returns
// { invoice_id, checkout_url, sats, tier, period_days } or throws.
@@ -1035,25 +1049,12 @@ export async function createRelayTierInvoice({
returnUrl = null,
timeoutMs = 12000,
}) {
const base = getRelayBaseURL();
const operatorKey = getRelayOperatorKey();
if (!base) throw new Error("relay base URL not configured");
if (!operatorKey) {
throw new Error("operator key not configured (set RECAP_RELAY_OPERATOR_KEY)");
}
const res = await fetch(`${base.replace(/\/$/, "")}/relay/tier-invoice`, {
method: "POST",
headers: { "Content-Type": "application/json", "X-Recap-Operator-Key": operatorKey },
body: JSON.stringify({ user_id: userId, tier, return_url: returnUrl || undefined }),
signal: AbortSignal.timeout(timeoutMs),
return operatorPost({
path: "/relay/tier-invoice",
body: { user_id: userId, tier, return_url: returnUrl || undefined },
label: "relay tier-invoice",
timeoutMs,
});
const data = await res.json().catch(() => ({}));
if (!res.ok) {
const err = new Error(data?.error || `relay tier-invoice ${res.status}`);
err.status = res.status;
throw err;
}
return data;
}
// Ask the relay to create a Zaprite (card) hosted-checkout order for a
@@ -1066,25 +1067,12 @@ export async function createRelayZapriteOrder({
returnUrl = null,
timeoutMs = 12000,
}) {
const base = getRelayBaseURL();
const operatorKey = getRelayOperatorKey();
if (!base) throw new Error("relay base URL not configured");
if (!operatorKey) {
throw new Error("operator key not configured (set RECAP_RELAY_OPERATOR_KEY)");
}
const res = await fetch(`${base.replace(/\/$/, "")}/relay/tier-zaprite-order`, {
method: "POST",
headers: { "Content-Type": "application/json", "X-Recap-Operator-Key": operatorKey },
body: JSON.stringify({ user_id: userId, tier, return_url: returnUrl || undefined }),
signal: AbortSignal.timeout(timeoutMs),
return operatorPost({
path: "/relay/tier-zaprite-order",
body: { user_id: userId, tier, return_url: returnUrl || undefined },
label: "relay tier-zaprite-order",
timeoutMs,
});
const data = await res.json().catch(() => ({}));
if (!res.ok) {
const err = new Error(data?.error || `relay tier-zaprite-order ${res.status}`);
err.status = res.status;
throw err;
}
return data;
}
// Read the buyable subscription plans + sats prices from the relay (the
@@ -1092,19 +1080,7 @@ export async function createRelayZapriteOrder({
// { period_days, plans: [{tier, sats}] } or null when the relay is
// unreachable / unconfigured (caller falls back to a sane default).
export async function getRelayTierPlans({ timeoutMs = 8000 } = {}) {
const base = getRelayBaseURL();
const operatorKey = getRelayOperatorKey();
if (!base || !operatorKey) return null;
try {
const res = await fetch(`${base.replace(/\/$/, "")}/relay/tier-plans`, {
headers: { "X-Recap-Operator-Key": operatorKey },
signal: AbortSignal.timeout(timeoutMs),
});
if (!res.ok) return null;
return await res.json();
} catch {
return null;
}
return operatorGet({ path: "/relay/tier-plans", timeoutMs });
}
// List cloud users whose prepaid Pro/Max period expires within
@@ -1118,22 +1094,11 @@ export async function getRelayExpiringSubscriptions({
lapsedDays = 3,
timeoutMs = 10000,
} = {}) {
const base = getRelayBaseURL();
const operatorKey = getRelayOperatorKey();
if (!base || !operatorKey) return null;
try {
const url = new URL(`${base.replace(/\/$/, "")}/relay/expiring-subscriptions`);
url.searchParams.set("within_days", String(withinDays));
url.searchParams.set("lapsed_days", String(lapsedDays));
const res = await fetch(url, {
headers: { "X-Recap-Operator-Key": operatorKey },
signal: AbortSignal.timeout(timeoutMs),
});
if (!res.ok) return null;
return await res.json();
} catch {
return null;
}
return operatorGet({
path: "/relay/expiring-subscriptions",
searchParams: { within_days: withinDays, lapsed_days: lapsedDays },
timeoutMs,
});
}
// Streams a file off disk into a Blob with the given MIME type for
+243
View File
@@ -0,0 +1,243 @@
// Unit tests for the relay provider's shared response handling and the
// operator-call helpers introduced by the dedup refactor
// (handleRelayResponse / operatorPost / operatorGet). The relay is
// network-dependent, so we stub global.fetch. These nail:
// - the tts error-path control flow, which must THROW, not fall through
// to the binary arrayBuffer read (regression for the missing-return fix);
// - handleRelayResponse's envelope parsing + the recordRelayError-only-
// when-no-envelope rule;
// - the deliberate throw-vs-null split between operatorPost (writes) and
// operatorGet (reads), including the missing-config branch.
import { test, describe, afterEach } from "node:test";
import assert from "node:assert/strict";
import {
createRelayProvider,
setRelayUserTier,
getRelayUserTier,
createRelayTierInvoice,
getRelayTierPlans,
getRelayExpiringSubscriptions,
} from "../providers/relay.js";
import { getRelayState, resetRelayState } from "../relay-state.js";
import { getRelayOperatorKey } from "../relay-default.js";
const BASE = "https://relay.recaps.cc";
// Pin an operator key so the operator-helper tests exercise the configured
// path. node --test isolates each test file in its own process, so this
// doesn't leak into other suites.
process.env.RECAP_RELAY_OPERATOR_KEY = "test-op-key";
const realFetch = global.fetch;
let fetchCalls = [];
// Install a stub that records calls and returns (or runs) the given response.
function stubFetch(response) {
fetchCalls = [];
global.fetch = async (url, opts) => {
fetchCalls.push({ url: String(url), opts });
return typeof response === "function" ? response(url, opts) : response;
};
}
afterEach(() => {
global.fetch = realFetch;
resetRelayState();
});
// Minimal Response-like stubs covering the surfaces the provider touches.
function jsonRes({ ok = true, status = 200, body = {} }) {
return { ok, status, text: async () => JSON.stringify(body), json: async () => body };
}
function textRes({ ok = false, status = 502, body = "Bad Gateway" }) {
return {
ok,
status,
text: async () => body,
json: async () => {
throw new SyntaxError("not json");
},
};
}
function binRes({ headers = {}, bytes = [1, 2, 3] }) {
return {
ok: true,
status: 200,
headers: new Headers(headers),
arrayBuffer: async () => new Uint8Array(bytes).buffer,
};
}
const provider = () => createRelayProvider({ baseURL: BASE, installId: "test-install" });
// A never-aborting signal so tts skips its 90s AbortSignal.timeout fallback.
const noTimeout = () => new AbortController().signal;
describe("relay tts: error path throws (regression for the missing-return fix)", () => {
test("throws on a non-OK response instead of falling through to the binary read", async () => {
stubFetch(jsonRes({ ok: false, status: 500, body: { error: "kokoro busy" } }));
await assert.rejects(
() => provider().tts({ text: "hi", voice: "bm_george", signal: noTimeout() }),
(err) => {
assert.equal(err.status, 500);
assert.equal(err.message, "Relay /relay/tts 500: kokoro busy");
assert.deepEqual(err.envelope, { error: "kokoro busy" });
return true;
}
);
});
test("harvests the credit envelope on a parsed error, records NO relay error", async () => {
stubFetch(jsonRes({ ok: false, status: 402, body: { error: "out of credits", credits_remaining: 0 } }));
await assert.rejects(() => provider().tts({ text: "x", voice: "v", signal: noTimeout() }));
const state = getRelayState("inst:test-install");
assert.equal(state.creditsRemaining, 0); // updateRelayState ran off the envelope
assert.equal(state.lastError, null); // recordRelayError did NOT (envelope present)
});
test("records a relay error when the error body is not JSON", async () => {
stubFetch(textRes({ ok: false, status: 502, body: "Bad Gateway" }));
await assert.rejects(
() => provider().tts({ text: "x", voice: "v", signal: noTimeout() }),
(err) => {
assert.equal(err.status, 502);
assert.equal(err.message, "Relay /relay/tts 502: Bad Gateway");
return true;
}
);
assert.equal(getRelayState("inst:test-install").lastError, "Bad Gateway");
});
test("success path returns binary audio and never enters the error branch", async () => {
stubFetch(
binRes({
headers: {
"Content-Type": "audio/mpeg",
"X-Recap-Credits-Remaining": "7",
"X-Recap-Tier": "pro",
"X-Recap-Credit-Charged": "1",
"X-Recap-Audio-Duration": "12.5",
},
bytes: [10, 20, 30],
})
);
const out = await provider().tts({ text: "hi", voice: "bm_george", signal: noTimeout() });
assert.ok(Buffer.isBuffer(out.audio));
assert.equal(out.audio.length, 3);
assert.equal(out.contentType, "audio/mpeg");
assert.equal(out.creditCharged, 1);
assert.equal(out.durationSeconds, 12.5);
const state = getRelayState("inst:test-install");
assert.equal(state.creditsRemaining, 7);
assert.equal(state.tier, "pro");
});
});
describe("operatorPost (writes — throw on failure)", () => {
test("setRelayUserTier POSTs the right URL/headers/body and returns data on OK", async () => {
stubFetch(jsonRes({ ok: true, body: { ok: true, tier: "max" } }));
const data = await setRelayUserTier({ userId: "u1", tier: "max", expiresAt: "2026-09-01T00:00:00Z" });
assert.deepEqual(data, { ok: true, tier: "max" });
assert.equal(fetchCalls.length, 1);
const { url, opts } = fetchCalls[0];
assert.equal(url, `${BASE}/relay/user-tier`);
assert.equal(opts.method, "POST");
assert.equal(opts.headers["Content-Type"], "application/json");
assert.equal(opts.headers["X-Recap-Operator-Key"], "test-op-key");
assert.deepEqual(JSON.parse(opts.body), {
user_id: "u1",
tier: "max",
expires_at: "2026-09-01T00:00:00Z",
});
});
test("omits undefined body keys (expires_at) via JSON.stringify", async () => {
stubFetch(jsonRes({ ok: true, body: {} }));
await setRelayUserTier({ userId: "u1", tier: "pro" }); // expiresAt defaults null → undefined
assert.deepEqual(JSON.parse(fetchCalls[0].opts.body), { user_id: "u1", tier: "pro" });
});
test("throws with .status and the relay's error message on a non-OK response", async () => {
stubFetch(jsonRes({ ok: false, status: 409, body: { error: "already granted" } }));
await assert.rejects(
() => setRelayUserTier({ userId: "u1", tier: "max" }),
(err) => {
assert.equal(err.status, 409);
assert.equal(err.message, "already granted");
return true;
}
);
});
test("falls back to '<label> <status>' when the error body has no .error", async () => {
stubFetch(jsonRes({ ok: false, status: 500, body: {} }));
await assert.rejects(
() => createRelayTierInvoice({ userId: "u1", tier: "pro" }),
(err) => {
assert.equal(err.message, "relay tier-invoice 500");
return true;
}
);
});
});
describe("operatorGet (reads — null on failure)", () => {
test("getRelayTierPlans returns parsed JSON on OK", async () => {
stubFetch(jsonRes({ ok: true, body: { period_days: 30, plans: [{ tier: "pro", sats: 1000 }] } }));
assert.deepEqual(await getRelayTierPlans(), {
period_days: 30,
plans: [{ tier: "pro", sats: 1000 }],
});
assert.equal(fetchCalls[0].url, `${BASE}/relay/tier-plans`);
assert.equal(fetchCalls[0].opts.headers["X-Recap-Operator-Key"], "test-op-key");
});
test("getRelayTierPlans returns null on a non-OK response", async () => {
stubFetch(jsonRes({ ok: false, status: 503, body: { error: "down" } }));
assert.equal(await getRelayTierPlans(), null);
});
test("getRelayTierPlans returns null when fetch rejects (network error)", async () => {
stubFetch(() => {
throw new TypeError("fetch failed");
});
assert.equal(await getRelayTierPlans(), null);
});
test("getRelayUserTier percent-encodes the user id into the path", async () => {
stubFetch(jsonRes({ ok: true, body: { tier: "max" } }));
assert.deepEqual(await getRelayUserTier({ userId: "user/with space" }), { tier: "max" });
assert.equal(fetchCalls[0].url, `${BASE}/relay/user-tier/user%2Fwith%20space`);
});
test("getRelayExpiringSubscriptions builds the within_days/lapsed_days query", async () => {
stubFetch(jsonRes({ ok: true, body: { subscriptions: [] } }));
await getRelayExpiringSubscriptions({ withinDays: 5, lapsedDays: 2 });
const u = new URL(fetchCalls[0].url);
assert.equal(u.pathname, "/relay/expiring-subscriptions");
assert.equal(u.searchParams.get("within_days"), "5");
assert.equal(u.searchParams.get("lapsed_days"), "2");
});
});
describe("operator helpers: missing-config contract (write throws, read returns null)", () => {
test("same misconfig → setRelayUserTier throws, getRelayTierPlans returns null, no fetch", async (t) => {
const saved = process.env.RECAP_RELAY_OPERATOR_KEY;
delete process.env.RECAP_RELAY_OPERATOR_KEY;
try {
if (getRelayOperatorKey()) {
t.skip("operator key resolved from config.js in this env — can't exercise the missing-key path");
return;
}
stubFetch(() => {
throw new Error("fetch must not be called when the operator key is absent");
});
await assert.rejects(() => setRelayUserTier({ userId: "u1", tier: "pro" }), /operator key not configured/);
assert.equal(await getRelayTierPlans(), null);
assert.equal(fetchCalls.length, 0);
} finally {
process.env.RECAP_RELAY_OPERATOR_KEY = saved;
}
});
});