// POST /relay/transcribe-url — like /relay/transcribe but the relay
// fetches the audio itself instead of accepting it in the request
// body. Removes the buyer's upstream-bandwidth bottleneck: a 100-MB
// podcast that takes 60s to upload from a home connection takes <5s
// for the operator's relay (typically on a Start9 server with a much
// fatter pipe) to download from the original source.
//
// Request body (application/json):
//   {
//     media_url: string,    // YouTube URL OR direct audio (.mp3 / .m4a / etc.)
//     type?: "youtube" | "podcast",  // hint; we sniff URL shape if absent
//     mime_type?: string,   // hint for the transcribe backend; we sniff from
//                           // the downloaded file if absent
//     title?: string,       // metadata for Gemini's transcription prompt
//     channel?: string,
//     description?: string,
//     chapters?: any[]
//   }
//
// Same auth as /relay/transcribe (X-Recap-Install-Id required,
// X-Recap-Job-Id optional, Authorization optional Bearer license).
// Same standard envelope on response. Same job-id dedup + credit
// accounting + audit log. Adds `download_ms` to the audit row so the
// dashboard can show how long the relay's own download took,
// separately from the backend's inference time.
import express from "express"; import fs from "fs/promises"; import { createWriteStream } from "fs"; import os from "os"; import path from "path"; import { execFile } from "child_process"; import { promisify } from "util"; import { Readable } from "stream"; import { pipeline } from "stream/promises"; import { resolveIdentity, identityTier } from "../identity.js"; import { getOrCreateRow, planBackend, commitCredit, licenseFingerprint, } from "../credits.js"; import { lookupJob, markJobCharged, refundJob } from "../job-credits.js"; import { getConfigSnapshot, getTierQuotas } from "../config.js"; import { createGeminiBackend } from "../backends/gemini.js"; import { createHardwareBackend } from "../backends/hardware.js"; import { envelope, errorEnvelope } from "./envelope.js"; import { recordCall } from "../audit-log.js"; import { calcGeminiCost } from "../pricing.js"; import { getAudioDurationSeconds } from "../audio-meta.js"; import { resolveHardwareConfig } from "../hardware-config.js"; import { reportHealthEvent } from "../spark-control-events.js"; import { createJob, markRunning, setProgress, markComplete, markFailed, getJob, } from "../jobs.js"; import { saveJobOutput } from "../output-store.js"; const execFileAsync = promisify(execFile); // Max file size the relay is willing to download. Generous enough for // 4-hour podcasts at ~256 kbps but caps DOS exposure. const MAX_DOWNLOAD_BYTES = 500 * 1024 * 1024; // Per-request safety timeout on the download leg alone (separate from // the transcribe call's own timeout). yt-dlp can be slow when YouTube // rate-limits; a hard ceiling avoids holding the request open forever. 
const DOWNLOAD_TIMEOUT_MS = 10 * 60 * 1000;

// Hosts that are routed through yt-dlp. Any subdomain of these
// (www., m., music., …) matches as well.
const YOUTUBE_HOSTS = ["youtube.com", "youtu.be"];

// File extension → MIME type for the audio containers the relay
// handles. A Map (rather than an object literal) so an extension such
// as "constructor" can never resolve to an inherited property.
const MIME_BY_EXTENSION = new Map([
  ["mp3", "audio/mpeg"],
  ["m4a", "audio/mp4"],
  ["mp4", "audio/mp4"],
  ["aac", "audio/aac"],
  ["ogg", "audio/ogg"],
  ["opus", "audio/opus"],
  ["wav", "audio/wav"],
  ["webm", "audio/webm"],
  ["flac", "audio/flac"],
]);

// Returns the hostname of an http(s) URL, or null when `rawUrl` is
// not one. Scheme-less input ("youtube.com/watch?v=…") is retried
// with an https:// prefix.
function httpHostname(rawUrl) {
  for (const candidate of [rawUrl, `https://${rawUrl}`]) {
    let parsed;
    try {
      parsed = new URL(candidate);
    } catch {
      // Not parseable in this form — try the next candidate.
      continue;
    }
    if (parsed.protocol === "http:" || parsed.protocol === "https:") {
      // Drop a trailing root dot ("youtube.com.") before comparing.
      return parsed.hostname.replace(/\.$/, "");
    }
  }
  return null;
}

