Files
recap-relay/server/routes/summarize-url.js
T

1219 lines
53 KiB
JavaScript

// POST /relay/summarize-url — unified transcribe + analyze pipeline.
//
// Why this exists: the older /relay/transcribe-url + /relay/analyze
// pair required the Recap client to do its OWN analyze windowing
// (per-window POST to /relay/analyze, then client-side stitch).
// That round-trips the full transcript back to the client only for
// the client to slice it into 12 windows and ship those slices back
// to the relay, one per HTTP request. Bandwidth + latency tax on long
// content, and the chunking knobs in the operator's Settings tab
// only affected benchmarks — production traffic ignored them because
// Recap had its own hardcoded constants.
//
// This route does both steps server-side in one async job:
// 1. Download audio (yt-dlp / direct HTTP) — same code as transcribe-url
// 2. Transcribe with chunking (Gemini or Hardware) — same backends
// 3. Run chunked-analyze on the transcript using the operator's
// Settings-tab knobs (window body / overlap / concurrency)
// 4. Per-window section results stream to the client via SSE
// 5. Final transcript + stitched analysis returned in the
// complete job result
//
// Credit policy: charges ONE credit on full success (same as
// transcribe-url alone — Recap's billing model treats one summarize
// as one credit, regardless of which steps it touches). If transcribe
// fails before analyze starts, no credit is charged. If transcribe
// succeeds but ALL analyze windows fail, the job is marked failed and
// no credit is committed (any credit recorded against a reused
// X-Recap-Job-Id is refunded). Partial-window failures are tolerated
// upstream of this.
//
// Endpoints exposed:
// POST /relay/summarize-url → kick off, returns job_id
// GET /relay/summarize-url/:jobId/events → SSE stream of progress + window events
// GET /relay/jobs/:id (existing) → poll-based fallback
import express from "express";
import fs from "fs/promises";
import os from "os";
import path from "path";
import { getConfigSnapshot } from "../config.js";
import { createGeminiBackend } from "../backends/gemini.js";
import { createHardwareBackend } from "../backends/hardware.js";
import { resolveHardwareConfig } from "../hardware-config.js";
import { recordCall } from "../audit-log.js";
import { calcGeminiCost } from "../pricing.js";
import { getAudioDurationSeconds } from "../audio-meta.js";
import {
createJob,
getJob,
markRunning,
setProgress,
markComplete,
markFailed,
appendEvent,
subscribeToJob,
} from "../jobs.js";
import { resolveIdentity, identityTier } from "../identity.js";
import {
getOrCreateRow,
planBackend,
commitCredit,
licenseFingerprint,
} from "../credits.js";
import { lookupJob, markJobCharged, refundJob } from "../job-credits.js";
import { getTierQuotas } from "../config.js";
import { envelope, errorEnvelope } from "./envelope.js";
import {
runChunkedAnalysis,
runPipelinedAnalysis,
parseBracketedTranscript,
firstEntryAtOrAfter,
lastEntryBefore,
canonicalIndexForOffset,
stitchAnalysisResults,
} from "../chunked-analyze.js";
import { createChunkBuffer } from "../chunk-buffer.js";
import {
runNameInference,
runSummaryPolish,
} from "../post-cluster-polish.js";
import { acquireHardwareSlot } from "../hardware-queue.js";
import { saveJobOutput } from "../output-store.js";
import {
looksLikeYouTube,
downloadDirect,
downloadYouTube,
} from "./transcribe-url.js";
import { reportHealthEvent } from "../spark-control-events.js";
export function summarizeUrlRouter() {
const router = express.Router();
// POST /relay/summarize-url — kicks off the combined pipeline.
// Same request shape as /relay/transcribe-url (media_url + optional
// hints). Same auth (X-Recap-Install-Id header + license proof).
router.post("/summarize-url", express.json({ limit: "1mb" }), async (req, res) => {
const summaryJobId = req.header("X-Recap-Job-Id") || null;
let identity;
try {
identity = await resolveIdentity(req);
} catch (err) {
const e = await errorEnvelope({
error: err?.message || "auth_error",
statusHint: err?.status || 401,
});
return res.status(e.statusHint || 401).json(e.body);
}
if (identity.kind === "license" && !identity.installId) {
const e = await errorEnvelope({
error: "missing X-Recap-Install-Id header",
statusHint: 400,
});
return res.status(400).json(e.body);
}
const { creditKey, installId, license } = identity;
// `title` is `let` rather than `const` because the worker may
// backfill it from yt-dlp metadata after the download completes
// (when the client didn't pre-fetch the title — typical for the
// Recap-app relay-mode branch which submits the URL alone).
let title;
// channel/description/chapters are declared with `let` (not const)
// because the yt-dlp download step below may backfill any of them
// when the client sent empty values — see the "Fall back to yt-dlp-
// extracted metadata" block after downloadYouTube returns.
let channel, description, chapters;
const {
media_url: mediaUrl,
type,
mime_type: bodyMime,
title: bodyTitle,
channel: bodyChannel,
description: bodyDescription,
chapters: bodyChapters,
} = req.body || {};
channel = bodyChannel;
description = bodyDescription;
chapters = bodyChapters;
// Treat empty strings and the literal sentinel "Untitled" as
// "no title supplied" so the yt-dlp metadata fallback below
// (`if (!title && audio.title) title = audio.title`) fires.
// Older Recap clients (< 0.2.71) pass "Untitled" verbatim when
// the operator hasn't pre-fetched metadata — without this
// normalization the relay would echo "Untitled" back as if it
// were a real title, and the resulting library entry would
// stay "Untitled" forever despite yt-dlp having the real one.
const trimmedBodyTitle = typeof bodyTitle === "string" ? bodyTitle.trim() : bodyTitle;
title = (trimmedBodyTitle === "" || trimmedBodyTitle === "Untitled")
? null
: trimmedBodyTitle;
if (!mediaUrl || typeof mediaUrl !== "string") {
const e = await errorEnvelope({
error: "missing or non-string body.media_url",
creditKey,
installId,
statusHint: 400,
});
return res.status(400).json(e.body);
}
const row = await getOrCreateRow({ creditKey, installId, license });
const tier = identityTier(identity, row);
row.tier_snapshot = tier;
const licenseFp = identity.kind === "cloud" ? null : licenseFingerprint(license);
const auditInstall = installId || identity.userId || null;
const reusedSummaryJob = !!lookupJob({ creditKey, installId, license, jobId: summaryJobId });
const cfgPlan = await getConfigSnapshot();
const hw = await resolveHardwareConfig(cfgPlan);
// Operator-only diagnostic. We do NOT 503 the request here on
// blocked_reason — that pre-empts the backend-preference choice
// and would return a hardware-down error even when Gemini was
// selected as the routing preference (it'd also leak the
// operator-internal Spark Control / parakeet wording to the
// client). Instead: log the detail for the operator's eyes,
// let `hasHardware = !!hw.transcribe.url` go to false, and let
// planBackend route to Gemini under the normal preference
// rules. If the operator chose hardware_only mode, planBackend
// refuses with the generic `hardware_only_not_configured`
// reason — which is client-safe.
if (hw.transcribe.blocked_reason) {
console.warn(
`[summarize-url] hardware transcribe currently blocked (planBackend will route to Gemini if available): ${hw.transcribe.blocked_reason}`,
);
}
if (hw.analyze.blocked_reason) {
console.warn(
`[summarize-url] hardware analyze currently blocked (planBackend will route to Gemini if available): ${hw.analyze.blocked_reason}`,
);
}
const hasHardware = !!hw.transcribe.url;
const quota = await getTierQuotas();
const preference =
cfgPlan.relay_transcribe_backend_preference || "gemini_first";
const plan = planBackend(row, quota, { hasHardware, preference });
if (!plan.allowed) {
await recordCall({
install_id: auditInstall,
license_fingerprint: licenseFp,
tier,
pipeline: "transcribe",
backend: null,
model: null,
status: "refused",
credit_charged: 0,
duration_ms: 0,
cost_usd: 0,
job_id: summaryJobId,
media_url: mediaUrl || null,
title: title || null,
error: plan.reason,
});
const e = await errorEnvelope({
error: plan.reason,
installId,
license,
tier,
statusHint: 402,
});
return res.status(402).json(e.body);
}
const chosenBackend = plan.backend;
// The analyze backend follows the operator's analyze-side
// preference, NOT the transcribe-side preference. They're set
// independently in StartOS so the operator can route, e.g.,
// transcribe-to-Gemini + analyze-to-hardware on tight Gemini
// budgets. planBackend evaluates the same quota/preference rules
// for analyze and returns a separate decision.
const analyzePreference =
cfgPlan.relay_analyze_backend_preference || "gemini_first";
const hasAnalyzeHardware = !!hw.analyze.url;
const analyzePlan = planBackend(row, quota, {
hasHardware: hasAnalyzeHardware,
preference: analyzePreference,
});
if (!analyzePlan.allowed) {
const e = await errorEnvelope({
error: analyzePlan.reason,
installId,
license,
tier,
statusHint: 402,
});
return res.status(402).json(e.body);
}
const chosenAnalyzeBackend = analyzePlan.backend;
const job = createJob({
kind: "summarize-url",
installId: auditInstall,
metadata: {
owner: creditKey, // authorizes the SSE /events stream (per-identity)
media_url: mediaUrl,
transcribe_backend: chosenBackend,
analyze_backend: chosenAnalyzeBackend,
summary_job_id: summaryJobId,
},
});
// Background worker — same try/catch discipline as transcribe-url.
// All progress + per-window events go through appendEvent(job.id, ...)
// so any SSE subscriber gets them in real time.
(async () => {
const workerT0 = Date.now();
markRunning(job.id);
appendEvent(job.id, "progress", { message: "downloading media…" });
setProgress(job.id, "downloading media…");
console.log(
`[summarize-url ${job.id.slice(0, 8)}] downloading media from ${(mediaUrl || "(no url)").slice(0, 120)}`
);
const tmpDir = await fs.mkdtemp(path.join(os.tmpdir(), "relay-sum-"));
const isYT = type === "youtube" || (!type && looksLikeYouTube(mediaUrl));
let audio;
let downloadMs = 0;
try {
const dlStart = Date.now();
audio = isYT
? await downloadYouTube(mediaUrl, tmpDir)
: await downloadDirect(mediaUrl, tmpDir);
downloadMs = Date.now() - dlStart;
audio.seconds = await getAudioDurationSeconds(audio.filePath);
console.log(
`[summarize-url ${job.id.slice(0, 8)}] download done in ${(downloadMs / 1000).toFixed(1)}s — ${Math.round((audio.seconds || 0) / 60)} min audio`
);
// Fall back to the yt-dlp-extracted title when the client
// didn't pass one. Recap-app's relay-mode branch doesn't
// pre-fetch YouTube metadata, so without this the Jobs table
// shows "Untitled" for every Recap submission.
if (!title && audio.title) {
title = audio.title;
}
// Same fallback pattern for channel / description / chapters.
// These power speaker-identification in the transcribe prompt
// (the model uses them to assign names like "Brandon Karpeles:"
// and "Matt Hill:" to dialogue turns). Recap-app's direct-to-
// Gemini path always fetches these via fetchYouTubeMetadata
// before transcribing; the relay path either gets them from
// the Recap client (if Recap fetched them and passed them
// through) OR — when the client sent empty fields — falls back
// here to whatever yt-dlp extracted during download. Defense-in-
// depth: works for ANY client that hits /relay/summarize-url,
// not just Recap, and survives a client upgrade gap where an
// older Recap is still sending empty channelHint/descriptionHint.
if (!channel && audio.channel) {
channel = audio.channel;
}
if (!description && audio.description) {
description = audio.description;
}
if ((!Array.isArray(chapters) || chapters.length === 0) && Array.isArray(audio.chapters) && audio.chapters.length > 0) {
chapters = audio.chapters;
}
appendEvent(job.id, "progress", {
message: `transcribing ${Math.round((audio.seconds || 0) / 60)} min audio…`,
});
setProgress(job.id, `transcribing ${Math.round((audio.seconds || 0) / 60)} min audio…`);
} catch (err) {
try { await fs.rm(tmpDir, { recursive: true, force: true }); } catch {}
const msg = (err?.message || String(err)).slice(0, 300);
await recordCall({
install_id: auditInstall,
license_fingerprint: licenseFp,
tier,
pipeline: "transcribe",
backend: chosenBackend,
model: null,
status: "error",
credit_charged: 0,
duration_ms: Date.now() - workerT0,
download_ms: Date.now() - workerT0,
audio_seconds: null,
cost_usd: 0,
job_id: summaryJobId,
media_url: mediaUrl || null,
title: title || null,
error: "download_failed: " + msg,
});
markFailed(job.id, "download_failed: " + msg);
return;
}
// ── Hardware FIFO queue acquire ─────────────────────────────
// Hardware jobs serialize through a single-slot semaphore so
// the operator's GPU box doesn't get N jobs hammering it at
// once. Gemini-backend jobs bypass — Google's API handles
// concurrency at scale. The acquire awaits if there's an
// active hardware job ahead; we emit a "queued" SSE event so
// Recap can show "Queued — N jobs ahead" while we wait. See
// server/hardware-queue.js.
const usesHardwareForAnything =
chosenBackend === "hardware" || chosenAnalyzeBackend === "hardware";
let releaseHardwareSlot = null;
if (usesHardwareForAnything) {
releaseHardwareSlot = await acquireHardwareSlot({
jobId: job.id,
onWait: ({ position, activeJobId }) => {
// Tell the SSE client we're waiting so the UI can show
// a queue position instead of a stalled "Transcribing…"
// state. Best-effort — appendEvent is fire-and-forget.
try {
appendEvent(job.id, "queued", {
position,
activeJobId: activeJobId || null,
});
setProgress(
job.id,
`queued — ${position} job(s) ahead on operator hardware`
);
} catch {}
console.log(
`[summarize-url ${job.id.slice(0, 8)}] queued for hardware — position ${position}`
);
},
});
}
// Wrap from here to end-of-IIFE in try/finally so the hardware
// slot ALWAYS releases — on success, on early-return failure,
// and on uncaught exceptions. Finally executes before the
// throw propagates so the next queued job unblocks promptly.
try {
// ── Transcribe ──────────────────────────────────────────────
// Pipelined-analyze design (v0.2.89): when the hardware backend
// is transcribing, fire each analyze window's call AS SOON AS
// its required transcribe chunks have completed — in parallel
// with later chunks still being transcribed. The total wall
// time savings are modest (~10s for a 94-min video), but the
// user-perceived "first topic visible" time drops from
// ~T=160s to ~T=80s because window 1's sections render while
// chunks 5-21 are still in-flight. See server/chunk-buffer.js
// and runPipelinedAnalysis in chunked-analyze.js for the
// mechanics. Gemini transcribe doesn't expose per-chunk
// callbacks (single big call) so its path stays sequential.
const cfg = await getConfigSnapshot();
let txResult;
const txPhaseStart = Date.now();
// Build the analyze backend UP FRONT — we need it before
// transcribe starts in pipelined mode. Sequential mode would
// also work with it built here, just doesn't strictly need to.
const anStart = Date.now();
let analyzeBackend = null;
try {
if (chosenAnalyzeBackend === "gemini") {
analyzeBackend = createGeminiBackend({
apiKey: cfg.relay_gemini_api_key,
transcriptionModel: cfg.relay_gemini_transcription_model,
analysisModel: cfg.relay_gemini_analysis_model,
txChunkSeconds: (cfg.relay_gemini_tx_chunk_minutes || 30) * 60,
txConcurrency: cfg.relay_gemini_tx_concurrency || 12,
txMaxOutputTokens: cfg.relay_gemini_tx_max_output_tokens || 65536,
anMaxOutputTokens: cfg.relay_gemini_an_max_output_tokens || 8192,
});
} else {
if (!hw.analyze.url) {
throw new Error("hardware analyze URL not configured");
}
analyzeBackend = createHardwareBackend({
parakeetBaseURL: hw.transcribe.url || "",
gemmaBaseURL: hw.analyze.url,
sparkControlBaseURL: hw.sparkBase || "",
parakeetModel: hw.transcribe.model || "",
gemmaModel: hw.analyze.model || "",
txChunkSeconds: (cfg.relay_hardware_tx_chunk_minutes || 5) * 60,
txChunkOverlapSeconds:
cfg.relay_hardware_tx_chunk_overlap_seconds ?? 30,
diarizationEnabled: !!cfg.relay_hardware_diarization_enabled,
clusterThresholdPct:
cfg.relay_hardware_voice_clustering_threshold ?? 70,
anchorMinSpeakingSec:
cfg.relay_hardware_anchor_min_speaking_sec ?? 30,
smallClusterMaxSpeakingSec:
cfg.relay_hardware_small_cluster_max_speaking_sec ?? 15,
uncertainMarginPct:
cfg.relay_hardware_uncertain_margin_pct ?? 10,
txConcurrency: cfg.relay_hardware_tx_concurrency || 4,
anMaxTokens: cfg.relay_hardware_an_max_tokens || 16000,
});
}
} catch (err) {
const msg = (err?.message || String(err)).slice(0, 400);
await recordCall({
install_id: auditInstall,
license_fingerprint: licenseFp,
tier,
pipeline: "analyze",
backend: chosenAnalyzeBackend,
model: null,
status: "error",
duration_ms: 0,
audio_seconds: 0,
cost_usd: 0,
job_id: summaryJobId,
media_url: mediaUrl || null,
title: title || null,
error: "analyze_backend_init_failed: " + msg,
window_idx: 0,
window_count: 1,
});
markFailed(job.id, "analyze_init_failed: " + msg);
try { await fs.rm(tmpDir, { recursive: true, force: true }); } catch {}
return;
}
// Analyze tunables (used by both pipelined + sequential paths).
const bodyMin = chosenAnalyzeBackend === "gemini"
? (cfg.relay_gemini_analyze_window_minutes || 18)
: (cfg.relay_hardware_analyze_window_minutes || 18);
const overlapMin = chosenAnalyzeBackend === "gemini"
? (cfg.relay_gemini_analyze_overlap_minutes || 2)
: (cfg.relay_hardware_analyze_overlap_minutes || 2);
const anConcurrency = chosenAnalyzeBackend === "gemini"
? (cfg.relay_gemini_analyze_concurrency || 12)
: (cfg.relay_hardware_analyze_concurrency || 8);
const cutoffMin = cfg.relay_analyze_cutoff_minutes || 25;
const targetTotalsByBucket = {
under_30: cfg.relay_analyze_total_sections_under_30,
"30_60": cfg.relay_analyze_total_sections_30_60,
"60_90": cfg.relay_analyze_total_sections_60_90,
"90_120": cfg.relay_analyze_total_sections_90_120,
"120_150": cfg.relay_analyze_total_sections_120_150,
"150_180": cfg.relay_analyze_total_sections_150_180,
over_180: cfg.relay_analyze_total_sections_over_180,
};
const analyzePromptOverride =
cfg.relay_analyze_prompt || cfg.relay_analyze_prompt_default || "";
// Cost attribution for a single analyze call. Only calls routed to the
// Gemini analyze backend that actually reported token usage are priced
// (via calcGeminiCost); every other combination yields an all-zero
// cost record so audit rows keep a uniform shape.
const computeCostDetails = (model, usage) => {
  const isPricedCall = chosenAnalyzeBackend === "gemini" && Boolean(usage);
  if (!isPricedCall) {
    return { input_tokens: 0, output_tokens: 0, thinking_tokens: 0, cost_usd: 0 };
  }
  return calcGeminiCost(model, usage);
};
// Pipelined mode requires hardware transcribe (per-chunk
// onChunkComplete callback) + a known audio duration. When both
// hold, we start the analyze workers BEFORE the transcribe
// promise so they're already waiting on chunkBuffer when the
// first chunks land.
const usePipelining =
chosenBackend === "hardware" && (audio.seconds || 0) > 0;
const chunkBuffer = usePipelining ? createChunkBuffer() : null;
// Completion hook handed to the pipelined analyze run. For every
// finished window it (1) publishes a `window_complete` event on the
// job's event stream, (2) refreshes the job's progress line, and
// (3) logs a one-line summary for the operator.
const onWindowCompleteHandler = (cb) => {
  const {
    windowIdx,
    totalWindows,
    ownedSections,
    windowBodySeconds,
    model,
    durationMs,
  } = cb;

  // windowEntries carries this window's local entries so the
  // SDK / recap-app handler can render partial sections without
  // waiting for transcribe_complete to populate streamedRelayEntries.
  // A falsy value is normalized to undefined; the SDK then falls back
  // to streamedRelayEntries (the sequential-mode behaviour).
  const eventPayload = {
    windowIdx,
    totalWindows,
    ownedSections,
    windowEntries: cb.windowEntries || undefined,
    windowBodySeconds,
    model,
    durationMs,
  };
  appendEvent(job.id, "window_complete", eventPayload);
  setProgress(job.id, `analyze window ${windowIdx + 1}/${totalWindows} done`);

  // Only an array of owned sections contributes to the logged count.
  let sectionCount = 0;
  if (Array.isArray(ownedSections)) {
    sectionCount = ownedSections.length;
  }
  const elapsedSec = ((durationMs || 0) / 1000).toFixed(1);
  const modelLabel = model || "?";
  console.log(
    `[summarize-url ${job.id.slice(0, 8)}] analyze window ${windowIdx + 1}/${totalWindows} done in ${elapsedSec}s (${sectionCount} sections, model=${modelLabel})`
  );
};
// Kick off the pipelined-analyze promise BEFORE we await
// transcribe. Workers block on chunkBuffer.waitForTime() until
// their windows' required chunks land. If transcribe throws,
// we call chunkBuffer.fail() in the catch to release the
// pending waiters.
let pipelinedAnalyzePromise = null;
if (usePipelining) {
pipelinedAnalyzePromise = runPipelinedAnalysis({
audioDurationSec: audio.seconds || 0,
waitForTime: (sec) => chunkBuffer.waitForTime(sec),
getReadySegments: (s, e) => chunkBuffer.getSegments(s, e),
bodySeconds: bodyMin * 60,
overlapSeconds: overlapMin * 60,
cutoffSeconds: cutoffMin * 60,
concurrency: anConcurrency,
backend: analyzeBackend,
pipelineBackend: chosenAnalyzeBackend,
jobId: summaryJobId,
batchId: null,
mediaUrl,
title: title || null,
installId,
licenseFingerprint: licenseFp,
source: "summarize-url",
computeCostDetails,
analyzePromptOverride,
targetTotalsByBucket,
onWindowComplete: onWindowCompleteHandler,
}).catch((err) => ({ __error: err }));
}
try {
const audioBuf = await fs.readFile(audio.filePath);
const mimeType = bodyMime || audio.mimeType;
if (chosenBackend === "gemini") {
const backend = createGeminiBackend({
apiKey: cfg.relay_gemini_api_key,
transcriptionModel: cfg.relay_gemini_transcription_model,
analysisModel: cfg.relay_gemini_analysis_model,
txChunkSeconds: (cfg.relay_gemini_tx_chunk_minutes || 30) * 60,
txConcurrency: cfg.relay_gemini_tx_concurrency || 12,
// Three-layer prompt resolution: per-session override
// wins; operator-promoted default is next; gemini.js's
// hardcoded DEFAULT_TRANSCRIBE_PROMPT_BODY is the
// factory fallback. Empty string here means "fall back
// to the code-side default" inside the backend.
transcribePromptOverride:
cfg.relay_transcribe_prompt ||
cfg.relay_transcribe_prompt_default ||
"",
txMaxOutputTokens: cfg.relay_gemini_tx_max_output_tokens || 65536,
anMaxOutputTokens: cfg.relay_gemini_an_max_output_tokens || 8192,
});
txResult = await backend.transcribeAudio({
audio: audioBuf,
mimeType,
title: title || "",
channel: channel || "",
description: description || "",
chapters: Array.isArray(chapters) ? chapters : [],
offsetSeconds: 0,
});
} else {
const backend = createHardwareBackend({
parakeetBaseURL: hw.transcribe.url || "",
gemmaBaseURL: hw.analyze.url || "",
sparkControlBaseURL: hw.sparkBase || "",
parakeetModel: hw.transcribe.model || "",
gemmaModel: hw.analyze.model || "",
txChunkSeconds: (cfg.relay_hardware_tx_chunk_minutes || 5) * 60,
txChunkOverlapSeconds:
cfg.relay_hardware_tx_chunk_overlap_seconds ?? 30,
diarizationEnabled: !!cfg.relay_hardware_diarization_enabled,
clusterThresholdPct:
cfg.relay_hardware_voice_clustering_threshold ?? 70,
anchorMinSpeakingSec:
cfg.relay_hardware_anchor_min_speaking_sec ?? 30,
smallClusterMaxSpeakingSec:
cfg.relay_hardware_small_cluster_max_speaking_sec ?? 15,
uncertainMarginPct:
cfg.relay_hardware_uncertain_margin_pct ?? 10,
txConcurrency: cfg.relay_hardware_tx_concurrency || 4,
anMaxTokens: cfg.relay_hardware_an_max_tokens || 16000,
// Pipelined-mode hook: feed each completed chunk into
// the shared buffer so the analyze workers can unblock.
onChunkComplete: chunkBuffer
? (cd) => chunkBuffer.add(cd)
: null,
});
txResult = await backend.transcribeAudio({
audio: audioBuf,
mimeType,
offsetSeconds: 0,
});
}
} catch (err) {
if (chunkBuffer) chunkBuffer.fail(err);
try { await fs.rm(tmpDir, { recursive: true, force: true }); } catch {}
if (reusedSummaryJob) await refundJob({ creditKey, installId, license, jobId: summaryJobId });
const msg = (err?.message || String(err)).slice(0, 400);
if (chosenBackend === "hardware") {
reportHealthEvent({
service: "parakeet",
ok: false,
error: msg.slice(0, 280),
ms: Date.now() - workerT0,
});
}
await recordCall({
install_id: auditInstall,
license_fingerprint: licenseFp,
tier,
pipeline: "transcribe",
backend: chosenBackend,
model: chosenBackend === "gemini"
? cfg.relay_gemini_transcription_model
: hw.transcribe.model || "(auto)",
status: "error",
credit_charged: 0,
duration_ms: Date.now() - txPhaseStart,
download_ms: downloadMs,
audio_seconds: audio?.seconds || null,
audio_bytes: audio?.bytes || null,
cost_usd: 0,
job_id: summaryJobId,
media_url: mediaUrl || null,
title: title || null,
error: msg,
});
markFailed(job.id, msg);
return;
} finally {
try { await fs.rm(tmpDir, { recursive: true, force: true }); } catch {}
}
// Audit the transcribe row (success path).
const txCostDetails =
chosenBackend === "gemini" && txResult.usage
? calcGeminiCost(txResult.model, txResult.usage)
: { input_tokens: 0, output_tokens: 0, thinking_tokens: 0, cost_usd: 0 };
// Detect output-token truncation. The backend returns a
// truncated_chunks array when any chunk's last emitted
// timestamp falls short of the chunk's expected duration —
// a silent failure mode where the model hit its output cap
// mid-transcript. We mark the audit row status="partial"
// and stamp an error string so the Jobs table doesn't
// mislead the operator into thinking the run succeeded.
const truncatedChunks = Array.isArray(txResult?.truncated_chunks)
? txResult.truncated_chunks
: [];
const wasTruncated = truncatedChunks.length > 0;
const truncationError = wasTruncated
? `transcribe: ${truncatedChunks.length} chunk(s) truncated — missing ~${truncatedChunks.reduce((s, c) => s + (c.missingSec || 0), 0)}s of speech (model: ${txResult.model || "unknown"}). Likely hit maxOutputTokens.`
: null;
await recordCall({
install_id: auditInstall,
license_fingerprint: licenseFp,
tier,
pipeline: "transcribe",
backend: chosenBackend,
model: txResult?.model || null,
status: wasTruncated ? "partial" : "success",
credit_charged: 0, // credit committed AFTER analyze succeeds too
// duration_ms = transcribe phase ONLY (txPhaseStart was
// stamped after the download completed). Previously this used
// Date.now() - workerT0 which included the download time too,
// so the Jobs table's "TX wall time" column appeared inflated
// by ~50s on long YouTube fetches. download_ms below carries
// the download phase as a separate field so the operator can
// see both numbers on the same row.
duration_ms: Date.now() - txPhaseStart,
download_ms: downloadMs,
audio_bytes: audio.bytes,
audio_seconds: audio.seconds || null,
job_id: summaryJobId,
attempts: txResult?.attempts || null,
media_url: mediaUrl || null,
title: title || null,
chunk_count: txResult?.chunk_count ?? null,
chunk_durations_ms: txResult?.chunk_durations_ms || null,
truncated_chunks: wasTruncated ? truncatedChunks : null,
error: truncationError,
...txCostDetails,
});
// Emit a transcribe_complete event so the SSE client can show
// the full transcript while analyze is still in flight (useful
// for a "transcript preview" UI affordance).
appendEvent(job.id, "transcribe_complete", {
transcript: txResult.text || "",
model: txResult.model || null,
chunk_count: txResult.chunk_count ?? null,
audio_seconds: audio.seconds || null,
audio_bytes: audio.bytes || null,
duration_ms: Date.now() - workerT0,
});
setProgress(job.id, "analyzing topics…");
console.log(
`[summarize-url ${job.id.slice(0, 8)}] transcribe done in ${((Date.now() - txPhaseStart) / 1000).toFixed(1)}s — starting analyze`
);
// ── Analyze ─────────────────────────────────────────────────
// Pipelined mode: await the promise we kicked off BEFORE
// transcribe; remap window-local section indices to global
// canonical indices; stitch.
// Sequential mode: call runChunkedAnalysis on the full
// transcript as today.
let analyzeResult;
try {
if (pipelinedAnalyzePromise) {
const pipelinedRaw = await pipelinedAnalyzePromise;
if (pipelinedRaw && pipelinedRaw.__error) {
throw pipelinedRaw.__error;
}
// Build canonical entries from the final stitched transcript.
// The pipelined windows were each analyzed against their own
// segment-only view; we now map their LOCAL section indices
// to GLOBAL canonical-entry indices (which is what the
// stitcher + downstream stitching code expects).
const canonicalEntries = parseBracketedTranscript(txResult.text || "");
const syntheticResults = (pipelinedRaw.windowResults || []).map((wr) => {
if (!wr || !wr.ok) return wr;
const w = wr.window;
const globalStartIdx = firstEntryAtOrAfter(canonicalEntries, w.startSec);
const globalBodyStartIdx = firstEntryAtOrAfter(canonicalEntries, w.bodyStartSec);
const globalEndIdx = lastEntryBefore(canonicalEntries, w.windowEndSec + 0.5);
// Edge case: this window's time range produced no canonical
// entries (audio gap, severe truncation). Mark as failed
// so the stitcher skips it.
if (
globalStartIdx >= canonicalEntries.length ||
globalEndIdx < globalStartIdx
) {
return {
window: { startIdx: 0, endIdx: -1, bodyStartIdx: 0 },
ok: false,
error: new Error("pipelined window had no canonical entries — audio gap"),
};
}
// Remap each section's window-LOCAL indices to GLOBAL
// canonical indices via time matching, then express as
// window-local for the stitcher's offset math (which
// adds window.startIdx to each section's startIndex).
const remapped = [];
for (const s of wr.sections || []) {
const localStartEntry = wr.windowEntries?.[s.startIndex];
const localEndEntry = wr.windowEntries?.[s.endIndex];
if (!localStartEntry || !localEndEntry) continue;
const globalStart = canonicalIndexForOffset(
canonicalEntries,
localStartEntry.offset || 0
);
const globalEnd = canonicalIndexForOffset(
canonicalEntries,
localEndEntry.offset || 0
);
if (globalStart < 0 || globalEnd < 0) continue;
remapped.push({
startIndex: globalStart - globalStartIdx,
endIndex: globalEnd - globalStartIdx,
title: s.title,
summary: s.summary,
});
}
return {
window: {
startIdx: globalStartIdx,
endIdx: globalEndIdx,
bodyStartIdx: globalBodyStartIdx,
},
ok: true,
sections: remapped,
model: wr.model,
};
});
const stitched = stitchAnalysisResults(syntheticResults);
analyzeResult = {
text: JSON.stringify({ sections: stitched }),
model: pipelinedRaw.dominantModel,
attempts: pipelinedRaw.attempts,
usage: null,
};
} else {
analyzeResult = await runChunkedAnalysis({
transcriptText: txResult.text || "",
backend: analyzeBackend,
pipelineBackend: chosenAnalyzeBackend,
jobId: summaryJobId,
batchId: null,
mediaUrl,
title: title || null,
installId,
licenseFingerprint: licenseFp,
source: "summarize-url",
computeCostDetails,
bodySeconds: bodyMin * 60,
overlapSeconds: overlapMin * 60,
concurrency: anConcurrency,
cutoffSeconds: cutoffMin * 60,
analyzePromptOverride,
totalAudioSec: audio.seconds || 0,
targetTotalsByBucket,
onWindowComplete: ({ windowIdx, totalWindows, ownedSections, windowBodySeconds, model, durationMs }) => {
appendEvent(job.id, "window_complete", {
windowIdx,
totalWindows,
ownedSections,
windowBodySeconds,
model,
durationMs,
});
setProgress(job.id, `analyze window ${windowIdx + 1}/${totalWindows} done`);
const sectionCount = Array.isArray(ownedSections)
? ownedSections.length
: (typeof ownedSections === "number" ? ownedSections : 0);
console.log(
`[summarize-url ${job.id.slice(0, 8)}] analyze window ${windowIdx + 1}/${totalWindows} done in ${(durationMs / 1000).toFixed(1)}s (${sectionCount} sections, model=${model || "?"})`
);
},
});
}
} catch (err) {
// All analyze windows failed (pipelined OR sequential).
// Refund credit, mark job failed.
if (reusedSummaryJob) await refundJob({ creditKey, installId, license, jobId: summaryJobId });
const msg = (err?.message || String(err)).slice(0, 400);
markFailed(job.id, "analyze_failed: " + msg);
return;
}
// ── Post-cluster polish (Phase 2) ───────────────────────────
// After diarization + clustering produced speaker IDs AND
// analyze produced topic sections, run a two-stage LLM pass
// that (1) infers real speaker names from the labeled
// transcript + episode metadata, and (2) rewrites section
// summaries to attribute statements to those speakers.
// Skipped when:
// - Operator disabled via Settings
// - Diarization didn't run (no speakers to attribute)
// - Fewer than 2 speakers detected (nothing to differentiate)
// - Analyze had no sections to polish
//
// Failure handling: both stages are best-effort. If either
// fails entirely, we keep the unpolished analyze output and
// null speaker names. The user still gets a working result.
let speakerNames = null;
const polishEnabled = cfg.relay_post_cluster_polish_enabled !== false;
const detectedSpeakerCount = Object.keys(txResult?.speakers || {}).length;
const parsedAnalysisForPolish = tryParseJson(analyzeResult.text);
const polishableSections = Array.isArray(parsedAnalysisForPolish?.sections)
? parsedAnalysisForPolish.sections
: null;
if (
polishEnabled &&
detectedSpeakerCount >= 2 &&
Array.isArray(txResult?.segments) &&
polishableSections &&
polishableSections.length > 0
) {
const polishStart = Date.now();
try {
// Stage 1 — global speaker name inference. Single call,
// returns map { Speaker_A: "Matt Hill", Speaker_B: null, ... }.
speakerNames = await runNameInference({
speakers: txResult.speakers,
transcriptSegments: txResult.segments,
channelHint: channel || "",
titleHint: title || "",
descriptionHint: description || "",
// Three-layer prompt resolution — per-session override
// wins, then operator-promoted default, then the
// hardcoded default inside post-cluster-polish.js.
promptOverride:
cfg.relay_polish_name_inference_prompt ||
cfg.relay_polish_name_inference_prompt_default ||
"",
backend: analyzeBackend,
pipelineBackend: chosenAnalyzeBackend,
jobId: summaryJobId,
batchId: null,
mediaUrl,
installId,
licenseFingerprint: licenseFp,
source: "summarize-url",
computeCostDetails,
});
// Stage 2 — per-window summary polish (parallel). Rewrites
// section summaries to attribute statements to speakers.
// Sections whose window's polish fails keep their
// original summary. Titles + indices are never modified.
const canonicalEntriesForPolish = parseBracketedTranscript(txResult.text || "");
const { planWindowsByDuration } = await import("../chunked-analyze.js");
const windowsForPolish = planWindowsByDuration({
totalAudioSec: audio.seconds || 0,
bodySeconds: bodyMin * 60,
overlapSeconds: overlapMin * 60,
cutoffSeconds: cutoffMin * 60,
});
const polishedSections = await runSummaryPolish({
sections: polishableSections,
canonicalEntries: canonicalEntriesForPolish,
windows: windowsForPolish,
transcriptSegments: txResult.segments,
speakerNames,
speakerStats: txResult.speakers,
promptOverride:
cfg.relay_polish_summary_rewrite_prompt ||
cfg.relay_polish_summary_rewrite_prompt_default ||
"",
backend: analyzeBackend,
concurrency: anConcurrency,
pipelineBackend: chosenAnalyzeBackend,
jobId: summaryJobId,
batchId: null,
mediaUrl,
installId,
licenseFingerprint: licenseFp,
source: "summarize-url",
computeCostDetails,
});
// Overwrite analyzeResult.text with the polished sections
// so downstream code paths (saveJobOutput, markComplete)
// see the polished version.
analyzeResult = {
...analyzeResult,
text: JSON.stringify({ sections: polishedSections }),
};
const polishMs = Date.now() - polishStart;
console.log(
`[summarize-url ${job.id.slice(0, 8)}] post-cluster polish done in ${(polishMs / 1000).toFixed(1)}s — ${Object.values(speakerNames || {}).filter(Boolean).length}/${detectedSpeakerCount} speakers named`
);
} catch (polishErr) {
// Polish failure should never kill the request — keep the
// unpolished output. Operator can see what went wrong in
// the audit log; the user gets a working result either way.
console.warn(
`[summarize-url ${job.id.slice(0, 8)}] polish pass failed (keeping unpolished output): ${polishErr?.message || polishErr}`
);
speakerNames = null;
}
} else if (polishEnabled) {
// Diagnostic — surface why we skipped so operator can see
// it without grep-ing the source.
const reason = !Array.isArray(txResult?.segments)
? "no transcript segments"
: detectedSpeakerCount < 2
? `only ${detectedSpeakerCount} speaker(s) detected`
: !polishableSections
? "no parseable sections"
: "no sections to polish";
console.log(
`[summarize-url ${job.id.slice(0, 8)}] polish skipped — ${reason}`
);
}
// ── Commit credit + finalize ────────────────────────────────
// Credit policy: charged ONLY when the full pipeline completes
// cleanly. "Cleanly" means EVERY analyze window succeeded —
// any per-window failure voids the charge even if the stitched
// output is still usable. Rationale: a partial result is
// surfacing degraded quality to the user; charging full price
// for that would erode trust. The user keeps the partial
// output (saved to disk + returned in the result envelope),
// the operator eats the cost of the compute they paid for on
// the windows that did succeed.
const anyWindowFailed = (analyzeResult?.attempts?.failed ?? 0) > 0;
let creditCharged = 0;
if (anyWindowFailed) {
// Don't charge. If a previous endpoint (legacy two-call path)
// pre-charged on the same X-Recap-Job-Id, refund it.
if (reusedSummaryJob) await refundJob({ creditKey, installId, license, jobId: summaryJobId });
console.log(
`[summarize-url ${job.id.slice(0, 8)}] partial analyze (${analyzeResult.attempts.failed}/${analyzeResult.attempts.windows} windows failed) — credit NOT charged`
);
} else if (!reusedSummaryJob) {
await commitCredit({ creditKey, installId, license, backend: chosenBackend, tier });
await markJobCharged({ creditKey, installId, license, jobId: summaryJobId, backend: chosenBackend, tier });
creditCharged = 1;
}
// Phase 1D: build the speaker-tagged transcript segments
// ONCE so both the operator-output store AND the Recap-facing
// markComplete envelope share the same payload. Each segment
// carries `start`, `end`, `text`, and (when diarization ran)
// `speaker` + `speaker_confidence`. Null when diarization was
// off and txResult.segments isn't an array.
const transcriptSegments = Array.isArray(txResult?.segments)
? txResult.segments.map((s) => ({
start: s.start || 0,
end: s.end || 0,
text: s.text || "",
speaker: s.speaker || null,
speaker_confidence: s.speaker_confidence ?? null,
// Phase 2 — set when the post-cluster suppression pass
// reassigned this segment's source cluster to an anchor
// (best-guess attribution). Recap chip will show "?".
speaker_uncertain: !!s.speaker_uncertain,
}))
: null;
// Save the full transcript+analysis to disk for the dashboard
// when the operator has opted in via relay_save_user_outputs.
if (cfg.relay_save_user_outputs) {
await saveJobOutput(summaryJobId || job.id, {
batch_id: null,
source: "summarize-url",
transcript: txResult?.text || "",
analysis: tryParseJson(analyzeResult.text),
analysis_raw_text: analyzeResult.text || null,
// Persist diarization output too so it shows up in the
// operator dashboard's per-job viewer. Both keys are null
// when diarization was off.
speakers: txResult?.speakers || null,
// Phase 2 — inferred speaker names from the post-cluster
// polish pass. Null if polish was skipped or all names
// came back null. Recap renders these in place of
// "Speaker A" in the legend.
speaker_names: speakerNames || null,
diarization: txResult?.diarization || null,
transcript_segments: transcriptSegments,
meta: {
title: title || null,
media_url: mediaUrl,
audio_seconds: audio.seconds || null,
audio_bytes: audio.bytes,
captions_mode: null,
transcribe_backend: chosenBackend,
transcribe_model: txResult?.model || null,
analyze_backend: chosenAnalyzeBackend,
analyze_model: analyzeResult.model || null,
},
});
}
const wallTimeMs = Date.now() - workerT0;
markComplete(job.id, {
result: {
transcript: txResult.text || "",
analysis: tryParseJson(analyzeResult.text),
analysis_raw_text: analyzeResult.text || "",
// Echo back the resolved title (either operator-supplied via
// the kickoff body or extracted by yt-dlp during download).
// Recap-app's relay-mode branch was previously saving every
// submission as "Untitled" because the title only existed
// inside the relay's worker — this envelope field is how
// we hand it back across the SSE done event.
title: title || null,
transcribe_model: txResult.model || null,
analyze_model: analyzeResult.model || null,
audio_seconds: audio.seconds || null,
audio_bytes: audio.bytes || null,
chunk_count: txResult.chunk_count ?? null,
analyze_windows: analyzeResult.attempts?.windows ?? null,
analyze_windows_failed: analyzeResult.attempts?.failed ?? null,
wall_time_ms: wallTimeMs,
// Phase 1D fields (null when diarization didn't run):
speakers: txResult?.speakers || null,
transcript_segments: transcriptSegments,
// Phase 2 — speaker name map from the post-cluster polish
// pass. Null when polish was skipped or all names came
// back null. Recap merges this into the speakers legend
// (showing "Matt Hill · 24:42" instead of "Speaker A").
speaker_names: speakerNames || null,
},
credit_charged: creditCharged,
tier,
});
console.log(
`[summarize-url ${job.id.slice(0, 8)}] complete in ${(wallTimeMs / 1000).toFixed(1)}s (TX ${chosenBackend}, AN ${chosenAnalyzeBackend})`
);
} finally {
// Release the hardware FIFO slot so the next queued job can
// start. Runs on success path, on every early-return inside
// the wrapped block, and on uncaught exceptions (in which
// case the throw propagates to the .catch below after this
// finally executes). releaseHardwareSlot is null when the
// job was Gemini-only and never acquired a slot — no-op.
if (releaseHardwareSlot) releaseHardwareSlot();
}
})().catch((err) => {
markFailed(job.id, "worker_crashed: " + (err?.message || String(err)));
console.error(`[summarize-url ${job.id.slice(0, 8)}] worker crashed:`, err);
});
const body = await envelope({
result: {
job_id: job.id,
status: "queued",
kind: "summarize-url",
},
creditKey,
installId,
license,
tier,
});
res.json(body);
});
// GET /relay/summarize-url/:jobId/events — SSE stream of progress
// and per-window events.
//
// Flow:
//   1. Resolve the caller's identity (401 on failure, 400 when the
//      identity carries no creditKey).
//   2. Look up the job and authorize the caller against it. A job the
//      caller may not see is reported exactly like a missing job (404).
//   3. Replay the job's recorded event log, so a client that connects
//      after the kickoff still sees the earlier events.
//   4. If the job is not terminal yet, forward live events and a
//      periodic heartbeat until the job reaches a terminal state or
//      the client disconnects — whichever happens first.
router.get("/summarize-url/:jobId/events", async (req, res) => {
  let identity;
  try {
    identity = await resolveIdentity(req);
  } catch {
    return res.status(401).json({ error: "unauthorized" });
  }
  const ownerKey = identity.creditKey;
  if (!ownerKey) {
    return res.status(400).json({ error: "missing identity" });
  }
  const job = getJob(req.params.jobId);
  if (!job) return res.status(404).json({ error: "job not found or expired" });
  // New jobs carry metadata.owner = creditKey; older jobs only carry
  // install_id. Authorize by whichever the job has — and never leak the
  // existence of another identity's job (hence 404, not 403).
  const allowed = job.metadata?.owner
    ? job.metadata.owner === ownerKey
    : identity.installId && job.install_id === identity.installId;
  if (!allowed) {
    return res.status(404).json({ error: "job not found or expired" });
  }

  // SSE headers.
  res.setHeader("Content-Type", "text/event-stream");
  res.setHeader("Cache-Control", "no-cache, no-transform");
  res.setHeader("Connection", "keep-alive");
  res.setHeader("X-Accel-Buffering", "no"); // disable nginx buffering
  res.flushHeaders?.();

  const isTerminal = (status) => status === "complete" || status === "failed";

  // Set once the stream has been torn down (client disconnect or job
  // finished). Guards every write so nothing is written after end().
  let finished = false;

  // Serialize one event as a single SSE frame. `data` falls back to
  // null because JSON.stringify(undefined) would otherwise emit the
  // literal text "undefined", which is not valid JSON for the client.
  const send = (event) => {
    if (finished || res.writableEnded) return;
    try {
      res.write(
        `event: ${event.type}\ndata: ${JSON.stringify(event.data ?? null)}\nid: ${event.ts}\n\n`
      );
    } catch {
      // Connection closed mid-write — the close listeners below run
      // the cleanup, so there is nothing more to do here.
    }
  };

  // Replay history. The client may have missed events that fired
  // between the POST returning and the GET arriving.
  for (const ev of job.events) {
    send(ev);
  }
  // If the job is already terminal, close immediately.
  if (isTerminal(job.status)) {
    res.end();
    return;
  }

  let unsubscribe = null;
  let heartbeat = null;

  // Idempotent teardown. It is wired to three different events below
  // and is also used by the terminal-state path, so it must tolerate
  // being invoked more than once without double-unsubscribing.
  const cleanup = () => {
    if (finished) return;
    finished = true;
    clearInterval(heartbeat);
    if (unsubscribe) unsubscribe();
  };

  // End the stream once the job is terminal. Without this the live
  // path never closes the response and the heartbeat keeps the socket
  // open until the client hangs up.
  // NOTE(review): re-reads the job via getJob in case the store hands
  // out snapshots rather than a live object — confirm against the job
  // store; falls back to the reference we already hold.
  const endIfTerminal = () => {
    if (finished) return;
    const latest = getJob(job.id) ?? job;
    if (!isTerminal(latest.status)) return;
    cleanup();
    res.end();
  };

  // Subscribe to live events. Returned function MUST be called on
  // any disconnect path or the subscriber leaks. The terminal check is
  // deferred with setImmediate so that any further events emitted
  // synchronously alongside the status change are still forwarded
  // before the stream is closed.
  unsubscribe = subscribeToJob(job.id, (event) => {
    send(event);
    setImmediate(endIfTerminal);
  });

  // Heartbeat every 25s. Many proxies (nginx default 60s, others
  // ~30s) close idle SSE connections — a periodic comment frame
  // keeps the conn alive between sparse events without polluting
  // the event stream.
  heartbeat = setInterval(() => {
    if (finished || res.writableEnded) return;
    try {
      res.write(": ping\n\n");
    } catch {
      // Best-effort keep-alive; a dead connection is handled by the
      // close listeners.
    }
  }, 25_000);

  req.on("close", cleanup);
  req.on("aborted", cleanup);
  res.on("close", cleanup);
});
return router;
}
/**
 * Best-effort JSON parse that never throws.
 *
 * Tries, in order:
 *   1. the whole trimmed text;
 *   2. the body of the first ``` / ```json fenced block found in it.
 *
 * The whole text is tried FIRST so that valid JSON whose string values
 * happen to contain triple backticks is not mistaken for a fenced
 * payload (extracting the inner span would make the parse fail).
 *
 * @param {unknown} text - Candidate JSON text.
 * @returns {any} The parsed value, or null when `text` is empty, not a
 *   string, or no candidate parses. A literal JSON `null` also yields
 *   null, so callers cannot distinguish it from a failure.
 */
function tryParseJson(text) {
  if (!text || typeof text !== "string") return null;
  const trimmed = text.trim();
  const candidates = [trimmed];
  // Case-insensitive so a ```JSON tag is stripped like ```json.
  const fenced = trimmed.match(/```(?:json)?\s*([\s\S]*?)```/i);
  if (fenced) candidates.push(fenced[1].trim());
  for (const candidate of candidates) {
    try {
      return JSON.parse(candidate);
    } catch {
      // Not valid JSON in this form — try the next candidate.
    }
  }
  return null;
}