// Chunked topic-analysis: split a long transcript into overlapping
// time-windowed slices, analyze each slice in parallel, stitch the
// returned sections back into one coherent list.
//
// Why: a single-shot analyze call against a 2-hour transcript spends
// most of its wall-time on prefill (typically 25K+ tokens). Splitting
// into 18-min slices gives the model a much smaller prompt per call,
// and firing the slices concurrently lets the backend (relay/vLLM or
// Gemini) batch them. End-to-end wall-time drops from minutes to
// tens of seconds for long content, with no quality regression as
// long as the slice boundaries are chosen with overlap and the
// stitcher trusts the second slice for the overlap region.
//
// Public entry point: runChunkedAnalysis().

import { buildAnalysisPrompt } from "./gemini-helpers.js";

// ── Tunables ────────────────────────────────────────────────────────────────

// Window body: the part of a chunk that "owns" its topic boundaries.
// Overlap: a tail appended to each window so a topic spanning a
// boundary still gets seen in full by at least one window.
// Stride = body. Windows advance by `body` seconds. Measured from its
// body start, each window covers `body + overlap` seconds of audio;
// every window after the first additionally reaches back `overlap`
// seconds before its body start for leading context.
const WINDOW_BODY_SECONDS = 18 * 60; // 18 min
const WINDOW_OVERLAP_SECONDS = 2 * 60; // 2 min

// Don't chunk below this duration. A single analyze call against
// <25 min is fast on its own and avoids the stitching complexity
// for the common short-content case.
// Exported so the orchestrator can mirror the decision when picking
// whether to coalesce: above this duration the chunker handles
// granularity per-window, so the pre-chunk coalesce is unnecessary
// and would hurt section-boundary precision.
export const CHUNKING_CUTOFF_SECONDS = 25 * 60; // 25 min

// Max concurrent analyze calls in flight.
// Gemini paid Tier 1 allows ~1000 RPM for flash and ~150 RPM for pro —
// 12 in-flight is well under either ceiling and saturates most operator
// workloads without queueing. Operator hardware (vLLM on a single Spark)
// caps out around 8-12 concurrent for our prompt size, so 12 is a
// reasonable cross-backend default for concurrent analyze calls.
const DEFAULT_CONCURRENCY = 12;

// ── Window planning ─────────────────────────────────────────────────────────

/**
 * Plans a set of overlapping windows over the entries array.
 *
 * Each window has:
 *   - startIdx, endIdx: inclusive bounds into the entries array
 *   - bodyStartIdx:     index where this window's "body" begins (everything
 *                       before this index is the overlap with the previous
 *                       window's tail)
 * The first window has bodyStartIdx === startIdx. Windows after the first
 * have bodyStartIdx > startIdx by ~overlap seconds.
 *
 * The stitcher uses bodyStartIdx of window N+1 to decide whether a section
 * from window N falls in the contested overlap region.
 *
 * @param {Array<{offset?: number, duration?: number}>} entries
 *   Transcript entries; `offset`/`duration` are compared against the
 *   second-based tunables. The index scans assume ascending `offset`.
 * @param {{bodySeconds?: number, overlapSeconds?: number, cutoffSeconds?: number}} [opts]
 *   Overrides for the module-level tunables.
 * @returns {Array<{startIdx: number, endIdx: number, bodyStartIdx: number}>}
 *   A single window spanning everything when the content is at or below the
 *   cutoff. For an empty `entries` array that window is the empty inclusive
 *   range `{ startIdx: 0, endIdx: -1 }`.
 * @throws {RangeError} When chunking is required but `bodySeconds` is not a
 *   positive finite number or `overlapSeconds` is not a finite number >= 0.
 */