/**
 * Reports whether `url` points at YouTube.
 *
 * Matching is done on the parsed hostname, so bare-domain links
 * (`https://youtube.com/…`, `https://youtu.be/ID`) are recognised and
 * a "youtube.com" that merely appears in a path or query string is not.
 *
 * @param {string} url - candidate media URL (scheme optional)
 * @returns {boolean} true when the host is youtube.com / youtu.be or a
 *   subdomain of either
 */
export function looksLikeYouTube(url) {
  if (!url || typeof url !== "string") return false;
  const host = httpHostname(url.trim());
  if (!host) return false;
  return YOUTUBE_HOSTS.some(
    (known) => host === known || host.endsWith(`.${known}`)
  );
}

// Maps a file path's extension to an audio MIME type. Unknown or
// missing extensions fall back to "audio/mpeg".
function guessMimeFromExt(filePath) {
  const ext = path.extname(filePath).toLowerCase().replace(/^\./, "");
  return MIME_BY_EXTENSION.get(ext) || "audio/mpeg";
}

// Picks the temp-file extension for a direct download. The
// Content-Type wins when it names a container; when the header is not
// an audio type at all (e.g. application/octet-stream) a known audio
// extension in the URL path is used instead. Defaults to "mp3".
function pickDownloadExtension(contentType, sourceUrl) {
  if (contentType.includes("mp4")) return "m4a";
  if (contentType.includes("ogg")) return "ogg";
  if (contentType.includes("opus")) return "opus";
  if (!contentType.startsWith("audio/")) {
    try {
      const urlExt = path
        .extname(new URL(sourceUrl).pathname)
        .toLowerCase()
        .replace(/^\./, "");
      if (MIME_BY_EXTENSION.has(urlExt)) return urlExt;
    } catch {
      // Unparseable URL — fall through to the default below.
    }
  }
  return "mp3";
}

// Best-effort removal of a download temp dir. A failure is logged and
// never thrown: cleanup must not mask the job's real outcome.
async function removeTmpDir(dir) {
  try {
    await fs.rm(dir, { recursive: true, force: true });
  } catch (err) {
    console.warn(
      `[transcribe-url] failed to remove temp dir ${dir}: ${err?.message || err}`
    );
  }
}

/**
 * Downloads an HTTP(S) audio URL to `<tmpDir>/audio.<ext>`.
 *
 * The body is streamed to disk with a running byte count; once the
 * count passes MAX_DOWNLOAD_BYTES the transfer is aborted and the
 * call rejects. The caller owns `tmpDir` and must remove it.
 *
 * @param {string} url - absolute http(s) URL of the audio file
 * @param {string} tmpDir - existing directory to write into
 * @returns {Promise<{filePath: string, bytes: number, mimeType: string}>}
 * @throws {Error} on an invalid or non-http(s) URL, a non-2xx
 *   response, a missing body, a timeout, or an oversized download
 */
