// Relay provider — proxies transcription + analysis calls through the // operator's relay backend (operator-side service, not in this repo). // The relay handles billing/credit accounting, picks the actual backing // model (Gemini, the operator's Parakeet+Gemma, etc.), and returns a // uniform response with the user's remaining credit balance. // // Auth shape: // - `X-Recap-Install-Id` header on every call (required; identifies // the credit owner). Comes from ./install-id.js. // - `Authorization: Bearer ` header when a license is // present. Absent = treat as Core (free) tier. // - `X-Recap-Job-Id` header on every call, common across the // transcribe + analyze pair that make up one full summary. The // relay decrements credits on the FIRST call with a given job_id; // subsequent calls with the same id are free (until job_id // expires, ~1h). Means "1 full summary = 1 credit" regardless of // whether one or both pipeline steps go through the relay. // // Response envelope (success and error both): // { // "result": { ...endpoint-specific payload... }, // "credits_remaining": , // "tier": "core" | "pro" | "max", // "credit_charged": // } // // What this provider does NOT do: // - Validate the user's license. Relay does that server-side. // - Track historical credit usage. Relay's DB owns the ledger. // - Choose which backing model the relay uses. Operator's call. import { createReadStream } from "fs"; import { retryAPI, formatTime } from "../util.js"; import { zeroCost } from "./cost.js"; import { updateRelayState, recordRelayError, computeCreditKey, } from "../relay-state.js"; import { getRelayBaseURL, getRelayOperatorKey } from "../relay-default.js"; // Provider name shown in logs + chunk pagination labels. "relay" rather // than e.g. "keysat-relay" because operators may run their own relay // using a different backend brand — the name should describe the // architecture, not the operator. const NAME = "relay"; // Models exposed to the picker. 
The relay decides what actually runs — // these labels are placeholders so the picker can show something. const RELAY_TRANSCRIPTION_MODELS = ["relay-default"]; const RELAY_ANALYSIS_MODELS = ["relay-default"]; export function createRelayProvider({ baseURL, installId, licenseKey, // Core-decoupling cloud identity: when `cloud` is set, the relay call // authenticates the SERVER with `operatorKey` and names the user via // `userId` (X-Recap-User-Id) instead of carrying a per-user license. cloud = false, userId = null, operatorKey = null, timeoutMs = 900_000, } = {}) { if (!baseURL) { throw new Error( "createRelayProvider: baseURL is required (e.g. https://relay.keysat.xyz)" ); } if (cloud) { if (!userId || !operatorKey) { throw new Error( "createRelayProvider: cloud identity requires userId + operatorKey" ); } } else if (!installId) { throw new Error( "createRelayProvider: installId is required (boot must initInstallId first)" ); } const base = baseURL.replace(/\/$/, ""); // Per-identity credit-key for the relay-state cache. Computed once // here because (installId, licenseKey) are fixed for this provider // instance — every subsequent updateRelayState/recordRelayError on // this instance routes to the same cache slot. Multi-mode creates a // fresh provider per request (resolveProviderOpts injects per-user // identity), so each user's relay state stays isolated. const creditKey = computeCreditKey({ installId, licenseKey, userId: cloud ? userId : null, }); // Build the auth/identity headers attached to every relay call. // job_id is optional but the orchestration layer should always pass // one — without it the relay can't bundle the transcribe + analyze // pair into a single credit charge. 
function buildHeaders({ extra = {}, jobId } = {}) { const h = { ...extra }; if (cloud) { h["X-Recap-User-Id"] = userId; h["X-Recap-Operator-Key"] = operatorKey; } else { h["X-Recap-Install-Id"] = installId; if (licenseKey) h["Authorization"] = `Bearer ${licenseKey}`; } if (jobId) h["X-Recap-Job-Id"] = jobId; return h; } // Common error-handling wrapper. The relay's contract is that ANY // response (success or failure) carries the standard envelope so // Recap can keep its balance display accurate even on errors. We // try to parse error bodies to harvest that. // GET wrapper mirroring postRelay's envelope-aware error handling. // Used by the transcribe-url poll loop to fetch job status. async function getRelay({ path, headers, signal }) { let res; try { res = await fetch(`${base}${path}`, { method: "GET", headers, signal }); } catch (err) { recordRelayError(err?.message || String(err), creditKey); throw err; } const text = await res.text(); let parsed = null; try { parsed = text ? JSON.parse(text) : null; } catch {} if (parsed && (typeof parsed.credits_remaining === "number" || parsed.tier)) { updateRelayState(parsed, creditKey); } if (!res.ok) { const msg = parsed?.error || parsed?.message || text?.slice(0, 300) || `HTTP ${res.status}`; const err = new Error(`Relay GET ${path} ${res.status}: ${msg}`); err.status = res.status; err.envelope = parsed; if (!parsed) recordRelayError(msg, creditKey); throw err; } return parsed; } async function postRelay({ path, body, headers, signal }) { let res; try { res = await fetch(`${base}${path}`, { method: "POST", headers, body, signal, }); } catch (err) { recordRelayError(err?.message || String(err), creditKey); throw err; } const text = await res.text(); let parsed = null; try { parsed = text ? 
JSON.parse(text) : null; } catch {} if (parsed && (typeof parsed.credits_remaining === "number" || parsed.tier)) { updateRelayState(parsed, creditKey); } if (!res.ok) { const msg = parsed?.error || parsed?.message || text?.slice(0, 300) || `HTTP ${res.status}`; const err = new Error(`Relay ${path} ${res.status}: ${msg}`); err.status = res.status; err.envelope = parsed; if (!parsed) recordRelayError(msg, creditKey); throw err; } return parsed; } return { name: NAME, capabilities: { transcribe: true, analyze: true, // The relay's model catalog is internal — Recap doesn't pick. // listModels: false signals the picker to skip the dropdown. listModels: false, }, listAnalysisModels() { return [...RELAY_ANALYSIS_MODELS]; }, listTranscriptionModels() { return [...RELAY_TRANSCRIPTION_MODELS]; }, // POST /relay/transcribe-url — like transcribeAudio but the // relay fetches the audio from the URL itself (yt-dlp for YouTube, // direct HTTP for podcast RSS audio). Saves the buyer's upload // bandwidth, which is often the slowest leg of the pipeline. // // The relay processes this asynchronously: the POST returns // immediately with a job_id, then we poll GET /relay/jobs/{id} // until status flips to "complete" or "failed". Async pattern is // required because no proxy / load balancer in the path can be // trusted to keep a multi-minute HTTP request alive — short poll // requests, by contrast, are bulletproof. async transcribeUrl({ mediaUrl, mediaType, // "youtube" | "podcast" (optional; relay sniffs URL shape) mimeType, titleHint, channelHint = "", descriptionHint = "", chaptersHint = [], onProgress = () => {}, signal, jobId, }) { onProgress(`Asking relay to fetch + transcribe ${mediaUrl.slice(0, 80)}...`); const start = Date.now(); // Step 1: kick off the job. retryAPI handles transient transport // errors on this short request. 
const initEnvelope = await retryAPI( () => postRelay({ path: "/relay/transcribe-url", body: JSON.stringify({ media_url: mediaUrl, type: mediaType || undefined, mime_type: mimeType || undefined, title: titleHint || undefined, channel: channelHint || undefined, description: descriptionHint || undefined, chapters: Array.isArray(chaptersHint) && chaptersHint.length > 0 ? chaptersHint : undefined, }), headers: buildHeaders({ extra: { "Content-Type": "application/json" }, jobId, }), signal, }), { retries: 2, delayMs: 5000, label: "Relay transcribe-url (kickoff)", log: (msg) => onProgress(msg), } ); const kickoffResult = initEnvelope.result || {}; const backgroundJobId = kickoffResult.job_id; if (!backgroundJobId) { throw new Error( "Relay transcribe-url didn't return a job_id — old relay version? Re-install relay 0.2.14 or newer." ); } onProgress( `Relay accepted job ${backgroundJobId.slice(0, 8)}… processing in background` ); // Step 2: poll GET /relay/jobs/{id} until complete or failed. // Generous max-wait — relay transcribes for long audio can run // several minutes; we want to wait through that, not give up // prematurely. The poll requests themselves are cheap, so the // cost of a long wait is just time-on-the-clock, not bandwidth. 
const POLL_INTERVAL_MS = 5_000; const MAX_WAIT_MS = 30 * 60 * 1000; // 30 min const deadline = Date.now() + MAX_WAIT_MS; let lastProgress = null; let pollFailuresInARow = 0; const MAX_CONSECUTIVE_POLL_FAILURES = 6; // ~30s of poll outage let envelope = null; while (true) { if (signal?.aborted) { throw new Error("Relay job polling aborted"); } if (Date.now() > deadline) { throw new Error( `Relay transcribe-url did not complete within ${Math.round( MAX_WAIT_MS / 60_000 )} minutes — giving up` ); } await new Promise((r) => setTimeout(r, POLL_INTERVAL_MS)); try { envelope = await getRelay({ path: `/relay/jobs/${encodeURIComponent(backgroundJobId)}`, headers: buildHeaders({ jobId }), signal, }); pollFailuresInARow = 0; } catch (err) { // A 404 specifically means the relay no longer knows about // this job. That's not a network blip — the relay almost // certainly restarted (operator update, crash, manual // restart) and lost in-memory job state. Surface // immediately rather than burning 6 retries; the orchestrator's // fallback logic decides whether to retry from scratch. const status = err?.status || 0; const msg = err?.message || String(err); if (status === 404 || /job_not_found/.test(msg)) { throw new Error( `Relay lost the job (probably restarted) — start over to retry` ); } // Other failures (network, TLS, timeout) are transient. // Retry up to MAX_CONSECUTIVE_POLL_FAILURES before giving up. 
pollFailuresInARow += 1; if (pollFailuresInARow >= MAX_CONSECUTIVE_POLL_FAILURES) { throw new Error( `Relay polling lost — ${pollFailuresInARow} consecutive failures: ${msg}` ); } onProgress( `Relay poll glitch (${pollFailuresInARow}/${MAX_CONSECUTIVE_POLL_FAILURES}): ${msg.slice(0, 100)}` ); continue; } const jobRes = envelope.result || {}; if (jobRes.progress && jobRes.progress !== lastProgress) { lastProgress = jobRes.progress; onProgress(`Relay: ${jobRes.progress}`); } if (jobRes.status === "complete") break; if (jobRes.status === "failed") { throw new Error( jobRes.error || "Relay transcribe-url job failed (no detail)" ); } // "queued" / "running" → keep polling } const elapsed = ((Date.now() - start) / 1000).toFixed(1); const remaining = typeof envelope.credits_remaining === "number" ? `, ${envelope.credits_remaining} credits left` : ""; onProgress(`Relay transcribe-url complete in ${elapsed}s${remaining}`); // The job's result field carries the transcribe backend's // output verbatim — same shape as the (sync) transcribeAudio // result. Walk segments → bracketed text the same way. const innerResult = envelope.result?.result || {}; const segments = Array.isArray(innerResult.segments) ? innerResult.segments : []; const lines = segments.length ? segments.map( (s) => `[${formatTime(s.start || 0)}] ${(s.text || "").trim()}` ) : [`[0:00] ${(innerResult.text || "").trim()}`]; const text = lines.join("\n"); const cost = zeroCost({ inputTokens: 0, outputTokens: 0, thinkingTokens: 0, }); return { text, usage: { inputTokens: 0, outputTokens: 0, thinkingTokens: 0, totalTokens: 0 }, cost, finishReason: null, blockReason: "none", raw: envelope, }; }, // POST /relay/summarize-url — combined transcribe+analyze pipeline // that streams per-window section results back over SSE. Used in // "Recap Relay" mode where the user has chosen the operator's // relay for the WHOLE pipeline (not per-step). 
Replaces the // old transcribeUrl + per-window analyzeText fan-out with a single // server-side pipeline, saving ~12 round-trips per long video and // letting the operator's Settings-tab chunking knobs actually // drive production behavior (instead of just benchmarks). // // Flow: // 1. POST /relay/summarize-url → returns job_id immediately // 2. GET /relay/summarize-url/:jobId/events (SSE) → stream // transcribe_complete + window_complete + done events // 3. onProgress / onWindowComplete callbacks fire as events // arrive (mirrors the recap-app's chunked-analyze.js shape // so the existing UI rendering code keeps working) // 4. Returns the final { transcript, sections } envelope when // "done" arrives. Throws on "error" or stream close before // done. // // Falls back to one-shot poll-based completion if SSE never // connects (e.g. operator's reverse proxy strips text/event-stream // — observed with overly-aggressive content-type filters). async summarizeUrl({ mediaUrl, mediaType, mimeType, titleHint, channelHint = "", descriptionHint = "", chaptersHint = [], onProgress = () => {}, onWindowComplete = null, // Fires when the relay's SSE stream emits transcribe_complete. // The full transcript text is available BEFORE any analyze // window completes (analyze runs after transcribe finishes), // so subscribing here lets the caller parse the transcript // into entries and have them ready in time for the FIRST // window_complete callback. Used by Recap-app's relay-mode // branch to stream per-window section chunks to the browser // incrementally — without parsed entries the chunks can\'t be // assembled, so this callback is the dependency that unblocks // streaming. onTranscribeComplete = null, signal, jobId, }) { onProgress(`Asking relay to summarize ${mediaUrl.slice(0, 80)}...`); const start = Date.now(); // Step 1: kick off the job. 
const initEnvelope = await retryAPI( () => postRelay({ path: "/relay/summarize-url", body: JSON.stringify({ media_url: mediaUrl, type: mediaType || undefined, mime_type: mimeType || undefined, title: titleHint || undefined, channel: channelHint || undefined, description: descriptionHint || undefined, chapters: Array.isArray(chaptersHint) && chaptersHint.length > 0 ? chaptersHint : undefined, }), headers: buildHeaders({ extra: { "Content-Type": "application/json" }, jobId, }), signal, }), { retries: 2, delayMs: 5000, label: "Relay summarize-url (kickoff)", log: (msg) => onProgress(msg), } ); const kickoffResult = initEnvelope.result || {}; const backgroundJobId = kickoffResult.job_id; if (!backgroundJobId) { throw new Error( "Relay summarize-url didn't return a job_id — old relay version? Re-install relay 0.2.33 or newer." ); } onProgress( `Relay accepted job ${backgroundJobId.slice(0, 8)}… streaming` ); // Step 2: open SSE stream for live events. // We use fetch + manual SSE parsing rather than EventSource // because (a) Node's global EventSource is recent (24+), (b) // we need custom auth headers which EventSource doesn't support. let sseRes; try { sseRes = await fetch( `${base}/relay/summarize-url/${encodeURIComponent(backgroundJobId)}/events`, { method: "GET", headers: buildHeaders({ extra: { Accept: "text/event-stream" }, jobId, }), signal, } ); } catch (err) { recordRelayError(err?.message || String(err), creditKey); throw new Error( `Relay summarize-url SSE connect failed: ${err?.message || err}` ); } if (!sseRes.ok || !sseRes.body) { throw new Error( `Relay summarize-url SSE returned ${sseRes.status} ${sseRes.statusText || ""}`.trim() ); } // Verify the server actually returned an event stream. Some // reverse proxies silently rewrite the content-type which // breaks SSE without raising an HTTP error. 
const ct = sseRes.headers.get("content-type") || ""; if (!ct.includes("text/event-stream")) { throw new Error( `Relay summarize-url SSE expected text/event-stream, got "${ct}" — check your reverse proxy config` ); } // Step 3: parse SSE frames as they arrive, dispatch to callbacks. // SSE frame syntax: blocks separated by \n\n, each block is a // sequence of "field: value" lines. We collect event/data/id // pairs and fire on each completed frame. const reader = sseRes.body.getReader(); const decoder = new TextDecoder("utf-8"); let buffer = ""; let finalResult = null; let finalError = null; let transcriptText = null; const sectionsByWindow = new Map(); // windowIdx → owned sections let totalWindows = 0; let windowsDone = 0; const handleFrame = (frame) => { if (!frame || !frame.trim()) return; let eventType = "message"; let dataStr = ""; for (const rawLine of frame.split(/\r?\n/)) { if (!rawLine || rawLine.startsWith(":")) continue; const colon = rawLine.indexOf(":"); if (colon < 0) continue; const field = rawLine.slice(0, colon).trim(); const value = rawLine.slice(colon + 1).replace(/^ /, ""); if (field === "event") eventType = value; else if (field === "data") dataStr += (dataStr ? "\n" : "") + value; } if (!dataStr) return; let data; try { data = JSON.parse(dataStr); } catch { return; } if (eventType === "progress") { if (data.message) onProgress(`Relay: ${data.message}`); } else if (eventType === "transcribe_complete") { transcriptText = data.transcript || ""; onProgress( `Relay transcribe done — ${data.chunk_count ?? "?"} chunks, ${Math.round((data.audio_seconds || 0) / 60)} min audio` ); if (onTranscribeComplete) { try { onTranscribeComplete({ transcript: transcriptText, chunk_count: data.chunk_count ?? null, audio_seconds: data.audio_seconds ?? 
null, model: data.model || null, }); } catch (cbErr) { onProgress(`transcribe_complete callback error: ${cbErr?.message || cbErr}`); } } } else if (eventType === "window_complete") { totalWindows = data.totalWindows || totalWindows; sectionsByWindow.set(data.windowIdx, data.ownedSections || []); windowsDone += 1; if (onWindowComplete) { try { onWindowComplete({ windowIdx: data.windowIdx, totalWindows: data.totalWindows, ownedSections: data.ownedSections || [], // Pipelined mode (relay v0.2.89+) attaches the // window's own entries here. Sequential mode (older // relays OR Gemini-transcribe path) omits the field, // which signals to the caller to fall back to the // global streamedRelayEntries cache populated by // onTranscribeComplete. windowEntries: Array.isArray(data.windowEntries) ? data.windowEntries : null, }); } catch (cbErr) { // Surface to caller log but don't kill the stream. onProgress(`window_complete callback error: ${cbErr?.message || cbErr}`); } } onProgress(`Relay analyze: ${windowsDone}/${data.totalWindows} windows complete`); } else if (eventType === "done") { // Relay versions <= 0.2.59 emitted the SSE done event with a // double-nested shape — markComplete put the whole envelope // (`{result: {inner}, credit_charged, tier}`) into the event, // so `data.result` was the envelope and the actual fields // (title, transcript, analyze_model) lived at // `data.result.result.*`. Relay 0.2.60+ unwraps before // emitting, so `data.result.title` is correct directly. // Detect the old shape by checking if `data.result.result` // exists and looks like the inner object (has the keys we // expect to find at the top of `data.result`). Unwrap once // when present. Backwards-compatible — works against any // relay version. 
const raw = data.result || {}; if ( raw && typeof raw === "object" && raw.result && typeof raw.result === "object" && ("transcript" in raw.result || "analysis" in raw.result || "title" in raw.result) ) { finalResult = raw.result; } else { finalResult = raw; } } else if (eventType === "error") { finalError = new Error(data.error || "relay summarize-url failed"); } }; try { while (true) { if (signal?.aborted) { try { reader.cancel(); } catch {} throw new Error("Relay summarize-url aborted"); } const { done, value } = await reader.read(); if (done) break; buffer += decoder.decode(value, { stream: true }); // Frames are separated by blank lines. let idx; while ((idx = buffer.indexOf("\n\n")) >= 0) { const frame = buffer.slice(0, idx); buffer = buffer.slice(idx + 2); handleFrame(frame); } if (finalResult || finalError) { try { reader.cancel(); } catch {} break; } } } finally { try { reader.releaseLock(); } catch {} } if (finalError) throw finalError; if (!finalResult) { throw new Error( "Relay summarize-url SSE closed before 'done' event — connection dropped mid-flight" ); } // The "done" event carries the final stitched analysis result. // Stitch our own ordered sections list from sectionsByWindow as // a defensive fallback — but trust finalResult.analysis.sections // when present (it's the relay's authoritative stitch). const stitchedAnalysis = (finalResult.analysis && Array.isArray(finalResult.analysis.sections)) ? finalResult.analysis : { sections: [...sectionsByWindow.keys()] .sort((a, b) => a - b) .flatMap((k) => sectionsByWindow.get(k) || []), }; // Also refresh the relay state with the final envelope's // credit balance so the picker's "credits remaining" pill // updates without a separate /api/relay/status round-trip. // (initEnvelope already had a current snapshot; final state // applies after the job completed.) 
if (typeof initEnvelope.credits_remaining === "number") { updateRelayState(initEnvelope, creditKey); } const elapsed = ((Date.now() - start) / 1000).toFixed(1); onProgress(`Relay summarize-url complete in ${elapsed}s`); return { // Transcript in bracketed [MM:SS] form (same shape as // transcribeUrl returns), for downstream code paths that // still parse text into entries. transcript: transcriptText || finalResult.transcript || "", // Final stitched analysis as JSON { sections: [...] } with // GLOBAL startIndex / endIndex into the transcript entries. analysis: stitchedAnalysis, // Resolved media title — either the operator-supplied hint or // the title yt-dlp extracted from YouTube during download. // Older relays (< 0.2.53) don't include this field; the // caller falls back to its own titleSurrogate when it's null. title: finalResult.title || null, // Model attribution + timing diagnostics. transcribe_model: finalResult.transcribe_model || null, analyze_model: finalResult.analyze_model || null, audio_seconds: finalResult.audio_seconds || null, audio_bytes: finalResult.audio_bytes || null, wall_time_ms: finalResult.wall_time_ms || null, chunk_count: finalResult.chunk_count || null, analyze_windows: finalResult.analyze_windows || totalWindows, analyze_windows_failed: finalResult.analyze_windows_failed || 0, // Phase 1D — speaker diarization output from operator // hardware. Both null on relays < 0.2.88 OR when diarization // was off OR when no fingerprints could be collected. When // present: // speakers — map { Speaker_A: { turns, total_speaking_seconds, // mean_confidence, chunks_appeared_in, // fingerprint_count }, ... } // transcript_segments — array of { start, end, text, speaker, // speaker_confidence } at the RAW Parakeet // segment granularity (finer than the // readable `transcript` text). Recap's // UI matches these by time against the // merged entries to color each line. 
speakers: finalResult.speakers || null, transcript_segments: Array.isArray(finalResult.transcript_segments) ? finalResult.transcript_segments : null, // Phase 2 — speaker name map from the relay's post-cluster // polish pass. Null when polish was skipped or all names // returned null. Recap renders these inline with the // speakers legend (showing "Matt Hill · 24:42" instead of // "Speaker A · 24:42"). speaker_names: finalResult.speaker_names || null, }; }, async transcribeAudio({ filePath, mimeType, titleHint, channelHint = "", descriptionHint = "", chaptersHint = [], offsetSeconds = 0, onProgress = () => {}, signal, jobId, }) { onProgress( `Uploading audio${offsetSeconds > 0 ? ` (offset ${formatTime(offsetSeconds)})` : ""} to relay (${base})...` ); const start = Date.now(); // Use multipart form encoding so the audio binary doesn't have // to be base64-blown-up. Node 20+ provides global FormData; pair // it with a stream so we don't load the whole audio file into // memory. const form = new FormData(); const blob = await fileToBlob(filePath, mimeType); form.append("audio", blob, "audio.bin"); form.append("mime_type", mimeType || "application/octet-stream"); if (titleHint) form.append("title", titleHint); if (channelHint) form.append("channel", channelHint); if (descriptionHint) form.append("description", descriptionHint); if (Array.isArray(chaptersHint) && chaptersHint.length > 0) { form.append("chapters", JSON.stringify(chaptersHint)); } form.append("offset_seconds", String(offsetSeconds)); const envelope = await retryAPI( () => postRelay({ path: "/relay/transcribe", body: form, headers: buildHeaders({ jobId }), signal, }), { retries: 2, delayMs: 5000, label: `Relay transcribe${offsetSeconds > 0 ? ` (chunk@${formatTime(offsetSeconds)})` : ""}`, log: (msg) => onProgress(msg), } ); const elapsed = ((Date.now() - start) / 1000).toFixed(1); const remaining = typeof envelope.credits_remaining === "number" ? 
`, ${envelope.credits_remaining} credits left` : ""; onProgress(`Relay transcribe complete in ${elapsed}s${remaining}`); // Relay's transcribe result shape: { text, segments?: [{start, // end, text}], duration? }. We don't fabricate segment timestamps // here — the orchestration layer's synthesizeEntriesFromText // handles single-segment-only responses. const result = envelope.result || {}; const segments = Array.isArray(result.segments) ? result.segments : []; const lines = segments.length ? segments.map( (s) => `[${formatTime(s.start || 0)}] ${(s.text || "").trim()}` ) : [`[0:00] ${(result.text || "").trim()}`]; const text = lines.join("\n"); // Cost from Recap's POV is always zero — credits are the unit, // and they're tracked separately. The orchestration layer's // cost-summing code keeps working unchanged. const cost = zeroCost({ inputTokens: 0, outputTokens: 0, thinkingTokens: 0, }); return { text, usage: { inputTokens: 0, outputTokens: 0, thinkingTokens: 0, totalTokens: 0 }, cost, finishReason: null, blockReason: "none", raw: envelope, }; }, // Peek at the install's current credit balance without charging. // Used by /api/relay/status to populate the picker banner on boot // (or on demand) so the user sees real numbers before running any // summarize. Short timeout — if the relay is unreachable we want // the UI to fall back to "balance unknown" quickly, not hang. async pingBalance({ timeoutMs = 5000, signal } = {}) { const headers = buildHeaders({}); const ac = new AbortController(); const timer = setTimeout(() => ac.abort(), timeoutMs); const combined = signal ? AbortSignal.any([signal, ac.signal]) : ac.signal; try { const res = await fetch(`${base}/relay/balance`, { method: "GET", headers, signal: combined, }); const text = await res.text(); let parsed = null; try { parsed = text ? 
JSON.parse(text) : null; } catch {} if (parsed && (typeof parsed.credits_remaining === "number" || parsed.tier)) { updateRelayState(parsed, creditKey); } if (!res.ok) { const msg = parsed?.error || parsed?.message || text?.slice(0, 300) || `HTTP ${res.status}`; recordRelayError(msg, creditKey); const err = new Error(`Relay /balance ${res.status}: ${msg}`); err.status = res.status; throw err; } return parsed; } catch (err) { if (err?.name === "AbortError") { recordRelayError(`balance ping timed out after ${timeoutMs}ms`, creditKey); } else if (!err.status) { recordRelayError(err?.message || String(err), creditKey); } throw err; } finally { clearTimeout(timer); } }, async analyzeText({ prompt, onProgress = () => {}, retries = 2, signal, jobId, }) { const start = Date.now(); const headers = buildHeaders({ extra: { "Content-Type": "application/json" }, jobId, }); const envelope = await retryAPI( () => postRelay({ path: "/relay/analyze", body: JSON.stringify({ prompt }), headers, signal, }), { retries, delayMs: 5000, label: "Relay analyze", log: (msg) => onProgress(msg), } ); const elapsed = ((Date.now() - start) / 1000).toFixed(1); const remaining = typeof envelope.credits_remaining === "number" ? `, ${envelope.credits_remaining} credits left` : ""; onProgress(`Relay analyze complete in ${elapsed}s${remaining}`); const result = envelope.result || {}; const text = typeof result.text === "string" ? result.text : ""; const cost = zeroCost({ inputTokens: 0, outputTokens: 0, thinkingTokens: 0, }); return { text, usage: { inputTokens: 0, outputTokens: 0, thinkingTokens: 0, totalTokens: 0 }, cost, finishReason: null, raw: envelope, }; }, // Text-to-speech for the audio-first ("walking mode") player. Unlike // the other methods this returns BINARY audio (mp3 by default), with // credit/balance metadata in response HEADERS rather than a JSON // envelope — so it can't reuse postRelay. 
Mirrors postRelay's // error-envelope harvesting + relay-state update on the JSON error // path. The caller passes ONE jobId for a whole recap so the relay // charges at most 1 credit for synthesizing all its topics. async tts({ text, voice, format = "mp3", jobId, signal } = {}) { const headers = buildHeaders({ extra: { "Content-Type": "application/json" }, jobId, }); let res; try { // Per-clip timeout so one hung synth (e.g. Spark Control busy // transcribing the subscription queue) can't stall the whole // sequential prepare loop — the caller catches and moves on to the // next topic. 90s comfortably exceeds the relay's own ~60s Kokoro // timeout, so the relay's clean error wins when it's the slow one. res = await fetch(`${base}/relay/tts`, { method: "POST", headers, body: JSON.stringify({ text, voice, format }), signal: signal || AbortSignal.timeout(90_000), }); } catch (err) { recordRelayError(err?.message || String(err), creditKey); throw err; } if (!res.ok) { // Errors carry the standard JSON envelope — harvest balance + msg. const errText = await res.text().catch(() => ""); let parsed = null; try { parsed = errText ? JSON.parse(errText) : null; } catch {} if (parsed && (typeof parsed.credits_remaining === "number" || parsed.tier)) { updateRelayState(parsed, creditKey); } const msg = parsed?.error || parsed?.message || errText?.slice(0, 300) || `HTTP ${res.status}`; const err = new Error(`Relay /relay/tts ${res.status}: ${msg}`); err.status = res.status; err.envelope = parsed; if (!parsed) recordRelayError(msg, creditKey); throw err; } const audio = Buffer.from(await res.arrayBuffer()); // Success path: credit state lives in headers. "unlimited" (Max) → // null, matching the JSON envelope's null credits_remaining. const creditsHdr = res.headers.get("X-Recap-Credits-Remaining"); const tier = res.headers.get("X-Recap-Tier"); const creditCharged = Number(res.headers.get("X-Recap-Credit-Charged") || 0); const creditsRemaining = creditsHdr == null ? 
null : creditsHdr === "unlimited" ? null : Number(creditsHdr); if (tier || typeof creditsRemaining === "number") { updateRelayState( { credits_remaining: creditsRemaining, tier, credit_charged: creditCharged }, creditKey, ); } const durHdr = res.headers.get("X-Recap-Audio-Duration"); return { audio, contentType: res.headers.get("Content-Type") || "audio/mpeg", voice: res.headers.get("X-Recap-Tts-Voice") || voice || null, backend: res.headers.get("X-Recap-Tts-Backend") || null, creditCharged, durationSeconds: durHdr ? Number(durHdr) : null, }; }, }; } // ── Operator → relay: set / read a cloud user's tier (core-decoupling) ── // The relay is the source of truth for cloud Pro/Max tiers. The operator // grant flow calls these server-to-server, authenticated by the shared // operator key — no per-user license involved. export async function setRelayUserTier({ userId, tier, expiresAt = null, timeoutMs = 10000 }) { const base = getRelayBaseURL(); const operatorKey = getRelayOperatorKey(); if (!base) throw new Error("relay base URL not configured"); if (!operatorKey) { throw new Error("operator key not configured (set RECAP_RELAY_OPERATOR_KEY)"); } const res = await fetch(`${base.replace(/\/$/, "")}/relay/user-tier`, { method: "POST", headers: { "Content-Type": "application/json", "X-Recap-Operator-Key": operatorKey }, body: JSON.stringify({ user_id: userId, tier, expires_at: expiresAt || undefined }), signal: AbortSignal.timeout(timeoutMs), }); const data = await res.json().catch(() => ({})); if (!res.ok) { const err = new Error(data?.error || `relay user-tier ${res.status}`); err.status = res.status; throw err; } return data; } export async function getRelayUserTier({ userId, timeoutMs = 8000 }) { const base = getRelayBaseURL(); const operatorKey = getRelayOperatorKey(); if (!base || !operatorKey) return null; try { const res = await fetch( `${base.replace(/\/$/, "")}/relay/user-tier/${encodeURIComponent(userId)}`, { headers: { "X-Recap-Operator-Key": operatorKey }, 
signal: AbortSignal.timeout(timeoutMs) } ); if (!res.ok) return null; return await res.json(); } catch { return null; } } // Ask the relay to create a BTCPay invoice for a prepaid Pro/Max period for // `userId`. Operator-key authed (server-to-server). Returns // { invoice_id, checkout_url, sats, tier, period_days } or throws. export async function createRelayTierInvoice({ userId, tier, returnUrl = null, timeoutMs = 12000, }) { const base = getRelayBaseURL(); const operatorKey = getRelayOperatorKey(); if (!base) throw new Error("relay base URL not configured"); if (!operatorKey) { throw new Error("operator key not configured (set RECAP_RELAY_OPERATOR_KEY)"); } const res = await fetch(`${base.replace(/\/$/, "")}/relay/tier-invoice`, { method: "POST", headers: { "Content-Type": "application/json", "X-Recap-Operator-Key": operatorKey }, body: JSON.stringify({ user_id: userId, tier, return_url: returnUrl || undefined }), signal: AbortSignal.timeout(timeoutMs), }); const data = await res.json().catch(() => ({})); if (!res.ok) { const err = new Error(data?.error || `relay tier-invoice ${res.status}`); err.status = res.status; throw err; } return data; } // Ask the relay to create a Zaprite (card) hosted-checkout order for a // prepaid Pro/Max period for `userId`. Operator-key authed (server-to- // server), mirroring createRelayTierInvoice but for the card rail. Returns // { order_id, checkout_url, amount, currency, tier, period_days } or throws. 
export async function createRelayZapriteOrder({
  userId,
  tier,
  returnUrl = null,
  timeoutMs = 12000,
}) {
  const relayBase = getRelayBaseURL();
  const relayKey = getRelayOperatorKey();
  if (!relayBase) throw new Error("relay base URL not configured");
  if (!relayKey) {
    throw new Error("operator key not configured (set RECAP_RELAY_OPERATOR_KEY)");
  }

  const endpoint = `${relayBase.replace(/\/$/, "")}/relay/tier-zaprite-order`;
  // A falsy returnUrl becomes undefined so JSON.stringify omits the key.
  const payload = JSON.stringify({
    user_id: userId,
    tier,
    return_url: returnUrl || undefined,
  });
  const response = await fetch(endpoint, {
    method: "POST",
    headers: {
      "Content-Type": "application/json",
      "X-Recap-Operator-Key": relayKey,
    },
    body: payload,
    signal: AbortSignal.timeout(timeoutMs),
  });

  // An unparseable body (e.g. an HTML error page) is treated as {} rather
  // than rejecting, on both the success and the failure path.
  let parsed;
  try {
    parsed = await response.json();
  } catch {
    parsed = {};
  }

  if (response.ok) return parsed;

  // Prefer the relay's own error text; expose the HTTP status to callers.
  const failure = new Error(
    parsed?.error || `relay tier-zaprite-order ${response.status}`
  );
  failure.status = response.status;
  throw failure;
}