export function planAnalysisWindows(entries, opts = {}) {
  const bodySec = opts.bodySeconds ?? WINDOW_BODY_SECONDS;
  const overlapSec = opts.overlapSeconds ?? WINDOW_OVERLAP_SECONDS;
  const cutoffSec = opts.cutoffSeconds ?? CHUNKING_CUTOFF_SECONDS;

  // Optional chaining keeps an empty transcript on the "short content"
  // path instead of throwing on entries[-1].
  const lastEntry = entries[entries.length - 1];
  const totalSec = (lastEntry?.offset || 0) + (lastEntry?.duration || 0);
  if (totalSec <= cutoffSec) {
    return [{ startIdx: 0, endIdx: entries.length - 1, bodyStartIdx: 0 }];
  }

  // The loop below advances by bodySec; a zero/negative/NaN stride would
  // never reach totalSec and would push windows forever.
  if (!(Number.isFinite(bodySec) && bodySec > 0)) {
    throw new RangeError(`bodySeconds must be a positive finite number, got ${bodySec}`);
  }
  if (!(Number.isFinite(overlapSec) && overlapSec >= 0)) {
    throw new RangeError(`overlapSeconds must be a finite number >= 0, got ${overlapSec}`);
  }

  const windows = [];
  let bodyStartSec = 0;
  while (bodyStartSec < totalSec) {
    // The window's covered span (body + tail overlap):
    const windowEndSec = bodyStartSec + bodySec + overlapSec;

    // Body start in entry-index space: first entry with offset >= bodyStartSec.
    const bodyStartIdx = firstEntryAtOrAfter(entries, bodyStartSec);
    // No entries at or after bodyStartSec means everything is consumed.
    if (bodyStartIdx >= entries.length) break;

    // GAP HANDLING: if the next entry after bodyStartSec is far in the
    // future (past this window's body + overlap), there's a gap in the
    // transcript timeline. This commonly happens when the transcribe step
    // truncated a middle chunk — the timeline has valid entries at, e.g.,
    // 0-31 min and 90-94 min but nothing in between. Breaking out here
    // would silently drop the entries past the gap, so instead jump
    // bodyStartSec forward to the next entry's offset (rounded down to a
    // body-stride boundary so subsequent window alignment stays sensible)
    // and continue.
    const nextEntryOffset = entries[bodyStartIdx].offset || 0;
    if (nextEntryOffset >= windowEndSec) {
      bodyStartSec = Math.max(
        bodyStartSec + bodySec,
        Math.floor(nextEntryOffset / bodySec) * bodySec
      );
      continue;
    }

    // Window's entry range: from the start of overlap-with-prior
    // (bodyStartSec - overlapSec, clamped at 0) through windowEndSec.
    const overlapWithPriorSec = Math.max(0, bodyStartSec - overlapSec);
    const startIdx = firstEntryAtOrAfter(entries, overlapWithPriorSec);
    const endIdx = lastEntryBefore(entries, windowEndSec);

    if (endIdx < bodyStartIdx) {
      // Defensive: shouldn't happen with the gap handling above, but if
      // it does, advance the body cursor rather than break so we don't
      // get stuck.
      bodyStartSec += bodySec;
      continue;
    }

    windows.push({ startIdx, endIdx, bodyStartIdx });

    // Stop if this window already covers the last entry.
    if (endIdx >= entries.length - 1) break;

    bodyStartSec += bodySec;
  }

  return windows;
}

// Index of the first entry whose offset is >= sec, or entries.length when
// there is none. Linear scan; entries are expected to be sorted by offset.
function firstEntryAtOrAfter(entries, sec) {
  for (let i = 0; i < entries.length; i++) {
    if ((entries[i].offset || 0) >= sec) return i;
  }
  return entries.length;
}

// Largest i such that every entry up to and including i has offset < sec.
// Returns -1 when the very first entry is already at/after sec (caller
// treats that as an empty range) and entries.length - 1 when the whole
// array fits.
function lastEntryBefore(entries, sec) {
  let last = -1;
  for (let i = 0; i < entries.length; i++) {
    if (!((entries[i].offset || 0) < sec)) break;
    last = i;
  }
  return last;
}

// ── Parallel analyzer ───────────────────────────────────────────────────────