export async function downloadDirect(url, tmpDir) {
  let parsedUrl;
  try {
    parsedUrl = new URL(url);
  } catch (err) {
    throw new Error(`Invalid media URL: ${url}`, { cause: err });
  }
  // fetch() also accepts data:/blob: URLs; the relay only ever
  // downloads over the network.
  if (parsedUrl.protocol !== "http:" && parsedUrl.protocol !== "https:") {
    throw new Error(
      `Unsupported URL scheme "${parsedUrl.protocol}" — only http(s) downloads are allowed`
    );
  }

  const res = await fetch(url, {
    redirect: "follow",
    signal: AbortSignal.timeout(DOWNLOAD_TIMEOUT_MS),
  });
  if (!res.ok) {
    // Release the connection instead of leaving the body unread.
    try {
      await res.body?.cancel();
    } catch {
      // Nothing useful to do if cancelling the unread body fails.
    }
    throw new Error(`Download ${url} returned HTTP ${res.status}`);
  }

  const rawContentType = res.headers.get("content-type") || "";
  // Strip parameters ("; charset=…") and normalise case so the checks
  // below compare the bare media type.
  const contentType = rawContentType.split(";")[0].trim().toLowerCase();
  const isAudioy =
    contentType.startsWith("audio/") ||
    contentType === "application/octet-stream" ||
    contentType.includes("mpeg") ||
    contentType.includes("mp4");
  if (!isAudioy) {
    // Don't enforce strictly — some podcast CDNs serve audio with
    // generic content-types. Log + continue; the transcription backend
    // will reject if it's truly not audio.
    console.warn(
      `[transcribe-url] non-audio content-type "${rawContentType}" for ${url} — proceeding anyway`
    );
  }

  // res.url is the final URL after redirects, which is the most likely
  // to carry the real file extension.
  const ext = pickDownloadExtension(contentType, res.url || url);
  const filePath = path.join(tmpDir, `audio.${ext}`);

  if (!res.body) throw new Error("response has no body");
  let bytes = 0;
  // pipeline() gives backpressure (the network is only read as fast as
  // the disk accepts) and, when the size cap throws, destroys both
  // ends — which cancels the in-flight response body.
  await pipeline(
    Readable.fromWeb(res.body),
    async function* enforceSizeCap(source) {
      for await (const chunk of source) {
        bytes += chunk.byteLength;
        if (bytes > MAX_DOWNLOAD_BYTES) {
          throw new Error(
            `Download exceeded ${MAX_DOWNLOAD_BYTES} bytes — refusing to continue`
          );
        }
        yield chunk;
      }
    },
    createWriteStream(filePath)
  );

  // Only trust the header when it names a media type; generic types
  // (octet-stream, text/plain, empty) are replaced by a guess from the
  // file extension.
  const mimeType = /^(audio|video)\//.test(contentType)
    ? contentType
    : guessMimeFromExt(filePath);
  return { filePath, bytes, mimeType };
}

/**
 * Downloads a YouTube URL via yt-dlp, extracting audio as mp3 into
 * `<tmpDir>/audio.<ext>`, and captures the video's title, channel,
 * description and chapters from yt-dlp's metadata output so callers
 * can label the job and feed the transcribe prompt. The caller owns
 * `tmpDir` and must remove it.
 *
 * @param {string} url - YouTube URL
 * @param {string} tmpDir - existing directory to write into
 * @returns {Promise<{filePath: string, bytes: number, mimeType: string,
 *   title: ?string, channel: ?string, description: ?string,
 *   chapters: Array}>}
 * @throws {Error} when yt-dlp fails, produces no audio file, or the
 *   file is larger than MAX_DOWNLOAD_BYTES
 */