// Fetch the purchasable subscription plans and their sats prices from the
// relay, which is the pricing source of truth. Authenticates with the
// operator key. Resolves to the relay's parsed JSON
// ({ period_days, plans: [{tier, sats}] }) or to null when the relay is not
// configured, the request fails, the reply is non-2xx, or the body cannot be
// parsed. The caller supplies its own fallback in the null case.
export async function getRelayTierPlans({ timeoutMs = 8000 } = {}) {
  const relayBase = getRelayBaseURL();
  const relayKey = getRelayOperatorKey();
  if (!relayBase || !relayKey) return null;
  try {
    const response = await fetch(`${relayBase.replace(/\/$/, "")}/relay/tier-plans`, {
      headers: { "X-Recap-Operator-Key": relayKey },
      signal: AbortSignal.timeout(timeoutMs),
    });
    // The await stays inside the try so a JSON parse failure also yields null.
    return response.ok ? await response.json() : null;
  } catch {
    return null;
  }
}

// List cloud users whose prepaid Pro/Max period expires within
// `withinDays` (future) or lapsed within the last `lapsedDays`. Operator-
// key authed. The relay owns subscription expiry; Recaps calls this to
// decide who to email expiry reminders to. Returns the parsed
// { subscriptions: [{user_id, tier, expires_at, expired, days_left}] }
// or null when the relay is unreachable / unconfigured.
export async function getRelayExpiringSubscriptions({
  withinDays = 7,
  lapsedDays = 3,
  timeoutMs = 10000,
} = {}) {
  const relayBase = getRelayBaseURL();
  const relayKey = getRelayOperatorKey();
  if (!relayBase || !relayKey) return null;
  try {
    const endpoint = new URL(`${relayBase.replace(/\/$/, "")}/relay/expiring-subscriptions`);
    // searchParams handles encoding; insertion order is within_days, then lapsed_days.
    const query = { within_days: withinDays, lapsed_days: lapsedDays };
    for (const [param, value] of Object.entries(query)) {
      endpoint.searchParams.set(param, String(value));
    }
    const response = await fetch(endpoint, {
      headers: { "X-Recap-Operator-Key": relayKey },
      signal: AbortSignal.timeout(timeoutMs),
    });
    return response.ok ? await response.json() : null;
  } catch {
    // Best-effort lookup: an invalid URL, network error, timeout, or bad JSON
    // all resolve to null instead of rejecting.
    return null;
  }
}

// Load a file from disk into an in-memory Blob tagged with `mimeType`
// (falling back to "application/octet-stream") for use as a FormData part.
// Node's global Blob/File can't wrap a stream the way browser File objects
// can, so the whole file is read into a Buffer first. Nothing is streamed.
// That is acceptable for typical podcast chunk sizes (~50 MB at most after
// the orchestration layer's 45-min split).
async function fileToBlob(filePath, mimeType) {
  const fsModule = await import("fs");
  const contents = await fsModule.promises.readFile(filePath);
  const type = mimeType || "application/octet-stream";
  return new Blob([contents], { type });
}