/**
 * Fires the per-window analyze calls concurrently with a bounded in-flight
 * count. Each call gets its own slice of entries plus a freshly-built prompt.
 *
 * Errors are isolated per window — a single-window failure doesn't fail the
 * whole batch. The stitcher gets to decide what to do about gaps.
 *
 * @param {object} args
 * @param {Array} args.entries - Full (un-chunked) entries array.
 * @param {Array<{startIdx: number, endIdx: number, bodyStartIdx: number}>} args.windows
 * @param {{analyzeText: Function}} args.analyzer
 * @param {Iterable<string>} args.fallbackModels - Models tried in order per window.
 * @param {number} args.concurrency - Max workers; values that are not >= 1
 *   (0, negative, NaN, undefined) are treated as 1 so work always progresses.
 * @param {Function} [args.onProgress] - Receives one status string per window.
 * @param {Function} [args.onWindowComplete] - Receives
 *   `{ windowIdx, totalWindows, ownedSections }` after each successful window.
 * @param {AbortSignal} [args.signal] - When aborted, workers stop picking up
 *   windows; slots for unprocessed windows stay empty in the result.
 * @param {*} [args.jobId] - Passed through to analyzer.analyzeText.
 * @param {number} [args.totalAudioSec=0] - Total audio duration in seconds,
 *   passed through to buildAnalysisPrompt so the section-count target scales
 *   with the full video length (not just per-window). Recap-relay does the
 *   same; matching here keeps segmentation density consistent across both
 *   pipelines. When omitted, buildAnalysisPrompt falls back to deriving from
 *   the entries themselves.
 * @returns {Promise<Array>} One slot per window, in window order:
 *   `{ window, ok: true, sections, model, cost }` or
 *   `{ window, ok: false, error }`.
 */
async function analyzeWindowsInParallel({
  entries,
  windows,
  analyzer,
  fallbackModels,
  concurrency,
  onProgress,
  onWindowComplete,
  signal,
  jobId,
  totalAudioSec = 0,
}) {
  const results = new Array(windows.length);
  let nextWindow = 0;

  async function worker() {
    while (true) {
      if (signal?.aborted) return;
      const my = nextWindow++;
      if (my >= windows.length) return;

      const w = windows[my];
      const label = `Window ${my + 1}/${windows.length}`;
      const windowEntries = entries.slice(w.startIdx, w.endIdx + 1);
      const prompt = buildAnalysisPrompt(windowEntries, { totalAudioSec });

      // Try the configured model first, then walk fallbacks.
      let lastErr = null;
      let result = null;
      let usedModel = null;
      for (const tryModel of fallbackModels) {
        try {
          result = await analyzer.analyzeText({
            prompt,
            model: tryModel,
            onProgress: () => {}, // suppress per-chunk progress noise
            signal,
            jobId,
          });
          usedModel = tryModel;
          break;
        } catch (err) {
          if (signal?.aborted) return;
          lastErr = err;
        }
      }

      if (!result) {
        results[my] = { window: w, ok: false, error: lastErr };
        onProgress?.(`${label} failed: ${lastErr?.message?.slice(0, 100) || "unknown"}`);
        continue;
      }

      const parsed = safeParseSections(result.text);
      if (!parsed) {
        results[my] = { window: w, ok: false, error: new Error("invalid JSON") };
        onProgress?.(`${label} returned invalid JSON`);
        continue;
      }

      results[my] = {
        window: w,
        ok: true,
        sections: parsed.sections,
        model: usedModel,
        cost: result.cost,
      };
      onProgress?.(`${label} done (${parsed.sections.length} topics)`);

      // Fire the streaming callback with this window's BODY-OWNED
      // sections — the ones the final stitcher will keep from this
      // window. Computed deterministically per-window so the UI can
      // render incrementally as windows arrive (even out of order),
      // without later having to "undo" any displayed sections.
      //
      // Rule: window N owns sections whose globalStart falls before
      // window(N+1).bodyStartIdx. Sections starting at or after the
      // next window's body are deferred — window N+1 will produce an
      // authoritative version of them with more downstream context.
      if (onWindowComplete) {
        const nextBody =
          my + 1 < windows.length ? windows[my + 1].bodyStartIdx : Infinity;
        const offset = w.startIdx;
        const owned = [];
        for (const s of parsed.sections) {
          const globalStart = offset + (s.startIndex ?? 0);
          const globalEnd = offset + (s.endIndex ?? 0);
          if (globalStart >= nextBody) continue;
          owned.push({
            startIndex: globalStart,
            endIndex: globalEnd,
            title: s.title,
            summary: s.summary,
          });
        }
        try {
          await onWindowComplete({
            windowIdx: my,
            totalWindows: windows.length,
            ownedSections: owned,
          });
        } catch (cbErr) {
          // Callback errors must not derail the analyze loop —
          // streaming is best-effort and the canonical result still
          // ships at the end.
          console.warn(
            `[chunked-analyze] onWindowComplete callback failed: ${cbErr?.message || cbErr}`
          );
        }
      }
    }
  }

  // Always start at least one worker: a concurrency of 0/NaN/negative
  // would otherwise leave every result slot empty without any error.
  const requested = Math.floor(Number(concurrency)) || 1;
  const workerCount = Math.min(Math.max(1, requested), windows.length);
  await Promise.all(Array.from({ length: workerCount }, () => worker()));
  return results;
}