export async function downloadYouTube(url, tmpDir) {
  const outTemplate = path.join(tmpDir, "audio.%(ext)s");
  const args = [
    "-x", // extract audio
    "--audio-format",
    "mp3",
    "--audio-quality",
    "5",
    "-o",
    outTemplate,
    "--no-playlist",
    "--no-simulate",
    "--no-warnings",
    // Emit a JSON dict containing the full metadata we care about for
    // the transcribe prompt's speaker-identification cues. Using
    // `before_dl:` so we get the metadata even if the download itself
    // later fails partway. The `.{field1,field2}j` template prints
    // just the named fields as a JSON object (yt-dlp escapes embedded
    // newlines inside description values, so single-line stdout parses
    // cleanly). Title comes from the same dict — no second --print
    // needed.
    //
    // Why these four fields specifically: they're exactly what the
    // recap-app's fetchYouTubeMetadata() pulls and feeds into its
    // direct-to-Gemini transcribe prompt. With these populated, the
    // model can correctly assign speaker labels (host name from
    // channel, guest name from description, chapter titles often name
    // both). Without them, every transcript falls back to unlabeled
    // dialogue regardless of how detailed the prompt's
    // speaker-identification rule is.
    "--print",
    "before_dl:%(.{title,channel,description,chapters})j",
    // End of options: the URL is client-supplied, and without this
    // terminator a value starting with "-" would be parsed by yt-dlp
    // as a command-line option.
    "--",
    url,
  ];

  let extractedMetadata = {
    title: null,
    channel: null,
    description: null,
    chapters: [],
  };
  try {
    const { stdout } = await execFileAsync("yt-dlp", args, {
      timeout: DOWNLOAD_TIMEOUT_MS,
      maxBuffer: 10 * 1024 * 1024,
    });
    // The JSON dict is the first non-empty line that starts with `{`.
    // yt-dlp may print other progress / warning lines before or after
    // depending on version; filter to the JSON line specifically.
    const firstJsonLine = (stdout || "")
      .split(/\r?\n/)
      .map((l) => l.trim())
      .find((l) => l.length > 0 && l.startsWith("{"));
    if (firstJsonLine) {
      try {
        const parsed = JSON.parse(firstJsonLine);
        extractedMetadata = {
          title:
            typeof parsed.title === "string" && parsed.title.trim()
              ? parsed.title.trim().slice(0, 300)
              : null,
          channel:
            typeof parsed.channel === "string" && parsed.channel.trim()
              ? parsed.channel.trim().slice(0, 200)
              : null,
          // Cap at 2000 chars — recap-app uses the same cap. Long
          // descriptions with release-notes / sponsor blocks otherwise
          // bloat the prompt and crowd out the speaker-naming signal.
          description:
            typeof parsed.description === "string" && parsed.description.trim()
              ? parsed.description.trim().slice(0, 2000)
              : null,
          // Each chapter is { start_time: seconds, end_time, title }.
          // We only use start_time + title in the prompt; pass the full
          // array through so callers see what yt-dlp returned.
          chapters: Array.isArray(parsed.chapters) ? parsed.chapters : [],
        };
      } catch (parseErr) {
        // Malformed JSON from yt-dlp. Fall back to title-only via a
        // best-effort regex on the line. Better than nothing.
        const m = firstJsonLine.match(/"title"\s*:\s*"([^"]+)"/);
        if (m) extractedMetadata.title = m[1].slice(0, 300);
        console.warn(
          `[yt-dlp] metadata JSON parse failed: ${parseErr?.message || parseErr} — falling back to title-only`
        );
      }
    } else if (stdout) {
      // No JSON line but stdout has something — older yt-dlp versions
      // or some videos may emit a bare title line. Use it as title-only
      // so we at least preserve the existing v0.2.56 behavior.
      const firstLine = stdout
        .split(/\r?\n/)
        .map((l) => l.trim())
        .find((l) => l.length > 0);
      if (firstLine) extractedMetadata.title = firstLine.slice(0, 300);
    }
  } catch (err) {
    const stderr = (err?.stderr || "").toString();
    const stdoutStr = (err?.stdout || "").toString();
    throw new Error(
      `yt-dlp failed: ${stderr.trim() || stdoutStr.trim() || err?.message}`,
      { cause: err }
    );
  }

  // Find the produced file — yt-dlp's audio-format=mp3 means it ends
  // up at audio.mp3, but be defensive in case it landed at a
  // different extension.
  const files = await fs.readdir(tmpDir);
  const audioFile = files.find((f) =>
    /^audio\.(mp3|m4a|opus|webm|aac|ogg)$/i.test(f)
  );
  if (!audioFile) {
    throw new Error(`yt-dlp ran but no audio file found in ${tmpDir}`);
  }
  const filePath = path.join(tmpDir, audioFile);
  const stat = await fs.stat(filePath);
  if (stat.size > MAX_DOWNLOAD_BYTES) {
    throw new Error(
      `YouTube download exceeded ${MAX_DOWNLOAD_BYTES} bytes — refusing to continue`
    );
  }
  return {
    filePath,
    bytes: stat.size,
    mimeType: guessMimeFromExt(filePath),
    title: extractedMetadata.title,
    channel: extractedMetadata.channel,
    description: extractedMetadata.description,
    chapters: extractedMetadata.chapters,
  };
}

// Shared auth preamble for both routes. Resolves the caller's
// identity; on failure writes the error response itself and returns
// null so the handler can simply bail out.
async function resolveIdentityOrRespond(req, res) {
  let identity;
  try {
    identity = await resolveIdentity(req);
  } catch (err) {
    const e = await errorEnvelope({
      error: err?.message || "auth_error",
      statusHint: err?.status || 401,
    });
    res.status(e.statusHint || 401).json(e.body);
    return null;
  }
  if (identity.kind === "license" && !identity.installId) {
    const e = await errorEnvelope({
      error: "missing X-Recap-Install-Id header",
      statusHint: 400,
    });
    res.status(400).json(e.body);
    return null;
  }
  return identity;
}

