// POST /relay/summarize-url — unified transcribe + analyze pipeline.
//
// Why this exists: the older /relay/transcribe-url + /relay/analyze
// pair required the Recap client to do its OWN analyze windowing
// (per-window POST to /relay/analyze, then client-side stitch).
// That round-trips the full transcript back to the client only for
// the client to slice it into 12 windows and ship those slices back
// to the relay, one per HTTP request. That imposes a bandwidth +
// latency tax on long content, and the chunking knobs in the
// operator's Settings tab only affected benchmarks — production
// traffic ignored them because Recap had its own hardcoded constants.
//
// This route does both steps server-side in one async job:
//   1. Download audio (yt-dlp / direct HTTP) — same code as transcribe-url
//   2. Transcribe with chunking (Gemini or Hardware) — same backends
//   3. Run chunked-analyze on the transcript using the operator's
//      Settings-tab knobs (window body / overlap / concurrency)
//   4. Per-window section results stream to the client via SSE
//   5. Final transcript + stitched analysis returned in the
//      complete job result
//
// Credit policy: charges ONE credit on full success (same as
// transcribe-url alone — Recap's billing model treats one summarize
// as one credit, regardless of which steps it touches). The credit is
// committed only after analyze succeeds; the transcribe audit row is
// written with credit_charged: 0. If transcribe fails, or if ALL
// analyze windows fail, the job is marked failed and no credit is
// charged. When the request reuses a job id already known to the
// job-credit records, that job is refunded. Failures of individual
// windows are tolerated by the chunked-analyze layer and do not by
// themselves fail the job.
// // Endpoints exposed: // POST /relay/summarize-url → kick off, returns job_id // GET /relay/summarize-url/:jobId/events → SSE stream of progress + window events // GET /relay/jobs/:id (existing) → poll-based fallback import express from "express"; import fs from "fs/promises"; import os from "os"; import path from "path"; import { getConfigSnapshot } from "../config.js"; import { createGeminiBackend } from "../backends/gemini.js"; import { createHardwareBackend } from "../backends/hardware.js"; import { resolveHardwareConfig } from "../hardware-config.js"; import { recordCall } from "../audit-log.js"; import { calcGeminiCost } from "../pricing.js"; import { getAudioDurationSeconds } from "../audio-meta.js"; import { createJob, getJob, markRunning, setProgress, markComplete, markFailed, appendEvent, subscribeToJob, } from "../jobs.js"; import { resolveIdentity, identityTier } from "../identity.js"; import { getOrCreateRow, planBackend, commitCredit, licenseFingerprint, } from "../credits.js"; import { lookupJob, markJobCharged, refundJob } from "../job-credits.js"; import { getTierQuotas } from "../config.js"; import { envelope, errorEnvelope } from "./envelope.js"; import { runChunkedAnalysis, runPipelinedAnalysis, parseBracketedTranscript, firstEntryAtOrAfter, lastEntryBefore, canonicalIndexForOffset, stitchAnalysisResults, } from "../chunked-analyze.js"; import { createChunkBuffer } from "../chunk-buffer.js"; import { runNameInference, runSummaryPolish, } from "../post-cluster-polish.js"; import { acquireHardwareSlot } from "../hardware-queue.js"; import { saveJobOutput } from "../output-store.js"; import { looksLikeYouTube, downloadDirect, downloadYouTube, } from "./transcribe-url.js"; import { reportHealthEvent } from "../spark-control-events.js"; export function summarizeUrlRouter() { const router = express.Router(); // POST /relay/summarize-url — kicks off the combined pipeline. 
// Same request shape as /relay/transcribe-url (media_url + optional // hints). Same auth (X-Recap-Install-Id header + license proof). router.post("/summarize-url", express.json({ limit: "1mb" }), async (req, res) => { const summaryJobId = req.header("X-Recap-Job-Id") || null; let identity; try { identity = await resolveIdentity(req); } catch (err) { const e = await errorEnvelope({ error: err?.message || "auth_error", statusHint: err?.status || 401, }); return res.status(e.statusHint || 401).json(e.body); } if (identity.kind === "license" && !identity.installId) { const e = await errorEnvelope({ error: "missing X-Recap-Install-Id header", statusHint: 400, }); return res.status(400).json(e.body); } const { creditKey, installId, license } = identity; // `title` is `let` rather than `const` because the worker may // backfill it from yt-dlp metadata after the download completes // (when the client didn't pre-fetch the title — typical for the // Recap-app relay-mode branch which submits the URL alone). let title; // channel/description/chapters are declared with `let` (not const) // because the yt-dlp download step below may backfill any of them // when the client sent empty values — see the "Fall back to yt-dlp- // extracted metadata" block after downloadYouTube returns. let channel, description, chapters; const { media_url: mediaUrl, type, mime_type: bodyMime, title: bodyTitle, channel: bodyChannel, description: bodyDescription, chapters: bodyChapters, } = req.body || {}; channel = bodyChannel; description = bodyDescription; chapters = bodyChapters; // Treat empty strings and the literal sentinel "Untitled" as // "no title supplied" so the yt-dlp metadata fallback below // (`if (!title && audio.title) title = audio.title`) fires. 
// Older Recap clients (< 0.2.71) pass "Untitled" verbatim when // the operator hasn't pre-fetched metadata — without this // normalization the relay would echo "Untitled" back as if it // were a real title, and the resulting library entry would // stay "Untitled" forever despite yt-dlp having the real one. const trimmedBodyTitle = typeof bodyTitle === "string" ? bodyTitle.trim() : bodyTitle; title = (trimmedBodyTitle === "" || trimmedBodyTitle === "Untitled") ? null : trimmedBodyTitle; if (!mediaUrl || typeof mediaUrl !== "string") { const e = await errorEnvelope({ error: "missing or non-string body.media_url", creditKey, installId, statusHint: 400, }); return res.status(400).json(e.body); } const row = await getOrCreateRow({ creditKey, installId, license }); const tier = identityTier(identity, row); row.tier_snapshot = tier; const licenseFp = identity.kind === "cloud" ? null : licenseFingerprint(license); const auditInstall = installId || identity.userId || null; const reusedSummaryJob = !!lookupJob({ creditKey, installId, license, jobId: summaryJobId }); const cfgPlan = await getConfigSnapshot(); const hw = await resolveHardwareConfig(cfgPlan); // Operator-only diagnostic. We do NOT 503 the request here on // blocked_reason — that pre-empts the backend-preference choice // and would return a hardware-down error even when Gemini was // selected as the routing preference (it'd also leak the // operator-internal Spark Control / parakeet wording to the // client). Instead: log the detail for the operator's eyes, // let `hasHardware = !!hw.transcribe.url` go to false, and let // planBackend route to Gemini under the normal preference // rules. If the operator chose hardware_only mode, planBackend // refuses with the generic `hardware_only_not_configured` // reason — which is client-safe. 
if (hw.transcribe.blocked_reason) { console.warn( `[summarize-url] hardware transcribe currently blocked (planBackend will route to Gemini if available): ${hw.transcribe.blocked_reason}`, ); } if (hw.analyze.blocked_reason) { console.warn( `[summarize-url] hardware analyze currently blocked (planBackend will route to Gemini if available): ${hw.analyze.blocked_reason}`, ); } const hasHardware = !!hw.transcribe.url; const quota = await getTierQuotas(); const preference = cfgPlan.relay_transcribe_backend_preference || "gemini_first"; const plan = planBackend(row, quota, { hasHardware, preference }); if (!plan.allowed) { await recordCall({ install_id: auditInstall, license_fingerprint: licenseFp, tier, pipeline: "transcribe", backend: null, model: null, status: "refused", credit_charged: 0, duration_ms: 0, cost_usd: 0, job_id: summaryJobId, media_url: mediaUrl || null, title: title || null, error: plan.reason, }); const e = await errorEnvelope({ error: plan.reason, installId, license, tier, statusHint: 402, }); return res.status(402).json(e.body); } const chosenBackend = plan.backend; // The analyze backend follows the operator's analyze-side // preference, NOT the transcribe-side preference. They're set // independently in StartOS so the operator can route, e.g., // transcribe-to-Gemini + analyze-to-hardware on tight Gemini // budgets. planBackend evaluates the same quota/preference rules // for analyze and returns a separate decision. 
const analyzePreference = cfgPlan.relay_analyze_backend_preference || "gemini_first"; const hasAnalyzeHardware = !!hw.analyze.url; const analyzePlan = planBackend(row, quota, { hasHardware: hasAnalyzeHardware, preference: analyzePreference, }); if (!analyzePlan.allowed) { const e = await errorEnvelope({ error: analyzePlan.reason, installId, license, tier, statusHint: 402, }); return res.status(402).json(e.body); } const chosenAnalyzeBackend = analyzePlan.backend; const job = createJob({ kind: "summarize-url", installId: auditInstall, metadata: { owner: creditKey, // authorizes the SSE /events stream (per-identity) media_url: mediaUrl, transcribe_backend: chosenBackend, analyze_backend: chosenAnalyzeBackend, summary_job_id: summaryJobId, }, }); // Background worker — same try/catch discipline as transcribe-url. // All progress + per-window events go through appendEvent(job.id, ...) // so any SSE subscriber gets them in real time. (async () => { const workerT0 = Date.now(); markRunning(job.id); appendEvent(job.id, "progress", { message: "downloading media…" }); setProgress(job.id, "downloading media…"); console.log( `[summarize-url ${job.id.slice(0, 8)}] downloading media from ${(mediaUrl || "(no url)").slice(0, 120)}` ); const tmpDir = await fs.mkdtemp(path.join(os.tmpdir(), "relay-sum-")); const isYT = type === "youtube" || (!type && looksLikeYouTube(mediaUrl)); let audio; let downloadMs = 0; try { const dlStart = Date.now(); audio = isYT ? await downloadYouTube(mediaUrl, tmpDir) : await downloadDirect(mediaUrl, tmpDir); downloadMs = Date.now() - dlStart; audio.seconds = await getAudioDurationSeconds(audio.filePath); console.log( `[summarize-url ${job.id.slice(0, 8)}] download done in ${(downloadMs / 1000).toFixed(1)}s — ${Math.round((audio.seconds || 0) / 60)} min audio` ); // Fall back to the yt-dlp-extracted title when the client // didn't pass one. 
Recap-app's relay-mode branch doesn't // pre-fetch YouTube metadata, so without this the Jobs table // shows "Untitled" for every Recap submission. if (!title && audio.title) { title = audio.title; } // Same fallback pattern for channel / description / chapters. // These power speaker-identification in the transcribe prompt // (the model uses them to assign names like "Brandon Karpeles:" // and "Matt Hill:" to dialogue turns). Recap-app's direct-to- // Gemini path always fetches these via fetchYouTubeMetadata // before transcribing; the relay path either gets them from // the Recap client (if Recap fetched them and passed them // through) OR — when the client sent empty fields — falls back // here to whatever yt-dlp extracted during download. Defense-in- // depth: works for ANY client that hits /relay/summarize-url, // not just Recap, and survives a client upgrade gap where an // older Recap is still sending empty channelHint/descriptionHint. if (!channel && audio.channel) { channel = audio.channel; } if (!description && audio.description) { description = audio.description; } if ((!Array.isArray(chapters) || chapters.length === 0) && Array.isArray(audio.chapters) && audio.chapters.length > 0) { chapters = audio.chapters; } appendEvent(job.id, "progress", { message: `transcribing ${Math.round((audio.seconds || 0) / 60)} min audio…`, }); setProgress(job.id, `transcribing ${Math.round((audio.seconds || 0) / 60)} min audio…`); } catch (err) { try { await fs.rm(tmpDir, { recursive: true, force: true }); } catch {} const msg = (err?.message || String(err)).slice(0, 300); await recordCall({ install_id: auditInstall, license_fingerprint: licenseFp, tier, pipeline: "transcribe", backend: chosenBackend, model: null, status: "error", credit_charged: 0, duration_ms: Date.now() - workerT0, download_ms: Date.now() - workerT0, audio_seconds: null, cost_usd: 0, job_id: summaryJobId, media_url: mediaUrl || null, title: title || null, error: "download_failed: " + msg, }); 
markFailed(job.id, "download_failed: " + msg); return; } // ── Hardware FIFO queue acquire ───────────────────────────── // Hardware jobs serialize through a single-slot semaphore so // the operator's GPU box doesn't get N jobs hammering it at // once. Gemini-backend jobs bypass — Google's API handles // concurrency at scale. The acquire awaits if there's an // active hardware job ahead; we emit a "queued" SSE event so // Recap can show "Queued — N jobs ahead" while we wait. See // server/hardware-queue.js. const usesHardwareForAnything = chosenBackend === "hardware" || chosenAnalyzeBackend === "hardware"; let releaseHardwareSlot = null; if (usesHardwareForAnything) { releaseHardwareSlot = await acquireHardwareSlot({ jobId: job.id, onWait: ({ position, activeJobId }) => { // Tell the SSE client we're waiting so the UI can show // a queue position instead of a stalled "Transcribing…" // state. Best-effort — appendEvent is fire-and-forget. try { appendEvent(job.id, "queued", { position, activeJobId: activeJobId || null, }); setProgress( job.id, `queued — ${position} job(s) ahead on operator hardware` ); } catch {} console.log( `[summarize-url ${job.id.slice(0, 8)}] queued for hardware — position ${position}` ); }, }); } // Wrap from here to end-of-IIFE in try/finally so the hardware // slot ALWAYS releases — on success, on early-return failure, // and on uncaught exceptions. Finally executes before the // throw propagates so the next queued job unblocks promptly. try { // ── Transcribe ────────────────────────────────────────────── // Pipelined-analyze design (v0.2.89): when the hardware backend // is transcribing, fire each analyze window's call AS SOON AS // its required transcribe chunks have completed — in parallel // with later chunks still being transcribed. 
The total wall // time savings are modest (~10s for a 94-min video), but the // user-perceived "first topic visible" time drops from // ~T=160s to ~T=80s because window 1's sections render while // chunks 5-21 are still in-flight. See server/chunk-buffer.js // and runPipelinedAnalysis in chunked-analyze.js for the // mechanics. Gemini transcribe doesn't expose per-chunk // callbacks (single big call) so its path stays sequential. const cfg = await getConfigSnapshot(); let txResult; const txPhaseStart = Date.now(); // Build the analyze backend UP FRONT — we need it before // transcribe starts in pipelined mode. Sequential mode would // also work with it built here, just doesn't strictly need to. const anStart = Date.now(); let analyzeBackend = null; try { if (chosenAnalyzeBackend === "gemini") { analyzeBackend = createGeminiBackend({ apiKey: cfg.relay_gemini_api_key, transcriptionModel: cfg.relay_gemini_transcription_model, analysisModel: cfg.relay_gemini_analysis_model, txChunkSeconds: (cfg.relay_gemini_tx_chunk_minutes || 30) * 60, txConcurrency: cfg.relay_gemini_tx_concurrency || 12, txMaxOutputTokens: cfg.relay_gemini_tx_max_output_tokens || 65536, anMaxOutputTokens: cfg.relay_gemini_an_max_output_tokens || 8192, }); } else { if (!hw.analyze.url) { throw new Error("hardware analyze URL not configured"); } analyzeBackend = createHardwareBackend({ parakeetBaseURL: hw.transcribe.url || "", gemmaBaseURL: hw.analyze.url, sparkControlBaseURL: hw.sparkBase || "", parakeetModel: hw.transcribe.model || "", gemmaModel: hw.analyze.model || "", txChunkSeconds: (cfg.relay_hardware_tx_chunk_minutes || 5) * 60, txChunkOverlapSeconds: cfg.relay_hardware_tx_chunk_overlap_seconds ?? 30, diarizationEnabled: !!cfg.relay_hardware_diarization_enabled, clusterThresholdPct: cfg.relay_hardware_voice_clustering_threshold ?? 70, anchorMinSpeakingSec: cfg.relay_hardware_anchor_min_speaking_sec ?? 30, smallClusterMaxSpeakingSec: cfg.relay_hardware_small_cluster_max_speaking_sec ?? 
15, uncertainMarginPct: cfg.relay_hardware_uncertain_margin_pct ?? 10, txConcurrency: cfg.relay_hardware_tx_concurrency || 4, anMaxTokens: cfg.relay_hardware_an_max_tokens || 16000, }); } } catch (err) { const msg = (err?.message || String(err)).slice(0, 400); await recordCall({ install_id: auditInstall, license_fingerprint: licenseFp, tier, pipeline: "analyze", backend: chosenAnalyzeBackend, model: null, status: "error", duration_ms: 0, audio_seconds: 0, cost_usd: 0, job_id: summaryJobId, media_url: mediaUrl || null, title: title || null, error: "analyze_backend_init_failed: " + msg, window_idx: 0, window_count: 1, }); markFailed(job.id, "analyze_init_failed: " + msg); try { await fs.rm(tmpDir, { recursive: true, force: true }); } catch {} return; } // Analyze tunables (used by both pipelined + sequential paths). const bodyMin = chosenAnalyzeBackend === "gemini" ? (cfg.relay_gemini_analyze_window_minutes || 18) : (cfg.relay_hardware_analyze_window_minutes || 18); const overlapMin = chosenAnalyzeBackend === "gemini" ? (cfg.relay_gemini_analyze_overlap_minutes || 2) : (cfg.relay_hardware_analyze_overlap_minutes || 2); const anConcurrency = chosenAnalyzeBackend === "gemini" ? (cfg.relay_gemini_analyze_concurrency || 12) : (cfg.relay_hardware_analyze_concurrency || 8); const cutoffMin = cfg.relay_analyze_cutoff_minutes || 25; const targetTotalsByBucket = { under_30: cfg.relay_analyze_total_sections_under_30, "30_60": cfg.relay_analyze_total_sections_30_60, "60_90": cfg.relay_analyze_total_sections_60_90, "90_120": cfg.relay_analyze_total_sections_90_120, "120_150": cfg.relay_analyze_total_sections_120_150, "150_180": cfg.relay_analyze_total_sections_150_180, over_180: cfg.relay_analyze_total_sections_over_180, }; const analyzePromptOverride = cfg.relay_analyze_prompt || cfg.relay_analyze_prompt_default || ""; const computeCostDetails = (model, usage) => chosenAnalyzeBackend === "gemini" && usage ? 
calcGeminiCost(model, usage) : { input_tokens: 0, output_tokens: 0, thinking_tokens: 0, cost_usd: 0 }; // Pipelined mode requires hardware transcribe (per-chunk // onChunkComplete callback) + a known audio duration. When both // hold, we start the analyze workers BEFORE the transcribe // promise so they're already waiting on chunkBuffer when the // first chunks land. const usePipelining = chosenBackend === "hardware" && (audio.seconds || 0) > 0; const chunkBuffer = usePipelining ? createChunkBuffer() : null; const onWindowCompleteHandler = (cb) => { appendEvent(job.id, "window_complete", { windowIdx: cb.windowIdx, totalWindows: cb.totalWindows, ownedSections: cb.ownedSections, // Pipelined mode: include this window's local entries so // the SDK / recap-app handler can render the partial // sections WITHOUT waiting for transcribe_complete to // populate streamedRelayEntries. The SDK falls back to // streamedRelayEntries when windowEntries is absent // (sequential mode). windowEntries: cb.windowEntries || undefined, windowBodySeconds: cb.windowBodySeconds, model: cb.model, durationMs: cb.durationMs, }); setProgress(job.id, `analyze window ${cb.windowIdx + 1}/${cb.totalWindows} done`); const sectionCount = Array.isArray(cb.ownedSections) ? cb.ownedSections.length : 0; console.log( `[summarize-url ${job.id.slice(0, 8)}] analyze window ${cb.windowIdx + 1}/${cb.totalWindows} done in ${((cb.durationMs || 0) / 1000).toFixed(1)}s (${sectionCount} sections, model=${cb.model || "?"})` ); }; // Kick off the pipelined-analyze promise BEFORE we await // transcribe. Workers block on chunkBuffer.waitForTime() until // their windows' required chunks land. If transcribe throws, // we call chunkBuffer.fail() in the catch to release the // pending waiters. 
let pipelinedAnalyzePromise = null; if (usePipelining) { pipelinedAnalyzePromise = runPipelinedAnalysis({ audioDurationSec: audio.seconds || 0, waitForTime: (sec) => chunkBuffer.waitForTime(sec), getReadySegments: (s, e) => chunkBuffer.getSegments(s, e), bodySeconds: bodyMin * 60, overlapSeconds: overlapMin * 60, cutoffSeconds: cutoffMin * 60, concurrency: anConcurrency, backend: analyzeBackend, pipelineBackend: chosenAnalyzeBackend, jobId: summaryJobId, batchId: null, mediaUrl, title: title || null, installId, licenseFingerprint: licenseFp, source: "summarize-url", computeCostDetails, analyzePromptOverride, targetTotalsByBucket, onWindowComplete: onWindowCompleteHandler, }).catch((err) => ({ __error: err })); } try { const audioBuf = await fs.readFile(audio.filePath); const mimeType = bodyMime || audio.mimeType; if (chosenBackend === "gemini") { const backend = createGeminiBackend({ apiKey: cfg.relay_gemini_api_key, transcriptionModel: cfg.relay_gemini_transcription_model, analysisModel: cfg.relay_gemini_analysis_model, txChunkSeconds: (cfg.relay_gemini_tx_chunk_minutes || 30) * 60, txConcurrency: cfg.relay_gemini_tx_concurrency || 12, // Three-layer prompt resolution: per-session override // wins; operator-promoted default is next; gemini.js's // hardcoded DEFAULT_TRANSCRIBE_PROMPT_BODY is the // factory fallback. Empty string here means "fall back // to the code-side default" inside the backend. transcribePromptOverride: cfg.relay_transcribe_prompt || cfg.relay_transcribe_prompt_default || "", txMaxOutputTokens: cfg.relay_gemini_tx_max_output_tokens || 65536, anMaxOutputTokens: cfg.relay_gemini_an_max_output_tokens || 8192, }); txResult = await backend.transcribeAudio({ audio: audioBuf, mimeType, title: title || "", channel: channel || "", description: description || "", chapters: Array.isArray(chapters) ? 
chapters : [], offsetSeconds: 0, }); } else { const backend = createHardwareBackend({ parakeetBaseURL: hw.transcribe.url || "", gemmaBaseURL: hw.analyze.url || "", sparkControlBaseURL: hw.sparkBase || "", parakeetModel: hw.transcribe.model || "", gemmaModel: hw.analyze.model || "", txChunkSeconds: (cfg.relay_hardware_tx_chunk_minutes || 5) * 60, txChunkOverlapSeconds: cfg.relay_hardware_tx_chunk_overlap_seconds ?? 30, diarizationEnabled: !!cfg.relay_hardware_diarization_enabled, clusterThresholdPct: cfg.relay_hardware_voice_clustering_threshold ?? 70, anchorMinSpeakingSec: cfg.relay_hardware_anchor_min_speaking_sec ?? 30, smallClusterMaxSpeakingSec: cfg.relay_hardware_small_cluster_max_speaking_sec ?? 15, uncertainMarginPct: cfg.relay_hardware_uncertain_margin_pct ?? 10, txConcurrency: cfg.relay_hardware_tx_concurrency || 4, anMaxTokens: cfg.relay_hardware_an_max_tokens || 16000, // Pipelined-mode hook: feed each completed chunk into // the shared buffer so the analyze workers can unblock. onChunkComplete: chunkBuffer ? (cd) => chunkBuffer.add(cd) : null, }); txResult = await backend.transcribeAudio({ audio: audioBuf, mimeType, offsetSeconds: 0, }); } } catch (err) { if (chunkBuffer) chunkBuffer.fail(err); try { await fs.rm(tmpDir, { recursive: true, force: true }); } catch {} if (reusedSummaryJob) await refundJob({ creditKey, installId, license, jobId: summaryJobId }); const msg = (err?.message || String(err)).slice(0, 400); if (chosenBackend === "hardware") { reportHealthEvent({ service: "parakeet", ok: false, error: msg.slice(0, 280), ms: Date.now() - workerT0, }); } await recordCall({ install_id: auditInstall, license_fingerprint: licenseFp, tier, pipeline: "transcribe", backend: chosenBackend, model: chosenBackend === "gemini" ? 
cfg.relay_gemini_transcription_model : hw.transcribe.model || "(auto)", status: "error", credit_charged: 0, duration_ms: Date.now() - txPhaseStart, download_ms: downloadMs, audio_seconds: audio?.seconds || null, audio_bytes: audio?.bytes || null, cost_usd: 0, job_id: summaryJobId, media_url: mediaUrl || null, title: title || null, error: msg, }); markFailed(job.id, msg); return; } finally { try { await fs.rm(tmpDir, { recursive: true, force: true }); } catch {} } // Audit the transcribe row (success path). const txCostDetails = chosenBackend === "gemini" && txResult.usage ? calcGeminiCost(txResult.model, txResult.usage) : { input_tokens: 0, output_tokens: 0, thinking_tokens: 0, cost_usd: 0 }; // Detect output-token truncation. The backend returns a // truncated_chunks array when any chunk's last emitted // timestamp falls short of the chunk's expected duration — // a silent failure mode where the model hit its output cap // mid-transcript. We mark the audit row status="partial" // and stamp an error string so the Jobs table doesn't // mislead the operator into thinking the run succeeded. const truncatedChunks = Array.isArray(txResult?.truncated_chunks) ? txResult.truncated_chunks : []; const wasTruncated = truncatedChunks.length > 0; const truncationError = wasTruncated ? `transcribe: ${truncatedChunks.length} chunk(s) truncated — missing ~${truncatedChunks.reduce((s, c) => s + (c.missingSec || 0), 0)}s of speech (model: ${txResult.model || "unknown"}). Likely hit maxOutputTokens.` : null; await recordCall({ install_id: auditInstall, license_fingerprint: licenseFp, tier, pipeline: "transcribe", backend: chosenBackend, model: txResult?.model || null, status: wasTruncated ? "partial" : "success", credit_charged: 0, // credit committed AFTER analyze succeeds too // duration_ms = transcribe phase ONLY (txPhaseStart was // stamped after the download completed). 
Previously this used // Date.now() - workerT0 which included the download time too, // so the Jobs table's "TX wall time" column appeared inflated // by ~50s on long YouTube fetches. download_ms below carries // the download phase as a separate field so the operator can // see both numbers on the same row. duration_ms: Date.now() - txPhaseStart, download_ms: downloadMs, audio_bytes: audio.bytes, audio_seconds: audio.seconds || null, job_id: summaryJobId, attempts: txResult?.attempts || null, media_url: mediaUrl || null, title: title || null, chunk_count: txResult?.chunk_count ?? null, chunk_durations_ms: txResult?.chunk_durations_ms || null, truncated_chunks: wasTruncated ? truncatedChunks : null, error: truncationError, ...txCostDetails, }); // Emit a transcribe_complete event so the SSE client can show // the full transcript while analyze is still in flight (useful // for a "transcript preview" UI affordance). appendEvent(job.id, "transcribe_complete", { transcript: txResult.text || "", model: txResult.model || null, chunk_count: txResult.chunk_count ?? null, audio_seconds: audio.seconds || null, audio_bytes: audio.bytes || null, duration_ms: Date.now() - workerT0, }); setProgress(job.id, "analyzing topics…"); console.log( `[summarize-url ${job.id.slice(0, 8)}] transcribe done in ${((Date.now() - txPhaseStart) / 1000).toFixed(1)}s — starting analyze` ); // ── Analyze ───────────────────────────────────────────────── // Pipelined mode: await the promise we kicked off BEFORE // transcribe; remap window-local section indices to global // canonical indices; stitch. // Sequential mode: call runChunkedAnalysis on the full // transcript as today. let analyzeResult; try { if (pipelinedAnalyzePromise) { const pipelinedRaw = await pipelinedAnalyzePromise; if (pipelinedRaw && pipelinedRaw.__error) { throw pipelinedRaw.__error; } // Build canonical entries from the final stitched transcript. 
// The pipelined windows were each analyzed against their own // segment-only view; we now map their LOCAL section indices // to GLOBAL canonical-entry indices (which is what the // stitcher + downstream stitching code expects). const canonicalEntries = parseBracketedTranscript(txResult.text || ""); const syntheticResults = (pipelinedRaw.windowResults || []).map((wr) => { if (!wr || !wr.ok) return wr; const w = wr.window; const globalStartIdx = firstEntryAtOrAfter(canonicalEntries, w.startSec); const globalBodyStartIdx = firstEntryAtOrAfter(canonicalEntries, w.bodyStartSec); const globalEndIdx = lastEntryBefore(canonicalEntries, w.windowEndSec + 0.5); // Edge case: this window's time range produced no canonical // entries (audio gap, severe truncation). Mark as failed // so the stitcher skips it. if ( globalStartIdx >= canonicalEntries.length || globalEndIdx < globalStartIdx ) { return { window: { startIdx: 0, endIdx: -1, bodyStartIdx: 0 }, ok: false, error: new Error("pipelined window had no canonical entries — audio gap"), }; } // Remap each section's window-LOCAL indices to GLOBAL // canonical indices via time matching, then express as // window-local for the stitcher's offset math (which // adds window.startIdx to each section's startIndex). 
const remapped = []; for (const s of wr.sections || []) { const localStartEntry = wr.windowEntries?.[s.startIndex]; const localEndEntry = wr.windowEntries?.[s.endIndex]; if (!localStartEntry || !localEndEntry) continue; const globalStart = canonicalIndexForOffset( canonicalEntries, localStartEntry.offset || 0 ); const globalEnd = canonicalIndexForOffset( canonicalEntries, localEndEntry.offset || 0 ); if (globalStart < 0 || globalEnd < 0) continue; remapped.push({ startIndex: globalStart - globalStartIdx, endIndex: globalEnd - globalStartIdx, title: s.title, summary: s.summary, }); } return { window: { startIdx: globalStartIdx, endIdx: globalEndIdx, bodyStartIdx: globalBodyStartIdx, }, ok: true, sections: remapped, model: wr.model, }; }); const stitched = stitchAnalysisResults(syntheticResults); analyzeResult = { text: JSON.stringify({ sections: stitched }), model: pipelinedRaw.dominantModel, attempts: pipelinedRaw.attempts, usage: null, }; } else { analyzeResult = await runChunkedAnalysis({ transcriptText: txResult.text || "", backend: analyzeBackend, pipelineBackend: chosenAnalyzeBackend, jobId: summaryJobId, batchId: null, mediaUrl, title: title || null, installId, licenseFingerprint: licenseFp, source: "summarize-url", computeCostDetails, bodySeconds: bodyMin * 60, overlapSeconds: overlapMin * 60, concurrency: anConcurrency, cutoffSeconds: cutoffMin * 60, analyzePromptOverride, totalAudioSec: audio.seconds || 0, targetTotalsByBucket, onWindowComplete: ({ windowIdx, totalWindows, ownedSections, windowBodySeconds, model, durationMs }) => { appendEvent(job.id, "window_complete", { windowIdx, totalWindows, ownedSections, windowBodySeconds, model, durationMs, }); setProgress(job.id, `analyze window ${windowIdx + 1}/${totalWindows} done`); const sectionCount = Array.isArray(ownedSections) ? ownedSections.length : (typeof ownedSections === "number" ? 
ownedSections : 0); console.log( `[summarize-url ${job.id.slice(0, 8)}] analyze window ${windowIdx + 1}/${totalWindows} done in ${(durationMs / 1000).toFixed(1)}s (${sectionCount} sections, model=${model || "?"})` ); }, }); } } catch (err) { // All analyze windows failed (pipelined OR sequential). // Refund credit, mark job failed. if (reusedSummaryJob) await refundJob({ creditKey, installId, license, jobId: summaryJobId }); const msg = (err?.message || String(err)).slice(0, 400); markFailed(job.id, "analyze_failed: " + msg); return; } // ── Post-cluster polish (Phase 2) ─────────────────────────── // After diarization + clustering produced speaker IDs AND // analyze produced topic sections, run a two-stage LLM pass // that (1) infers real speaker names from the labeled // transcript + episode metadata, and (2) rewrites section // summaries to attribute statements to those speakers. // Skipped when: // - Operator disabled via Settings // - Diarization didn't run (no speakers to attribute) // - Fewer than 2 speakers detected (nothing to differentiate) // - Analyze had no sections to polish // // Failure handling: both stages are best-effort. If either // fails entirely, we keep the unpolished analyze output and // null speaker names. The user still gets a working result. let speakerNames = null; const polishEnabled = cfg.relay_post_cluster_polish_enabled !== false; const detectedSpeakerCount = Object.keys(txResult?.speakers || {}).length; const parsedAnalysisForPolish = tryParseJson(analyzeResult.text); const polishableSections = Array.isArray(parsedAnalysisForPolish?.sections) ? parsedAnalysisForPolish.sections : null; if ( polishEnabled && detectedSpeakerCount >= 2 && Array.isArray(txResult?.segments) && polishableSections && polishableSections.length > 0 ) { const polishStart = Date.now(); try { // Stage 1 — global speaker name inference. Single call, // returns map { Speaker_A: "Matt Hill", Speaker_B: null, ... }. 
speakerNames = await runNameInference({ speakers: txResult.speakers, transcriptSegments: txResult.segments, channelHint: channel || "", titleHint: title || "", descriptionHint: description || "", // Three-layer prompt resolution — per-session override // wins, then operator-promoted default, then the // hardcoded default inside post-cluster-polish.js. promptOverride: cfg.relay_polish_name_inference_prompt || cfg.relay_polish_name_inference_prompt_default || "", backend: analyzeBackend, pipelineBackend: chosenAnalyzeBackend, jobId: summaryJobId, batchId: null, mediaUrl, installId, licenseFingerprint: licenseFp, source: "summarize-url", computeCostDetails, }); // Stage 2 — per-window summary polish (parallel). Rewrites // section summaries to attribute statements to speakers. // Sections whose window's polish fails keep their // original summary. Titles + indices are never modified. const canonicalEntriesForPolish = parseBracketedTranscript(txResult.text || ""); const { planWindowsByDuration } = await import("../chunked-analyze.js"); const windowsForPolish = planWindowsByDuration({ totalAudioSec: audio.seconds || 0, bodySeconds: bodyMin * 60, overlapSeconds: overlapMin * 60, cutoffSeconds: cutoffMin * 60, }); const polishedSections = await runSummaryPolish({ sections: polishableSections, canonicalEntries: canonicalEntriesForPolish, windows: windowsForPolish, transcriptSegments: txResult.segments, speakerNames, speakerStats: txResult.speakers, promptOverride: cfg.relay_polish_summary_rewrite_prompt || cfg.relay_polish_summary_rewrite_prompt_default || "", backend: analyzeBackend, concurrency: anConcurrency, pipelineBackend: chosenAnalyzeBackend, jobId: summaryJobId, batchId: null, mediaUrl, installId, licenseFingerprint: licenseFp, source: "summarize-url", computeCostDetails, }); // Overwrite analyzeResult.text with the polished sections // so downstream code paths (saveJobOutput, markComplete) // see the polished version. 
analyzeResult = { ...analyzeResult, text: JSON.stringify({ sections: polishedSections }), }; const polishMs = Date.now() - polishStart; console.log( `[summarize-url ${job.id.slice(0, 8)}] post-cluster polish done in ${(polishMs / 1000).toFixed(1)}s — ${Object.values(speakerNames || {}).filter(Boolean).length}/${detectedSpeakerCount} speakers named` ); } catch (polishErr) { // Polish failure should never kill the request — keep the // unpolished output. Operator can see what went wrong in // the audit log; the user gets a working result either way. console.warn( `[summarize-url ${job.id.slice(0, 8)}] polish pass failed (keeping unpolished output): ${polishErr?.message || polishErr}` ); speakerNames = null; } } else if (polishEnabled) { // Diagnostic — surface why we skipped so operator can see // it without grep-ing the source. const reason = !Array.isArray(txResult?.segments) ? "no transcript segments" : detectedSpeakerCount < 2 ? `only ${detectedSpeakerCount} speaker(s) detected` : !polishableSections ? "no parseable sections" : "no sections to polish"; console.log( `[summarize-url ${job.id.slice(0, 8)}] polish skipped — ${reason}` ); } // ── Commit credit + finalize ──────────────────────────────── // Credit policy: charged ONLY when the full pipeline completes // cleanly. "Cleanly" means EVERY analyze window succeeded — // any per-window failure voids the charge even if the stitched // output is still usable. Rationale: a partial result is // surfacing degraded quality to the user; charging full price // for that would erode trust. The user keeps the partial // output (saved to disk + returned in the result envelope), // the operator eats the cost of the compute they paid for on // the windows that did succeed. const anyWindowFailed = (analyzeResult?.attempts?.failed ?? 0) > 0; let creditCharged = 0; if (anyWindowFailed) { // Don't charge. If a previous endpoint (legacy two-call path) // pre-charged on the same X-Recap-Job-Id, refund it. 
if (reusedSummaryJob) await refundJob({ creditKey, installId, license, jobId: summaryJobId }); console.log( `[summarize-url ${job.id.slice(0, 8)}] partial analyze (${analyzeResult.attempts.failed}/${analyzeResult.attempts.windows} windows failed) — credit NOT charged` ); } else if (!reusedSummaryJob) { await commitCredit({ creditKey, installId, license, backend: chosenBackend, tier }); await markJobCharged({ creditKey, installId, license, jobId: summaryJobId, backend: chosenBackend, tier }); creditCharged = 1; } // Phase 1D: build the speaker-tagged transcript segments // ONCE so both the operator-output store AND the Recap-facing // markComplete envelope share the same payload. Each segment // carries `start`, `end`, `text`, and (when diarization ran) // `speaker` + `speaker_confidence`. Null when diarization was // off and txResult.segments isn't an array. const transcriptSegments = Array.isArray(txResult?.segments) ? txResult.segments.map((s) => ({ start: s.start || 0, end: s.end || 0, text: s.text || "", speaker: s.speaker || null, speaker_confidence: s.speaker_confidence ?? null, // Phase 2 — set when the post-cluster suppression pass // reassigned this segment's source cluster to an anchor // (best-guess attribution). Recap chip will show "?". speaker_uncertain: !!s.speaker_uncertain, })) : null; // Save the full transcript+analysis to disk for the dashboard // when the operator has opted in via relay_save_user_outputs. if (cfg.relay_save_user_outputs) { await saveJobOutput(summaryJobId || job.id, { batch_id: null, source: "summarize-url", transcript: txResult?.text || "", analysis: tryParseJson(analyzeResult.text), analysis_raw_text: analyzeResult.text || null, // Persist diarization output too so it shows up in the // operator dashboard's per-job viewer. Both keys are null // when diarization was off. speakers: txResult?.speakers || null, // Phase 2 — inferred speaker names from the post-cluster // polish pass. 
Null if polish was skipped or all names // came back null. Recap renders these in place of // "Speaker A" in the legend. speaker_names: speakerNames || null, diarization: txResult?.diarization || null, transcript_segments: transcriptSegments, meta: { title: title || null, media_url: mediaUrl, audio_seconds: audio.seconds || null, audio_bytes: audio.bytes, captions_mode: null, transcribe_backend: chosenBackend, transcribe_model: txResult?.model || null, analyze_backend: chosenAnalyzeBackend, analyze_model: analyzeResult.model || null, }, }); } const wallTimeMs = Date.now() - workerT0; markComplete(job.id, { result: { transcript: txResult.text || "", analysis: tryParseJson(analyzeResult.text), analysis_raw_text: analyzeResult.text || "", // Echo back the resolved title (either operator-supplied via // the kickoff body or extracted by yt-dlp during download). // Recap-app's relay-mode branch was previously saving every // submission as "Untitled" because the title only existed // inside the relay's worker — this envelope field is how // we hand it back across the SSE done event. title: title || null, transcribe_model: txResult.model || null, analyze_model: analyzeResult.model || null, audio_seconds: audio.seconds || null, audio_bytes: audio.bytes || null, chunk_count: txResult.chunk_count ?? null, analyze_windows: analyzeResult.attempts?.windows ?? null, analyze_windows_failed: analyzeResult.attempts?.failed ?? null, wall_time_ms: wallTimeMs, // Phase 1D fields (null when diarization didn't run): speakers: txResult?.speakers || null, transcript_segments: transcriptSegments, // Phase 2 — speaker name map from the post-cluster polish // pass. Null when polish was skipped or all names came // back null. Recap merges this into the speakers legend // (showing "Matt Hill · 24:42" instead of "Speaker A"). 
speaker_names: speakerNames || null, }, credit_charged: creditCharged, tier, }); console.log( `[summarize-url ${job.id.slice(0, 8)}] complete in ${(wallTimeMs / 1000).toFixed(1)}s (TX ${chosenBackend}, AN ${chosenAnalyzeBackend})` ); } finally { // Release the hardware FIFO slot so the next queued job can // start. Runs on success path, on every early-return inside // the wrapped block, and on uncaught exceptions (in which // case the throw propagates to the .catch below after this // finally executes). releaseHardwareSlot is null when the // job was Gemini-only and never acquired a slot — no-op. if (releaseHardwareSlot) releaseHardwareSlot(); } })().catch((err) => { markFailed(job.id, "worker_crashed: " + (err?.message || String(err))); console.error(`[summarize-url ${job.id.slice(0, 8)}] worker crashed:`, err); }); const body = await envelope({ result: { job_id: job.id, status: "queued", kind: "summarize-url", }, creditKey, installId, license, tier, }); res.json(body); }); // GET /relay/summarize-url/:jobId/events — SSE stream of progress // and per-window events. Replays the existing event log on connect // (so a client that connects 2s after the kickoff still sees the // download + transcribe events), then pushes live events as they // arrive until the job reaches a terminal state. router.get("/summarize-url/:jobId/events", async (req, res) => { let identity; try { identity = await resolveIdentity(req); } catch { return res.status(401).json({ error: "unauthorized" }); } const ownerKey = identity.creditKey; if (!ownerKey) { return res.status(400).json({ error: "missing identity" }); } const job = getJob(req.params.jobId); if (!job) return res.status(404).json({ error: "job not found or expired" }); // New jobs carry metadata.owner = creditKey; older jobs only carry // install_id. Authorize by whichever the job has — and never leak the // existence of another identity's job. const allowed = job.metadata?.owner ? 
job.metadata.owner === ownerKey : identity.installId && job.install_id === identity.installId; if (!allowed) { return res.status(404).json({ error: "job not found or expired" }); } // SSE headers. res.setHeader("Content-Type", "text/event-stream"); res.setHeader("Cache-Control", "no-cache, no-transform"); res.setHeader("Connection", "keep-alive"); res.setHeader("X-Accel-Buffering", "no"); // disable nginx buffering res.flushHeaders?.(); const send = (event) => { try { res.write(`event: ${event.type}\n`); res.write(`data: ${JSON.stringify(event.data)}\n`); res.write(`id: ${event.ts}\n\n`); } catch (err) { // Connection closed mid-write — caller's cleanup handles it. } }; // Replay history. The client may have missed events that fired // between the POST returning and the GET arriving. for (const ev of job.events) { send(ev); } // If the job is already terminal, close immediately. if (job.status === "complete" || job.status === "failed") { res.end(); return; } // Subscribe to live events. Returned function MUST be called on // any disconnect path or the subscriber leaks. const unsubscribe = subscribeToJob(job.id, send); // Heartbeat every 25s. Many proxies (nginx default 60s, others // ~30s) close idle SSE connections — a periodic comment frame // keeps the conn alive between sparse events without polluting // the event stream. const heartbeat = setInterval(() => { try { res.write(": ping\n\n"); } catch {} }, 25_000); const cleanup = () => { clearInterval(heartbeat); if (unsubscribe) unsubscribe(); }; req.on("close", cleanup); req.on("aborted", cleanup); res.on("close", cleanup); }); return router; } function tryParseJson(text) { if (!text || typeof text !== "string") return null; let s = text.trim(); const cb = s.match(/```(?:json)?\s*([\s\S]*?)```/); if (cb) s = cb[1].trim(); try { return JSON.parse(s); } catch { return null; } }