// Parses a model response into `{ sections: [...] }`. Accepts raw JSON or
// JSON wrapped in a ``` / ```json fence. Returns null for anything else
// (non-string input, empty text, malformed JSON, or no `sections` array).
function safeParseSections(text) {
  if (typeof text !== "string" || text === "") return null;
  let jsonStr = text.trim();
  const fenced = jsonStr.match(/```(?:json)?\s*([\s\S]*?)```/);
  if (fenced) jsonStr = fenced[1].trim();
  try {
    const parsed = JSON.parse(jsonStr);
    if (!parsed || !Array.isArray(parsed.sections)) return null;
    return parsed;
  } catch {
    return null;
  }
}

// ── Stitcher ────────────────────────────────────────────────────────────────
// Merges per-window section lists into a single ordered list of
// non-overlapping sections referencing entries by their position in
// the FULL (un-chunked) entries array.
//
// The rule: each window N owns sections whose globalStart falls in
// its body (i.e., globalStart < window(N+1).bodyStartIdx). Any
// section starting at or after the next window's body boundary is
// dropped because the next window will have produced a better
// version of that same section with more downstream context. The
// last window has no successor, so all its sections are kept.
//
// After collection, sections are sorted and any residual overlap
// (which shouldn't happen if windows are well-formed but might
// arise from model index errors) is repaired by clamping endIndex
// to the next section's startIndex - 1.
/**
 * Merges per-window results into one ordered, non-overlapping section list
 * indexed against the full entries array.
 *
 * A section from window N is dropped when it starts at or after the body of
 * window N+1 — but only if window N+1 actually succeeded. When the next
 * window failed (or its slot is empty), nothing will replace those sections,
 * so window N keeps everything it produced.
 *
 * @param {Array<{window: {startIdx: number, bodyStartIdx: number}, ok: boolean,
 *   sections?: Array<{startIndex?: number, endIndex?: number, title: *, summary: *}>}|undefined>} results
 *   Per-window results in window order; empty slots and failed windows are skipped.
 * @returns {Array<{startIndex: number, endIndex: number, title: *, summary: *}>}
 *   Sections sorted by startIndex, with overlaps clamped and any section left
 *   with endIndex < startIndex removed.
 */
export function stitchAnalysisResults(results) {
  const out = [];
  for (let i = 0; i < results.length; i++) {
    const r = results[i];
    if (!r?.ok) continue;

    // Only a successor that produced sections can supersede this window's
    // tail; a failed successor must not cause sections to be discarded.
    const next = results[i + 1];
    const nextBody = next?.ok && next.window ? next.window.bodyStartIdx : Infinity;
    const offset = r.window.startIdx;

    for (const s of r.sections) {
      const globalStart = offset + (s.startIndex ?? 0);
      const globalEnd = offset + (s.endIndex ?? 0);
      // Drop sections that begin in the next window's body — the
      // next window's analysis is authoritative for that range.
      if (globalStart >= nextBody) continue;
      out.push({
        startIndex: globalStart,
        endIndex: globalEnd,
        title: s.title,
        summary: s.summary,
      });
    }
  }

  // Order + repair overlaps (defensive — shouldn't trigger with
  // well-behaved model output, but the existing single-shot path
  // doesn't either and this matches its robustness).
  out.sort((a, b) => a.startIndex - b.startIndex);
  for (let i = 0; i < out.length - 1; i++) {
    if (out[i].endIndex >= out[i + 1].startIndex) {
      out[i].endIndex = out[i + 1].startIndex - 1;
    }
  }
  return out.filter((s) => s.endIndex >= s.startIndex);
}

