// recap-relay/server/routes/transcribe-url.js

// POST /relay/transcribe-url — like /relay/transcribe, but the relay
// fetches the audio itself instead of accepting it in the request
// body. This sidesteps the buyer's upstream-bandwidth bottleneck: a
// 100-MB podcast that takes 60s to upload from a home connection takes
// <5s for the operator's relay (typically on a Start9 server with a
// much fatter pipe) to download from the original source.
//
// Request body (application/json):
// {
// media_url: string, // YouTube URL OR direct audio (.mp3 / .m4a / etc.)
// type?: "youtube" | "podcast", // hint; we sniff URL shape if absent
// mime_type?: string, // hint for the transcribe backend; if absent we use
// // the download's Content-Type or file extension
// title?: string, // metadata for Gemini's transcription prompt
// channel?: string,
// description?: string,
// chapters?: any[]
// }
//
// Same auth as /relay/transcribe (X-Recap-Install-Id required,
// X-Recap-Job-Id optional, Authorization optional Bearer license).
// Same standard envelope on response. Same job-id dedup + credit
// accounting + audit log. Adds `download_ms` to the audit row so the
// dashboard can show how long the relay's own download took
// separately from the backend's inference time.
import express from "express";
import fs from "fs/promises";
import { createWriteStream } from "fs";
import os from "os";
import path from "path";
import { execFile } from "child_process";
import { promisify } from "util";
import { Readable } from "stream";
import { pipeline } from "stream/promises";
import { resolveLicense } from "../keysat-client.js";
import { getOrCreateRow, planBackend, commitCredit } from "../credits.js";
import { lookupJob, markJobCharged, refundJob } from "../job-credits.js";
import { getConfigSnapshot, getTierQuotas } from "../config.js";
import { createGeminiBackend } from "../backends/gemini.js";
import { createHardwareBackend } from "../backends/hardware.js";
import { envelope, errorEnvelope } from "./envelope.js";
import { recordCall } from "../audit-log.js";
import { calcGeminiCost } from "../pricing.js";
// Promise-returning form of child_process.execFile, so external
// commands can be awaited instead of driven through a callback.
const execFileAsync = promisify(execFile);

// Ceiling on how many bytes the relay will download for one request
// (500 MiB). Roomy enough for a 4-hour podcast at ~256 kbps while
// still bounding DOS exposure.
const MAX_DOWNLOAD_BYTES = 500 * 1024 ** 2;

// Hard time limit for the download leg only (ten minutes); the
// transcribe call has its own timeout. yt-dlp can crawl when YouTube
// rate-limits, and without a ceiling the request could stay open
// indefinitely.
const DOWNLOAD_TIMEOUT_MS = 10 * 60_000;
/**
 * Decide whether `url` points at YouTube by inspecting its hostname.
 *
 * Matches youtube.com, youtu.be and any of their subdomains
 * (www., m., music., ...). Only the host is examined, so a YouTube
 * domain that merely appears in a path or query string does not count.
 *
 * @param {string} url - Absolute URL, or a scheme-less one such as
 *   "youtu.be/abc".
 * @returns {boolean} true when the host is a YouTube domain; false for
 *   empty, non-string or unparseable input.
 */
function looksLikeYouTube(url) {
  if (!url || typeof url !== "string") return false;
  const youTubeDomains = ["youtube.com", "youtu.be"];
  // Scheme-less input ("youtube.com/watch?v=…") does not parse on its
  // own, so the same text is also tried with an assumed https:// prefix.
  for (const candidate of [url, `https://${url}`]) {
    let host;
    try {
      // Lower-case and drop a trailing root dot so "YouTube.com." matches.
      host = new URL(candidate).hostname.toLowerCase().replace(/\.$/, "");
    } catch {
      continue; // not parseable in this form; try the next candidate
    }
    const isYouTubeHost = youTubeDomains.some(
      (domain) => host === domain || host.endsWith(`.${domain}`)
    );
    if (isYouTubeHost) return true;
  }
  return false;
}
// Audio MIME type for each file extension the relay recognises. Kept
// in a Map rather than an object literal so extensions that collide
// with inherited property names ("constructor", "__proto__") are
// treated as unknown instead of returning a non-string value.
const AUDIO_MIME_BY_EXTENSION = new Map([
  ["mp3", "audio/mpeg"],
  ["m4a", "audio/mp4"],
  ["mp4", "audio/mp4"],
  ["aac", "audio/aac"],
  ["ogg", "audio/ogg"],
  ["opus", "audio/opus"],
  ["wav", "audio/wav"],
  ["webm", "audio/webm"],
  ["flac", "audio/flac"],
]);