/**
 * Builds the Express router exposing
 *   POST /transcribe-url — starts a background download + transcribe job
 *   GET  /jobs/:id       — polls that job's status / result
 *
 * @returns {import("express").Router}
 */
export function transcribeUrlRouter() {
  const router = express.Router();

  // POST /relay/transcribe-url — kicks off a background transcribe
  // job and returns immediately with { job_id }. The client polls
  // GET /relay/jobs/:id to find out when it's done.
  //
  // Why async: a synchronous response over HTTP can't reliably
  // survive multi-minute work — proxies, load balancers, and NATs
  // along the path will drop the connection on long-running idle
  // requests (we observed a 5-minute cut on a 1h45m transcribe).
  // The poll requests are short and cheap, so they never trip
  // timeouts.
  router.post("/transcribe-url", express.json({ limit: "1mb" }), async (req, res) => {
    const summaryJobId = req.header("X-Recap-Job-Id") || null;
    const identity = await resolveIdentityOrRespond(req, res);
    if (!identity) return;
    const { creditKey, installId, license } = identity;

    const {
      media_url: mediaUrl,
      type,
      mime_type: bodyMime,
      title: bodyTitle,
      channel,
      description,
      chapters,
    } = req.body || {};
    // `title` is `let` rather than `const` because the worker may
    // backfill it from yt-dlp metadata after the download completes
    // (when the client didn't pre-fetch the title).
    let title = bodyTitle;

    if (!mediaUrl || typeof mediaUrl !== "string") {
      const e = await errorEnvelope({
        error: "missing or non-string body.media_url",
        creditKey,
        installId,
        statusHint: 400,
      });
      return res.status(400).json(e.body);
    }

    const row = await getOrCreateRow({ creditKey, installId, license });
    const tier = identityTier(identity, row);
    row.tier_snapshot = tier;
    const licenseFp =
      identity.kind === "cloud" ? null : licenseFingerprint(license);
    const auditInstall = installId || identity.userId || null;

    // Billing vs. routing decoupled — see analyze.js for reasoning.
    const reusedSummaryJob = !!lookupJob({
      creditKey,
      installId,
      license,
      jobId: summaryJobId,
    });
    const cfgPlan = await getConfigSnapshot();
    const hw = await resolveHardwareConfig(cfgPlan);
    // Operator-only diagnostic — see summarize-url.js for the full
    // reasoning. We don't 503 here on blocked_reason because doing
    // so pre-empts planBackend and would surface operator-internal
    // Spark Control / parakeet wording to clients even when Gemini
    // was the configured preference.
    if (hw.transcribe.blocked_reason) {
      console.warn(
        `[transcribe-url] hardware transcribe currently blocked (planBackend will route to Gemini if available): ${hw.transcribe.blocked_reason}`,
      );
    }
    const hasHardware = !!hw.transcribe.url;
    const quota = await getTierQuotas();
    const preference =
      cfgPlan.relay_transcribe_backend_preference || "gemini_first";
    const plan = planBackend(row, quota, { hasHardware, preference });
    if (!plan.allowed) {
      await recordCall({
        install_id: auditInstall,
        license_fingerprint: licenseFp,
        tier,
        pipeline: "transcribe",
        backend: null,
        model: null,
        status: "refused",
        credit_charged: 0,
        duration_ms: 0,
        cost_usd: 0,
        job_id: summaryJobId,
        media_url: mediaUrl || null,
        title: title || null,
        error: plan.reason,
      });
      const e = await errorEnvelope({
        error: plan.reason,
        installId,
        license,
        tier,
        statusHint: 402,
      });
      return res.status(402).json(e.body);
    }
    const chosenBackend = plan.backend;

    // Mint the background job + RESPOND IMMEDIATELY.
    const job = createJob({
      kind: "transcribe-url",
      installId: auditInstall,
      metadata: {
        owner: creditKey, // authorizes the /jobs/:id poll (per-identity)
        media_url: mediaUrl,
        backend: chosenBackend,
        summary_job_id: summaryJobId,
      },
    });
    const logTag = `[transcribe-url ${job.id.slice(0, 8)}]`;

    // Background worker — runs after this handler has returned.
    // Errors are captured into the job record; nothing thrown here
    // can crash the route process.
    (async () => {
      const workerT0 = Date.now();
      markRunning(job.id);
      setProgress(job.id, "downloading media…");

      const tmpDir = await fs.mkdtemp(path.join(os.tmpdir(), "relay-dl-"));
      const isYT = type === "youtube" || (!type && looksLikeYouTube(mediaUrl));

      // ── Download phase ────────────────────────────────────────────
      let audio;
      let downloadMs = 0;
      try {
        const dlStart = Date.now();
        audio = isYT
          ? await downloadYouTube(mediaUrl, tmpDir)
          : await downloadDirect(mediaUrl, tmpDir);
        downloadMs = Date.now() - dlStart;
        console.log(
          `${logTag} downloaded ${audio.bytes} bytes from ${isYT ? "youtube" : "direct"} in ${downloadMs}ms`
        );
        audio.seconds = await getAudioDurationSeconds(audio.filePath);
        if (!title && audio.title) {
          // yt-dlp captured the title during download; use it when
          // the client didn't pass one.
          title = audio.title;
        }
        setProgress(
          job.id,
          `transcribing ${Math.round((audio.seconds || 0) / 60)} min audio…`
        );
      } catch (err) {
        await removeTmpDir(tmpDir);
        const msg = (err?.message || String(err)).slice(0, 300);
        console.error(`${logTag} download failed: ${msg}`);
        const elapsedMs = Date.now() - workerT0;
        await recordCall({
          install_id: auditInstall,
          license_fingerprint: licenseFp,
          tier,
          pipeline: "transcribe",
          backend: chosenBackend,
          model: null,
          status: "error",
          credit_charged: 0,
          duration_ms: elapsedMs,
          download_ms: elapsedMs,
          audio_seconds: null,
          cost_usd: 0,
          job_id: summaryJobId,
          media_url: mediaUrl || null,
          title: title || null,
          error: `download_failed: ${msg}`,
        });
        markFailed(job.id, `download_failed: ${msg}`);
        return;
      }

      // ── Transcription phase ───────────────────────────────────────
      const cfg = await getConfigSnapshot();
      let result;
      // Stamp the moment transcribe is about to start (AFTER download
      // finished). Used for duration_ms on the audit row so the
      // "TX wall time" column reflects ONLY the transcribe phase.
      const txPhaseStart = Date.now();
      try {
        const audioBuf = await fs.readFile(audio.filePath);
        const mimeType = bodyMime || audio.mimeType;
        if (chosenBackend === "gemini") {
          const backend = createGeminiBackend({
            apiKey: cfg.relay_gemini_api_key,
            transcriptionModel: cfg.relay_gemini_transcription_model,
            analysisModel: cfg.relay_gemini_analysis_model,
            txChunkSeconds: (cfg.relay_gemini_tx_chunk_minutes || 30) * 60,
            txConcurrency: cfg.relay_gemini_tx_concurrency || 12,
            transcribePromptOverride: cfg.relay_transcribe_prompt || "",
          });
          // Client-supplied metadata wins; anything the client left
          // out is backfilled from what yt-dlp extracted during the
          // download (absent for direct downloads), so the prompt
          // still gets its speaker-identification cues.
          const promptChapters =
            Array.isArray(chapters) && chapters.length > 0
              ? chapters
              : Array.isArray(audio.chapters)
                ? audio.chapters
                : [];
          result = await backend.transcribeAudio({
            audio: audioBuf,
            mimeType,
            title: title || "",
            channel: channel || audio.channel || "",
            description: description || audio.description || "",
            chapters: promptChapters,
            offsetSeconds: 0,
          });
        } else {
          const backend = createHardwareBackend({
            parakeetBaseURL: hw.transcribe.url || "",
            gemmaBaseURL: hw.analyze.url || "",
            sparkControlBaseURL: hw.sparkBase || "",
            parakeetModel: hw.transcribe.model || "",
            gemmaModel: hw.analyze.model || "",
            txChunkSeconds: (cfg.relay_hardware_tx_chunk_minutes || 5) * 60,
            txChunkOverlapSeconds:
              cfg.relay_hardware_tx_chunk_overlap_seconds ?? 30,
            diarizationEnabled: !!cfg.relay_hardware_diarization_enabled,
            clusterThresholdPct:
              cfg.relay_hardware_voice_clustering_threshold ?? 70,
            anchorMinSpeakingSec:
              cfg.relay_hardware_anchor_min_speaking_sec ?? 30,
            smallClusterMaxSpeakingSec:
              cfg.relay_hardware_small_cluster_max_speaking_sec ?? 15,
            uncertainMarginPct: cfg.relay_hardware_uncertain_margin_pct ?? 10,
            txConcurrency: cfg.relay_hardware_tx_concurrency || 4,
          });
          result = await backend.transcribeAudio({
            audio: audioBuf,
            mimeType,
            offsetSeconds: 0,
          });
        }
      } catch (err) {
        if (reusedSummaryJob) {
          await refundJob({ creditKey, installId, license, jobId: summaryJobId });
        }
        const msg = (err?.message || String(err)).slice(0, 400);
        console.error(`${logTag} transcribe failed: ${msg}`);
        if (chosenBackend === "hardware") {
          reportHealthEvent({
            service: "parakeet",
            ok: false,
            error: msg.slice(0, 280),
            ms: Date.now() - workerT0,
          });
        }
        await recordCall({
          install_id: auditInstall,
          license_fingerprint: licenseFp,
          tier,
          pipeline: "transcribe",
          backend: chosenBackend,
          model:
            chosenBackend === "gemini"
              ? cfg.relay_gemini_transcription_model
              : hw.transcribe.model || "(auto)",
          status: "error",
          credit_charged: 0,
          duration_ms: Date.now() - txPhaseStart,
          download_ms: downloadMs,
          audio_seconds: audio?.seconds || null,
          audio_bytes: audio?.bytes || null,
          cost_usd: 0,
          job_id: summaryJobId,
          media_url: mediaUrl || null,
          title: title || null,
          error: msg,
        });
        markFailed(job.id, msg);
        return;
      } finally {
        // Runs on success and on failure alike — the audio has been
        // read into memory (or the attempt is over) by this point.
        await removeTmpDir(tmpDir);
      }

      // Success — commit credit (once per summary job_id), audit, mark done.
      let creditCharged = 0;
      if (!reusedSummaryJob) {
        await commitCredit({
          creditKey,
          installId,
          license,
          backend: chosenBackend,
          tier,
        });
        await markJobCharged({
          creditKey,
          installId,
          license,
          jobId: summaryJobId,
          backend: chosenBackend,
          tier,
        });
        creditCharged = 1;
      }
      const costDetails =
        chosenBackend === "gemini" && result?.usage
          ? calcGeminiCost(result.model, result.usage)
          : {
              input_tokens: 0,
              output_tokens: 0,
              thinking_tokens: 0,
              cost_usd: 0,
            };
      // Truncation detection — mark partial when any chunk hit
      // the silent output-token cap and emitted < 80% of its
      // expected audio. See gemini.js for the actual coverage
      // computation; here we just propagate to the audit row.
      const truncatedChunks = Array.isArray(result?.truncated_chunks)
        ? result.truncated_chunks
        : [];
      const wasTruncated = truncatedChunks.length > 0;
      const truncationError = wasTruncated
        ? `transcribe: ${truncatedChunks.length} chunk(s) truncated — missing ~${truncatedChunks.reduce((s, c) => s + (c.missingSec || 0), 0)}s of speech (model: ${result.model || "unknown"}). Likely hit maxOutputTokens.`
        : null;
      await recordCall({
        install_id: auditInstall,
        license_fingerprint: licenseFp,
        tier,
        pipeline: "transcribe",
        backend: chosenBackend,
        model: result?.model || null,
        status: wasTruncated ? "partial" : "success",
        credit_charged: creditCharged,
        duration_ms: Date.now() - txPhaseStart,
        download_ms: downloadMs,
        audio_bytes: audio.bytes,
        audio_seconds: audio.seconds || null,
        job_id: summaryJobId,
        attempts: result?.attempts || null,
        // Per-job context for the operator dashboard's per-video table.
        // media_url + title let the dashboard show what was being
        // processed; chunk_count exposes the new server-side chunking
        // (1 for short audio, N for ≥30 min audio split by the Gemini
        // backend or by the hardware backend's Parakeet chunker).
        media_url: mediaUrl || null,
        title: title || null,
        chunk_count: result?.chunk_count ?? null,
        chunk_durations_ms: result?.chunk_durations_ms || null,
        truncated_chunks: wasTruncated ? truncatedChunks : null,
        error: truncationError,
        ...costDetails,
      });

      markComplete(job.id, {
        result,
        credit_charged: creditCharged,
        tier,
      });
      console.log(
        `${logTag} complete in ${((Date.now() - workerT0) / 1000).toFixed(1)}s`
      );

      // Optional: persist transcript output for the operator's
      // "View output" dashboard feature. Only when the config flag
      // is set (default false) — saving real-user transcripts is an
      // opt-in operator decision, not a default. Note that we only
      // have the transcript here (analyze runs as a separate
      // /relay/analyze call in the Recap flow); the analyze row will
      // overwrite this file later with the full transcript+analysis
      // payload when it lands.
      //
      // Best-effort: the job is already complete and charged, so a
      // failure here is logged and must not reach the worker's
      // top-level catch (which would flip the job to "failed").
      if (cfg.relay_save_user_outputs) {
        try {
          await saveJobOutput(summaryJobId || job.id, {
            batch_id: null,
            source: null,
            transcript: result?.text || "",
            analysis: null,
            analysis_raw_text: null,
            meta: {
              title: title || null,
              media_url: mediaUrl,
              audio_seconds: audio.seconds || null,
              audio_bytes: audio.bytes,
              captions_mode: null,
              transcribe_backend: chosenBackend,
              transcribe_model: result?.model || null,
              analyze_backend: null,
              analyze_model: null,
            },
          });
        } catch (saveErr) {
          console.warn(
            `${logTag} saving job output failed (ignored): ${saveErr?.message || saveErr}`
          );
        }
      }
    })().catch((err) => {
      // Top-level catch — should be unreachable since the worker
      // handles its own try/catch, but defends against unexpected
      // throws so the job doesn't sit in "running" forever.
      markFailed(job.id, "worker_crashed: " + (err?.message || String(err)));
      console.error(`${logTag} worker crashed:`, err);
    });

    // Hand back the job_id immediately. Client will poll for status.
    const body = await envelope({
      result: {
        job_id: job.id,
        status: "queued",
        kind: "transcribe-url",
      },
      creditKey,
      installId,
      license,
      tier,
    });
    res.json(body);
  });

  // GET /relay/jobs/:id — poll loop's friend. Install-id scoped so
  // job ids can't be enumerated cross-install. Returns the running
  // status + (once complete) the full transcribe result envelope.
  router.get("/jobs/:id", async (req, res) => {
    const identity = await resolveIdentityOrRespond(req, res);
    if (!identity) return;
    const { creditKey, installId, license } = identity;
    const ownerRow = await getOrCreateRow({ creditKey, installId, license });
    const tier = identityTier(identity, ownerRow);

    const jobId = (req.params.id || "").trim();
    const job = getJob(jobId);
    if (!job) {
      const e = await errorEnvelope({
        error: "job_not_found",
        creditKey,
        installId,
        tier,
        statusHint: 404,
      });
      return res.status(404).json(e.body);
    }
    // New jobs carry metadata.owner = creditKey; older jobs only carry
    // install_id. Authorize by whichever the job has.
    const ownerOk = job.metadata?.owner
      ? job.metadata.owner === creditKey
      : identity.installId && job.install_id === identity.installId;
    if (!ownerOk) {
      const e = await errorEnvelope({
        error: "job_belongs_to_different_owner",
        creditKey,
        installId,
        tier,
        statusHint: 403,
      });
      return res.status(403).json(e.body);
    }

    const isComplete = job.status === "complete";
    const body = await envelope({
      result: {
        job_id: job.id,
        kind: job.kind,
        status: job.status,
        progress: job.progress,
        started_at: job.started_at,
        updated_at: job.updated_at,
        completed_at: job.completed_at,
        // Include the FULL transcribe-result on completion so the
        // client doesn't need a second round-trip.
        result: isComplete ? job.result?.result : null,
        credit_charged: isComplete ? job.result?.credit_charged || 0 : 0,
        error: job.error,
      },
      creditKey,
      installId,
      license,
      tier,
    });
    res.json(body);
  });

  return router;
}