// ── Public entry point ──────────────────────────────────────────────────────
// Runs chunked analysis end-to-end. Returns the same envelope shape
// callers expect from a single-shot analyzer.analyzeText() call:
//   {
//     text: JSON string of { sections: [...] },  // for prompt/result parity
//     model: the most-used successful model (or null),
//     cost: { total cost across all windows, summed },
//     usage: null,                               // no aggregate usage
//     attempts: { windows: N, failed: K }        // diagnostic
//   }
// The caller parses .text the same way it parses a single-shot
// response — no changes to the downstream chunk-building code.
//
// Falls back to single-shot if planning produces just one window
// (i.e., content is below the chunking cutoff). If all windows fail,
// throws so the caller's existing fallback (try next model) kicks in.
/**
 * Runs topic analysis over a transcript, chunking it into parallel windows
 * when it is long enough and falling back to one single-shot call otherwise.
 *
 * @param {object} args
 * @param {Array<{offset?: number, duration?: number}>} args.entries
 * @param {{analyzeText: Function}} args.analyzer
 * @param {Iterable<string>} args.fallbackModels - Models tried in order.
 * @param {number} [args.concurrency=DEFAULT_CONCURRENCY] - Max in-flight
 *   window calls; values that are not >= 1 fall back to the default.
 * @param {Function} [args.onProgress] - Receives human-readable status strings.
 * @param {Function|null} [args.onWindowComplete] - Forwarded to the parallel
 *   analyzer for incremental results.
 * @param {AbortSignal} [args.signal]
 * @param {*} [args.jobId] - Passed through to analyzer.analyzeText.
 * @returns {Promise<object>} The analyzer's own result on the single-shot
 *   path; otherwise `{ text, model, cost, usage: null, attempts }` where
 *   `text` is the JSON string of `{ sections }`.
 * @throws {Error} With name "AbortError" when the signal is aborted during
 *   the chunked run, or a plain Error when every window failed.
 */