/**
 * Guess an audio MIME type from a file path's extension.
 *
 * @param {string} filePath - Path or file name; only the extension is used.
 * @returns {string} The mapped MIME type, or "audio/mpeg" when the
 *   extension is missing or not recognised. Matching is case-insensitive.
 */
function guessMimeFromExt(filePath) {
  // path.extname yields ".mp3" (or "" when there is no extension).
  const ext = path.extname(filePath).slice(1).toLowerCase();
  return AUDIO_MIME_BY_EXTENSION.get(ext) ?? "audio/mpeg";
}
// Download an HTTP(S) audio URL to a temp file inside `tmpDir`.
//
// Parameters:
//   url    - absolute http: or https: URL supplied by the caller.
//   tmpDir - existing directory to write into; the caller owns it and
//            is responsible for removing it (including partial files
//            left behind by a failed download).
//
// Returns { filePath, bytes, mimeType }:
//   filePath - where the audio was written (audio.<ext> in tmpDir)
//   bytes    - number of body bytes written
//   mimeType - the response's Content-Type (parameters stripped) when
//              it is an audio/* or video/* type, otherwise a guess
//              from the file extension
//
// Throws an Error when the URL is not a valid http(s) URL, the server
// answers with a non-2xx status, the response has no body, or the body
// is larger than MAX_DOWNLOAD_BYTES (declared up front or discovered
// mid-stream). Fetch/abort/write errors propagate unchanged.
//
// NOTE(review): `url` is caller-supplied and neither the target host
// nor any redirect hop is checked against loopback/private address
// ranges, so this fetch can still reach internal services. Consider a
// host allow/deny list before exposing the route to untrusted callers.
async function downloadDirect(url, tmpDir) {
  let target;
  try {
    target = new URL(url);
  } catch {
    throw new Error(`Download URL is not a valid absolute URL: ${url}`);
  }
  if (target.protocol !== "http:" && target.protocol !== "https:") {
    throw new Error(
      `Download URL must use http(s), got "${target.protocol}"`
    );
  }

  const res = await fetch(target, {
    redirect: "follow",
    signal: AbortSignal.timeout(DOWNLOAD_TIMEOUT_MS),
  });

  // Release the connection when we bail out before reading the body.
  const discardBody = async () => {
    try {
      await res.body?.cancel();
    } catch {
      // Best effort: the stream is already errored or closed, so there
      // is nothing left to release.
    }
  };

  if (!res.ok) {
    await discardBody();
    throw new Error(`Download ${url} returned HTTP ${res.status}`);
  }

  // Reject obviously oversized files before transferring anything. The
  // header is optional and can be wrong, so the streaming check below
  // remains the authoritative limit.
  const declaredBytes = Number(res.headers.get("content-length"));
  if (Number.isFinite(declaredBytes) && declaredBytes > MAX_DOWNLOAD_BYTES) {
    await discardBody();
    throw new Error(
      `Download declares ${declaredBytes} bytes, over the ${MAX_DOWNLOAD_BYTES} byte limit — refusing to start`
    );
  }

  const rawContentType = res.headers.get("content-type") || "";
  // "audio/mpeg; charset=binary" -> "audio/mpeg"
  const contentType = rawContentType.split(";")[0].trim().toLowerCase();
  const isAudioy =
    contentType.startsWith("audio/") ||
    contentType === "application/octet-stream" ||
    contentType.includes("mpeg") ||
    contentType.includes("mp4");
  if (!isAudioy) {
    // Don't enforce strictly — some podcast CDNs serve audio with
    // generic content-types. Log + continue; the transcription backend
    // will reject if it's truly not audio.
    console.warn(
      `[transcribe-url] non-audio content-type "${rawContentType}" for ${url} — proceeding anyway`
    );
  }

  // When the server gives a generic type, the extension of the final
  // (post-redirect) URL is the best remaining hint. Restrict it to a
  // short alphanumeric token so it is safe to use in a file name.
  const hasMediaType =
    contentType.startsWith("audio/") || contentType.startsWith("video/");
  const urlExt = path
    .extname(new URL(res.url || target.href).pathname)
    .slice(1)
    .toLowerCase();
  const safeUrlExt = /^[a-z0-9]{1,5}$/.test(urlExt) ? urlExt : "";

  let ext = "mp3";
  if (contentType.includes("mp4")) ext = "m4a";
  else if (contentType.includes("ogg")) ext = "ogg";
  else if (contentType.includes("opus")) ext = "opus";
  else if (!hasMediaType && safeUrlExt) ext = safeUrlExt;
  const filePath = path.join(tmpDir, `audio.${ext}`);

  if (!res.body) throw new Error("response has no body");

  // Stream the body to disk through a counting stage. pipeline() only
  // pulls from the network as fast as the file accepts data, and it
  // tears down every stage — including the HTTP body — when the size
  // limit trips or the write fails.
  let bytes = 0;
  await pipeline(
    Readable.fromWeb(res.body),
    async function* enforceSizeLimit(source) {
      for await (const chunk of source) {
        bytes += chunk.byteLength;
        if (bytes > MAX_DOWNLOAD_BYTES) {
          throw new Error(
            `Download exceeded ${MAX_DOWNLOAD_BYTES} bytes — refusing to continue`
          );
        }
        yield chunk;
      }
    },
    createWriteStream(filePath)
  );

  return {
    filePath,
    bytes,
    // A generic type such as application/octet-stream tells the
    // backend nothing about the codec; fall back to the extension.
    mimeType: hasMediaType ? contentType : guessMimeFromExt(filePath),
  };
}
// Download a YouTube URL via yt-dlp, extracting the audio track and
// converting it to mp3 (audio quality 5).
//
// Parameters:
//   url    - caller-supplied video URL; passed to yt-dlp as a
//            positional argument only, never as an option.
//   tmpDir - existing directory to write into; the caller owns it and
//            is responsible for removing it.
//
// Returns { filePath, bytes, mimeType } for the produced audio file.
//
// Throws an Error when yt-dlp fails or times out (the message carries
// its stderr/stdout), when no audio file is produced, or when the
// result is larger than MAX_DOWNLOAD_BYTES.
async function downloadYouTube(url, tmpDir) {
  const outTemplate = path.join(tmpDir, "audio.%(ext)s");
  // Size cap expressed the way yt-dlp documents it (e.g. "500M").
  const maxFilesize = `${Math.ceil(MAX_DOWNLOAD_BYTES / 1024 ** 2)}M`;
  const args = [
    "-x", // extract audio
    "--audio-format",
    "mp3",
    "--audio-quality",
    "5",
    "-o",
    outTemplate,
    // Let yt-dlp refuse oversized sources up front instead of pulling
    // the whole file before we get a chance to check it.
    "--max-filesize",
    maxFilesize,
    "--no-playlist",
    "--no-simulate",
    "--no-warnings",
    // End of options: `url` comes from the request body, and without
    // this a value beginning with "-" would be parsed as a yt-dlp flag.
    "--",
    url,
  ];
  try {
    await execFileAsync("yt-dlp", args, {
      timeout: DOWNLOAD_TIMEOUT_MS,
      maxBuffer: 10 * 1024 * 1024,
    });
  } catch (err) {
    const stderr = (err?.stderr || "").toString();
    const stdout = (err?.stdout || "").toString();
    throw new Error(
      `yt-dlp failed: ${stderr.trim() || stdout.trim() || err?.message}`
    );
  }
  // Find the produced file — yt-dlp's audio-format=mp3 means it ends
  // up at audio.mp3, but be defensive in case it landed at a
  // different extension. This is also where a download skipped by
  // --max-filesize surfaces, since no file is written.
  const files = await fs.readdir(tmpDir);
  const audioFile = files.find((f) =>
    /^audio\.(mp3|m4a|opus|webm|aac|ogg)$/i.test(f)
  );
  if (!audioFile) {
    throw new Error(`yt-dlp ran but no audio file found in ${tmpDir}`);
  }
  const filePath = path.join(tmpDir, audioFile);
  const stat = await fs.stat(filePath);
  // Backstop: --max-filesize applies to the source download, so still
  // verify the size of the file that will actually be read into memory.
  if (stat.size > MAX_DOWNLOAD_BYTES) {
    throw new Error(
      `YouTube download exceeded ${MAX_DOWNLOAD_BYTES} bytes — refusing to continue`
    );
  }
  return {
    filePath,
    bytes: stat.size,
    mimeType: guessMimeFromExt(filePath),
  };
}
/**
 * Build the Express router that serves POST /transcribe-url.
 *
 * The route downloads the media named by `body.media_url` (via yt-dlp
 * for YouTube, plain HTTP otherwise), sends it to the chosen
 * transcription backend, commits a credit for new jobs and writes an
 * audit row for every outcome.
 *
 * Responses:
 *   200 - success envelope
 *   400 - missing X-Recap-Install-Id header or body.media_url
 *   402 - quota planning refused the request
 *   502 - download failed, or the backend failed without a usable
 *         HTTP status of its own
 *
 * @returns {import("express").Router}
 */