export async function runChunkedAnalysis({
  entries,
  analyzer,
  fallbackModels,
  concurrency = DEFAULT_CONCURRENCY,
  onProgress = () => {},
  onWindowComplete = null,
  signal,
  jobId,
}) {
  // Default parameters only cover `undefined`; tolerate null/non-function
  // progress callbacks and unusable concurrency values as well.
  const report = typeof onProgress === "function" ? onProgress : () => {};
  const inFlight = concurrency >= 1 ? concurrency : DEFAULT_CONCURRENCY;

  const totalAudioSec = transcriptDurationSec(entries);

  // An empty transcript has nothing to plan; route it to the single-shot
  // path, which already tolerates zero entries.
  const windows = entries.length === 0 ? [] : planAnalysisWindows(entries);

  if (windows.length <= 1) {
    // Single-shot path — same as the legacy code does, but routed
    // through here so callers have one entry point. Log message
    // distinguishes the two reasons we end up here:
    //   (a) totalSec ≤ cutoff — short content, intentionally not chunked
    //   (b) entries are too sparse for multi-window planning — the loop
    //       broke after one window. Surfaces an awkward state that's
    //       usually a sign of bad upstream data (e.g. transcribe emitted
    //       bogus far-future timestamps that the sanity-cap dropped).
    if (totalAudioSec <= CHUNKING_CUTOFF_SECONDS) {
      report(
        `Content ≤${Math.round(CHUNKING_CUTOFF_SECONDS / 60)} min — running single-shot analysis`
      );
    } else {
      report(
        `Single window planned over ${entries.length} entries (last @ ${Math.round(totalAudioSec / 60)} min) — running single-shot analysis`
      );
    }
    return await runSingleShot({
      entries,
      analyzer,
      fallbackModels,
      onProgress: report,
      signal,
      jobId,
    });
  }

  // Window length is derived from the tunable so the message cannot drift
  // from the actual planning stride.
  report(
    `Chunked analysis: ${windows.length} windows of ~${Math.round(WINDOW_BODY_SECONDS / 60)} min each, up to ${inFlight} in parallel`
  );

  // totalAudioSec (end of the last entry) is passed down so the
  // section-count target in buildAnalysisPrompt scales with the FULL
  // video length, not just per-window. Matches recap-relay's
  // per-video-duration target methodology for consistent segmentation
  // density across both pipelines.
  const results = await analyzeWindowsInParallel({
    entries,
    windows,
    analyzer,
    fallbackModels,
    concurrency: inFlight,
    onProgress: report,
    onWindowComplete,
    signal,
    jobId,
    totalAudioSec,
  });

  // If the caller aborted mid-flight, some result slots may be empty.
  // Surface cancellation cleanly to the outer pipeline.
  if (signal?.aborted) {
    const abortErr = new Error("aborted");
    abortErr.name = "AbortError";
    throw abortErr;
  }

  const completed = results.filter(Boolean);
  const failures = completed.filter((r) => !r.ok);
  if (completed.length === 0 || failures.length === completed.length) {
    throw new Error(
      `All ${results.length} analyze windows failed. First error: ${
        failures[0]?.error?.message || "unknown"
      }`
    );
  }

  const stitched = stitchAnalysisResults(results);

  // Aggregate model attribution: pick the most-used successful model.
  // Iterates the filtered list so an empty slot can never be dereferenced.
  const modelTally = new Map();
  let totalCost = 0;
  for (const r of completed) {
    if (!r.ok) continue;
    modelTally.set(r.model, (modelTally.get(r.model) || 0) + 1);
    const windowCost =
      typeof r.cost?.totalCost === "string"
        ? parseFloat(r.cost.totalCost)
        : r.cost?.totalCost || 0;
    if (Number.isFinite(windowCost)) totalCost += windowCost;
  }
  const dominantModel =
    [...modelTally.entries()].sort((a, b) => b[1] - a[1])[0]?.[0] || null;

  report(
    `Chunked analysis complete — ${results.length - failures.length}/${results.length} windows succeeded, ${stitched.length} topics`
  );

  return {
    text: JSON.stringify({ sections: stitched }),
    model: dominantModel,
    cost: {
      totalCost: totalCost.toFixed(6),
      // NOTE(review): the sub-cent branch renders both "$" and "¢"
      // (e.g. "$0.400¢"). Left as-is in case it mirrors the single-shot
      // cost formatter — confirm and fix both together.
      totalCostDisplay:
        totalCost < 0.01
          ? `$${(totalCost * 100).toFixed(3)}¢`
          : `$${totalCost.toFixed(4)}`,
    },
    usage: null,
    attempts: { windows: results.length, failed: failures.length },
  };
}

// Seconds from the start of the transcript to the end of its last entry
// (last offset + last duration); 0 for an empty transcript.
function transcriptDurationSec(entries) {
  const last = entries[entries.length - 1];
  return (last?.offset || 0) + (last?.duration || 0);
}

/**
 * Single-shot path: the whole transcript IS the "window". Tries each model
 * in order and returns the first successful analyzer result unchanged.
 *
 * @throws The original error when the signal is aborted, otherwise the last
 *   model error, or "All analysis models failed" when no model was tried.
 */
async function runSingleShot({
  entries,
  analyzer,
  fallbackModels,
  onProgress,
  signal,
  jobId,
}) {
  // Compute totalAudioSec from the entries so the section-count target
  // picker chooses the right bucket (<30 min → 6 sections, 30-60 → 8, etc.).
  const totalAudioSec = transcriptDurationSec(entries);
  const prompt = buildAnalysisPrompt(entries, { totalAudioSec });

  let lastErr = null;
  for (const tryModel of fallbackModels) {
    try {
      return await analyzer.analyzeText({
        prompt,
        model: tryModel,
        onProgress,
        signal,
        jobId,
      });
    } catch (err) {
      if (signal?.aborted) throw err;
      lastErr = err;
    }
  }
  throw lastErr || new Error("All analysis models failed");
}