export function transcribeUrlRouter() {
  const router = express.Router();
  router.post(
    "/transcribe-url",
    express.json({ limit: "1mb" }),
    async (req, res, next) => {
      try {
        await handleTranscribeUrl(req, res);
      } catch (err) {
        // Anything not handled below (license lookup, credit commit,
        // audit write, ...) goes to Express's error handling instead
        // of becoming an unhandled promise rejection.
        next(err);
      }
    }
  );
  return router;
}

// Best-effort removal of a request's temp directory. Cleanup failure
// must never change the response, so it is logged rather than thrown.
async function removeTmpDir(tmpDir) {
  try {
    await fs.rm(tmpDir, { recursive: true, force: true });
  } catch (err) {
    console.warn(
      `[transcribe-url] could not remove ${tmpDir}: ${err?.message || err}`
    );
  }
}

// Full request flow for POST /transcribe-url: validate, plan the
// backend, download, transcribe, then commit credit and audit.
async function handleTranscribeUrl(req, res) {
  const t0 = Date.now();
  const installId = req.header("X-Recap-Install-Id");
  const jobId = req.header("X-Recap-Job-Id") || null;
  const auth = req.header("Authorization");
  if (!installId) {
    const e = await errorEnvelope({
      error: "missing X-Recap-Install-Id header",
      statusHint: 400,
    });
    return res.status(400).json(e.body);
  }
  const {
    media_url: mediaUrl,
    type,
    mime_type: bodyMime,
    title,
    channel,
    description,
    chapters,
  } = req.body || {};
  if (!mediaUrl || typeof mediaUrl !== "string") {
    const e = await errorEnvelope({
      error: "missing or non-string body.media_url",
      installId,
      statusHint: 400,
    });
    return res.status(400).json(e.body);
  }
  const license = await resolveLicense(auth);
  const tier = license.tier;
  const row = await getOrCreateRow(installId);
  row.tier_snapshot = tier;

  // Quota check + backend choice. Same as /relay/transcribe.
  let reusedJob = false;
  let chosenBackend = null;
  const existingJob = lookupJob(installId, jobId);
  if (existingJob) {
    // A job that is already on record keeps its backend and is not
    // charged a second time.
    reusedJob = true;
    chosenBackend = existingJob.backend;
  } else {
    const planCfg = await getConfigSnapshot();
    const hasHardware = !!planCfg.relay_parakeet_base_url;
    const quota = await getTierQuotas();
    const preference =
      planCfg.relay_transcribe_backend_preference || "gemini_first";
    const plan = planBackend(row, quota, { hasHardware, preference });
    if (!plan.allowed) {
      await recordCall({
        install_id: installId,
        tier,
        pipeline: "transcribe",
        backend: null,
        model: null,
        status: "refused",
        credit_charged: 0,
        duration_ms: Date.now() - t0,
        cost_usd: 0,
        job_id: jobId,
        error: plan.reason,
      });
      const e = await errorEnvelope({
        error: plan.reason,
        installId,
        tier,
        statusHint: 402,
      });
      return res.status(402).json(e.body);
    }
    chosenBackend = plan.backend;
  }

  const tmpDir = await fs.mkdtemp(path.join(os.tmpdir(), "relay-dl-"));
  let audio;
  let result;
  let downloadMs = 0;
  // One finally covers every exit from here on, so the temp dir (which
  // can hold hundreds of MB) is removed even when something between
  // the two phases throws.
  try {
    // ── Download phase ─────────────────────────────────────────────
    const isYT = type === "youtube" || (!type && looksLikeYouTube(mediaUrl));
    const dlStart = Date.now();
    try {
      audio = isYT
        ? await downloadYouTube(mediaUrl, tmpDir)
        : await downloadDirect(mediaUrl, tmpDir);
      downloadMs = Date.now() - dlStart;
      console.log(
        `[transcribe-url] downloaded ${audio.bytes} bytes from ${isYT ? "youtube" : "direct"} in ${downloadMs}ms (${mediaUrl.slice(0, 80)})`
      );
    } catch (err) {
      console.error(`[transcribe-url] download failed: ${err?.message || err}`);
      // Same truncated text for the audit row and the client.
      const message = `download_failed: ${err?.message || String(err)}`.slice(
        0,
        200
      );
      await recordCall({
        install_id: installId,
        tier,
        pipeline: "transcribe",
        backend: chosenBackend,
        model: null,
        status: "error",
        credit_charged: 0,
        duration_ms: Date.now() - t0,
        download_ms: Date.now() - dlStart,
        cost_usd: 0,
        job_id: jobId,
        error: message,
      });
      const e = await errorEnvelope({
        error: message,
        installId,
        tier,
        statusHint: 502,
      });
      return res.status(502).json(e.body);
    }

    // ── Transcription phase ────────────────────────────────────────
    const cfg = await getConfigSnapshot();
    try {
      const audioBuf = await fs.readFile(audio.filePath);
      // An explicit hint from the caller wins over what the download saw.
      const mimeType = bodyMime || audio.mimeType;
      if (chosenBackend === "gemini") {
        const backend = createGeminiBackend({
          apiKey: cfg.relay_gemini_api_key,
          transcriptionModel: cfg.relay_gemini_transcription_model,
          analysisModel: cfg.relay_gemini_analysis_model,
        });
        result = await backend.transcribeAudio({
          audio: audioBuf,
          mimeType,
          title: title || "",
          channel: channel || "",
          description: description || "",
          chapters: Array.isArray(chapters) ? chapters : [],
          offsetSeconds: 0,
        });
      } else {
        const backend = createHardwareBackend({
          parakeetBaseURL: cfg.relay_parakeet_base_url,
          gemmaBaseURL: cfg.relay_gemma_base_url,
          parakeetModel: cfg.relay_parakeet_model,
          gemmaModel: cfg.relay_gemma_model,
        });
        result = await backend.transcribeAudio({
          audio: audioBuf,
          mimeType,
          offsetSeconds: 0,
        });
      }
    } catch (err) {
      if (reusedJob) refundJob(installId, jobId);
      console.error(`[transcribe-url] transcribe failed: ${err?.message}`);
      await recordCall({
        install_id: installId,
        tier,
        pipeline: "transcribe",
        backend: chosenBackend,
        model:
          chosenBackend === "gemini"
            ? cfg.relay_gemini_transcription_model
            : cfg.relay_parakeet_model,
        status: "error",
        credit_charged: 0,
        duration_ms: Date.now() - t0,
        download_ms: downloadMs,
        cost_usd: 0,
        job_id: jobId,
        error: (err?.message || String(err)).slice(0, 200),
      });
      // Only pass a backend status through when it is a real HTTP
      // error code; anything else is reported as a bad gateway.
      const backendStatus = err?.status;
      const statusHint =
        Number.isInteger(backendStatus) &&
        backendStatus >= 400 &&
        backendStatus <= 599
          ? backendStatus
          : 502;
      const e = await errorEnvelope({
        error: err?.message || "backend_error",
        installId,
        tier,
        statusHint,
      });
      // NOTE(review): errorEnvelope's return shape is not visible from
      // this file and the other branches only read `e.body`, so fall
      // back to the hint we passed in if it does not echo `statusHint`.
      return res.status(e.statusHint ?? statusHint).json(e.body);
    }
  } finally {
    await removeTmpDir(tmpDir);
  }

  // ── Commit + audit ─────────────────────────────────────────────
  let creditCharged = 0;
  if (!reusedJob) {
    await commitCredit(installId, { backend: chosenBackend, tier });
    markJobCharged(installId, jobId, { backend: chosenBackend, tier });
    creditCharged = 1;
  }
  const costDetails =
    chosenBackend === "gemini" && result?.usage
      ? calcGeminiCost(result.model, result.usage)
      : {
          input_tokens: 0,
          output_tokens: 0,
          thinking_tokens: 0,
          cost_usd: 0,
        };
  await recordCall({
    install_id: installId,
    tier,
    pipeline: "transcribe",
    backend: chosenBackend,
    model: result?.model || null,
    status: "success",
    credit_charged: creditCharged,
    duration_ms: Date.now() - t0,
    download_ms: downloadMs,
    audio_bytes: audio.bytes,
    job_id: jobId,
    ...costDetails,
  });
  const body = await envelope({ result, installId, tier, creditCharged });
  res.json(body);
}