// Listing metadata captured from the repository viewer (not part of the module):
//   commit 0ae59f3550 · 488 lines · 19 KiB · JavaScript
// Commit message:
//   Introduces RECAP_MODE=multi alongside single-mode self-host:
//   - Tenant auth + accounts (magic-link via System SMTP), per-tenant credit
//     pool, anonymous trial minting with per-IP/-64 caps
//   - Self-serve Pro/Max purchase: inline Lightning (BTCPay) + card (Zaprite),
//     prepaid 30-day periods, expiry-reminder emails
//   - Core-decoupling: relay owns cloud tier/expiry keyed by Recaps user-id
//   - SQLite (better-sqlite3) schema for multi-mode; filesystem unchanged for single
//   - StartOS actions/versions through 0.2.155
// Chunked topic-analysis: split a long transcript into overlapping
// time-windowed slices, analyze each slice in parallel, stitch the
// returned sections back into one coherent list.
//
// Why: a single-shot analyze call against a 2-hour transcript spends
// most of its wall-time on prefill (typically 25K+ tokens). Splitting
// into 18-min slices gives the model a much smaller prompt per call,
// and firing the slices concurrently lets the backend (relay/vLLM or
// Gemini) batch them. End-to-end wall-time drops from minutes to
// tens of seconds for long content, with no quality regression as
// long as the slice boundaries are chosen with overlap and the
// stitcher trusts the second slice for the overlap region.
//
// Public entry point: runChunkedAnalysis().

import { buildAnalysisPrompt } from "./gemini-helpers.js";

// ── Tunables ────────────────────────────────────────────────────────────────
// Window body: the part of a chunk that "owns" its topic boundaries.
// Overlap: extra context on both sides of the body so a topic spanning
//          a boundary still gets seen in full by at least one window.
// Stride = body. Windows advance by `body` seconds. The planner gives
// every window a trailing overlap, and every window after the first
// also reaches back `overlap` seconds before its body, so a window
// covers up to `overlap + body + overlap` seconds of audio.
const WINDOW_BODY_SECONDS = 18 * 60; // 18 min
const WINDOW_OVERLAP_SECONDS = 2 * 60; // 2 min

// Don't chunk below this duration. A single analyze call against
// <25 min is fast on its own and avoids the stitching complexity
// for the common short-content case.
// Exported so the orchestrator can mirror the decision when picking
// whether to coalesce: above this duration the chunker handles
// granularity per-window, so the pre-chunk coalesce is unnecessary
// and would hurt section-boundary precision.
export const CHUNKING_CUTOFF_SECONDS = 25 * 60; // 25 min

// Max concurrent analyze calls in flight. Gemini paid Tier 1 allows
// ~1000 RPM for flash and ~150 RPM for pro — 12 in-flight is well
// under either ceiling and saturates most operator workloads
// without queueing. Operator hardware (vLLM on a single Spark) caps
// out around 8-12 concurrent for our prompt size, so 12 is a
// reasonable cross-backend default.
const DEFAULT_CONCURRENCY = 12;

// ── Window planning ─────────────────────────────────────────────────────────
// Plans a set of overlapping windows over the entries array. Each
// window has:
//   - startIdx, endIdx: inclusive bounds into the entries array
//   - bodyStartIdx:     index where this window's "body" begins
//                       (i.e., everything before this index is the
//                       overlap with the previous window's tail)
// The first window has bodyStartIdx === startIdx. Windows after the
// first have bodyStartIdx > startIdx by ~overlap seconds.
//
// The stitcher uses bodyStartIdx of window N+1 to decide whether a
// section from window N falls in the contested overlap region.
//
// Parameters:
//   entries — transcript entries; `offset` / `duration` are read as
//             seconds and entries are expected to be sorted by offset.
//   opts    — optional overrides: bodySeconds, overlapSeconds,
//             cutoffSeconds (each defaults to the module tunable).
// Returns: array of { startIdx, endIdx, bodyStartIdx }. Empty `entries`
//   yields an empty plan; content at or below the cutoff yields exactly
//   one window spanning every entry.
// Throws: RangeError when chunking is required but bodySeconds is not a
//   positive finite number or overlapSeconds is not a finite number >= 0
//   (the planning loop could otherwise never advance).
export function planAnalysisWindows(entries, opts = {}) {
  // Nothing to plan — avoids dereferencing entries[-1] below.
  if (entries.length === 0) return [];

  const bodySec = opts.bodySeconds ?? WINDOW_BODY_SECONDS;
  const overlapSec = opts.overlapSeconds ?? WINDOW_OVERLAP_SECONDS;
  const cutoffSec = opts.cutoffSeconds ?? CHUNKING_CUTOFF_SECONDS;

  const lastEntry = entries[entries.length - 1];
  const totalSec = (lastEntry.offset || 0) + (lastEntry.duration || 0);
  if (totalSec <= cutoffSec) {
    return [{ startIdx: 0, endIdx: entries.length - 1, bodyStartIdx: 0 }];
  }

  // The cursor advances by bodySec each iteration; a zero, negative or
  // non-finite stride would loop forever, so reject it up front.
  if (!Number.isFinite(bodySec) || bodySec <= 0) {
    throw new RangeError(`bodySeconds must be a positive finite number (got ${bodySec})`);
  }
  if (!Number.isFinite(overlapSec) || overlapSec < 0) {
    throw new RangeError(`overlapSeconds must be a finite number >= 0 (got ${overlapSec})`);
  }

  const windows = [];
  let bodyStartSec = 0;
  while (bodyStartSec < totalSec) {
    // The window's covered span (body + tail overlap):
    const windowEndSec = bodyStartSec + bodySec + overlapSec;
    // Body start in entry-index space: first entry with offset >= bodyStartSec.
    const bodyStartIdx = firstEntryAtOrAfter(entries, bodyStartSec);
    // If there are NO entries at or after bodyStartSec, we've consumed
    // all entries. Stop the loop.
    if (bodyStartIdx >= entries.length) break;

    // GAP HANDLING: if the next entry after bodyStartSec is far in
    // the future (past this window's body + overlap), there's a gap
    // in the transcript timeline. This commonly happens when the
    // transcribe step truncated a middle chunk — the timeline has
    // valid entries at, e.g., 0-31 min and 90-94 min but nothing in
    // between. Rather than stopping (which would silently drop the
    // entries past the gap), jump bodyStartSec forward to the next
    // entry's offset, rounded down to a body-stride boundary so
    // subsequent window alignment stays sensible, and continue.
    const nextEntryOffset = entries[bodyStartIdx].offset || 0;
    if (nextEntryOffset >= windowEndSec) {
      bodyStartSec = Math.max(
        bodyStartSec + bodySec,
        Math.floor(nextEntryOffset / bodySec) * bodySec
      );
      continue;
    }

    // Window's entry range: from the start of overlap-with-prior
    // (i.e., bodyStartSec - overlapSec, clamped at 0) through windowEndSec.
    const overlapWithPriorSec = Math.max(0, bodyStartSec - overlapSec);
    const startIdx = firstEntryAtOrAfter(entries, overlapWithPriorSec);
    const endIdx = lastEntryBefore(entries, windowEndSec);
    if (endIdx < bodyStartIdx) {
      // Defensive: shouldn't happen with the gap-handling above, but
      // if it does, advance the body cursor rather than break so we
      // don't get stuck.
      bodyStartSec += bodySec;
      continue;
    }

    windows.push({ startIdx, endIdx, bodyStartIdx });
    // Stop if this window already covers the last entry.
    if (endIdx >= entries.length - 1) break;
    bodyStartSec += bodySec;
  }
  return windows;
}

// Returns the index of the first entry whose offset (missing/falsy
// offsets count as 0) is >= `sec`, or entries.length when no entry
// qualifies. Linear scan; entries are expected to be sorted by offset.
function firstEntryAtOrAfter(entries, sec) {
  const idx = entries.findIndex((entry) => (entry.offset || 0) >= sec);
  return idx === -1 ? entries.length : idx;
}

// Returns the index of the last entry in the leading run of entries
// whose offset (missing/falsy offsets count as 0) is < `sec`:
//   - -1 when the very first entry is already at/after `sec` (or the
//     array is empty) → caller treats the range as empty;
//   - entries.length - 1 when every entry is before `sec`.
// The scan stops at the first entry that is not before `sec`, so for
// sorted input this is the largest i with entries[i].offset < sec.
function lastEntryBefore(entries, sec) {
  // `!(x < sec)` rather than `x >= sec` so a NaN bound stops the scan
  // immediately, exactly like an `else break` on the `<` test would.
  const firstNotBefore = entries.findIndex((entry) => !((entry.offset || 0) < sec));
  return firstNotBefore === -1 ? entries.length - 1 : firstNotBefore - 1;
}

// ── Parallel analyzer ───────────────────────────────────────────────────────
// Fires N analyze calls concurrently with a bounded in-flight count.
// Each call gets its own slice of entries plus a freshly-built prompt.
// Resolves to an array aligned with `windows`, each slot being
//   { window, ok: true, sections, model, cost }   or
//   { window, ok: false, error }.
// Slots stay empty (undefined) only for windows that were never
// finished because `signal` was aborted.
//
// Errors are isolated per window — a single-window failure (prompt
// construction, every model rejecting, or unparseable output) doesn't
// fail the whole batch. The stitcher gets to decide what to do
// about gaps.
//
// Parameters (single options object):
//   entries          — full transcript entries array.
//   windows          — plan from planAnalysisWindows().
//   analyzer         — object exposing async analyzeText({ prompt, model,
//                      onProgress, signal, jobId }) that resolves to an
//                      object with `text` and `cost`.
//   fallbackModels   — models to try in order for each window.
//   concurrency      — max in-flight windows; values below 1 or
//                      non-numeric values are treated as 1 so a bad
//                      setting cannot silently skip every window.
//   onProgress       — optional (message) => void.
//   onWindowComplete — optional async callback receiving
//                      { windowIdx, totalWindows, ownedSections }.
//   signal           — optional abort signal; checked between windows
//                      and after a failed model attempt.
//   jobId            — passed through to analyzer.analyzeText.
//   totalAudioSec    — total audio duration in seconds, passed through
//                      to buildAnalysisPrompt so the section-count
//                      target scales with the full video length (not
//                      just per-window). Recap-relay does the same;
//                      matching here keeps segmentation density
//                      consistent across both pipelines. When omitted,
//                      buildAnalysisPrompt falls back to deriving from
//                      the entries themselves.
async function analyzeWindowsInParallel({
  entries,
  windows,
  analyzer,
  fallbackModels,
  concurrency,
  onProgress,
  onWindowComplete,
  signal,
  jobId,
  totalAudioSec = 0,
}) {
  const results = new Array(windows.length);
  let next = 0;

  async function worker() {
    while (true) {
      if (signal?.aborted) return;
      // Claim the next unprocessed window. No await sits between the
      // read and the increment, so two workers never take the same index.
      const my = next++;
      if (my >= windows.length) return;
      const w = windows[my];
      const label = `Window ${my + 1}/${windows.length}`;
      const windowEntries = entries.slice(w.startIdx, w.endIdx + 1);

      // Build the prompt inside the per-window error boundary so a
      // prompt failure marks this window failed instead of rejecting
      // the whole batch.
      let prompt;
      try {
        prompt = buildAnalysisPrompt(windowEntries, { totalAudioSec });
      } catch (err) {
        results[my] = { window: w, ok: false, error: err };
        onProgress?.(`${label} failed: ${err?.message?.slice(0, 100) || "unknown"}`);
        continue;
      }

      // Try the configured model first, then walk fallbacks.
      let lastErr = null;
      let result = null;
      let usedModel = null;
      for (const tryModel of fallbackModels) {
        try {
          result = await analyzer.analyzeText({
            prompt,
            model: tryModel,
            onProgress: () => {}, // suppress per-chunk progress noise
            signal,
            jobId,
          });
          usedModel = tryModel;
          break;
        } catch (err) {
          // An abort surfaces as a rejection; leave the slot empty and
          // let the caller report cancellation.
          if (signal?.aborted) return;
          lastErr = err;
        }
      }
      if (!result) {
        results[my] = { window: w, ok: false, error: lastErr };
        onProgress?.(`${label} failed: ${lastErr?.message?.slice(0, 100) || "unknown"}`);
        continue;
      }

      const parsed = safeParseSections(result.text);
      if (!parsed) {
        results[my] = { window: w, ok: false, error: new Error("invalid JSON") };
        onProgress?.(`${label} returned invalid JSON`);
        continue;
      }

      results[my] = {
        window: w,
        ok: true,
        sections: parsed.sections,
        model: usedModel,
        cost: result.cost,
      };
      onProgress?.(`${label} done (${parsed.sections.length} topics)`);

      // Fire the streaming callback with this window's BODY-OWNED
      // sections — the ones the final stitcher will keep from this
      // window. Computed deterministically per-window so the UI can
      // render incrementally as windows arrive (even out of order),
      // without later having to "undo" any displayed sections.
      //
      // Rule: window N owns sections whose globalStart falls before
      // window(N+1).bodyStartIdx. Sections starting at or after the
      // next window's body are deferred — window N+1 will produce an
      // authoritative version of them with more downstream context.
      if (onWindowComplete) {
        const nextBody = my + 1 < windows.length
          ? windows[my + 1].bodyStartIdx
          : Infinity;
        const offset = w.startIdx;
        const owned = [];
        for (const s of parsed.sections) {
          // Model output is untrusted: skip entries that are not
          // objects rather than crash the worker on `null.startIndex`.
          if (s === null || typeof s !== "object") continue;
          const globalStart = offset + (s.startIndex ?? 0);
          const globalEnd = offset + (s.endIndex ?? 0);
          if (globalStart >= nextBody) continue;
          owned.push({
            startIndex: globalStart,
            endIndex: globalEnd,
            title: s.title,
            summary: s.summary,
          });
        }
        try {
          await onWindowComplete({
            windowIdx: my,
            totalWindows: windows.length,
            ownedSections: owned,
          });
        } catch (cbErr) {
          // Callback errors must not derail the analyze loop —
          // streaming is best-effort and the canonical result still
          // ships at the end.
          console.warn(
            `[chunked-analyze] onWindowComplete callback failed: ${cbErr?.message || cbErr}`
          );
        }
      }
    }
  }

  // At least one worker whenever there is work: a concurrency of 0,
  // NaN or undefined would otherwise start no workers and leave every
  // result slot empty.
  const requested = Math.floor(Number(concurrency));
  const workerCount = Math.min(windows.length, requested >= 1 ? requested : 1);
  const workers = Array.from({ length: workerCount }, () => worker());
  await Promise.all(workers);
  return results;
}

// Parses a model response into `{ sections: [...] }`, or returns null
// when no usable JSON object with a `sections` array can be found.
//
// Accepted shapes, tried in order:
//   1. the contents of the first ``` / ```json fenced block, or the
//      whole trimmed text when there is no complete fence;
//   2. the span from the first "{" to the last "}" — recovers output
//      that wraps the JSON in prose or leaves a fence unterminated.
// Entries of `sections` that are not plain objects (null, numbers,
// arrays, ...) are dropped so downstream code can read `.startIndex`
// etc. without crashing on malformed model output.
//
// Never throws: non-string or empty input yields null.
function safeParseSections(text) {
  if (typeof text !== "string") return null;
  const trimmed = text.trim();
  if (!trimmed) return null;

  const fenced = trimmed.match(/```(?:json)?\s*([\s\S]*?)```/);
  const candidates = [fenced ? fenced[1].trim() : trimmed];
  const firstBrace = trimmed.indexOf("{");
  const lastBrace = trimmed.lastIndexOf("}");
  if (firstBrace !== -1 && lastBrace > firstBrace) {
    candidates.push(trimmed.slice(firstBrace, lastBrace + 1));
  }

  for (const candidate of candidates) {
    let parsed;
    try {
      parsed = JSON.parse(candidate);
    } catch {
      // Not valid JSON in this shape — fall through to the next candidate.
      continue;
    }
    if (!parsed || !Array.isArray(parsed.sections)) continue;
    parsed.sections = parsed.sections.filter(
      (s) => s !== null && typeof s === "object" && !Array.isArray(s)
    );
    return parsed;
  }
  return null;
}

// ── Stitcher ────────────────────────────────────────────────────────────────
// Merges per-window section lists into a single ordered list of
// non-overlapping sections referencing entries by their position in
// the FULL (un-chunked) entries array.
//
// The rule: each window N owns sections whose globalStart falls
// before the body of the next SUCCESSFUL window. Any section starting
// at or after that boundary is dropped because the later window will
// have produced a better version of that same section with more
// downstream context. If the following window failed (or its slot is
// empty) there is no better version, so window N keeps the sections
// from its overlap tail instead of leaving a larger hole. The last
// successful window has no successor, so all its sections are kept.
//
// Model output is treated as untrusted: section entries that are not
// objects, or whose indices are not finite numbers after numeric
// coercion, are skipped (a string index such as "3" is coerced, not
// concatenated onto the window offset).
//
// After collection, sections are sorted and any residual overlap
// (which shouldn't happen if windows are well-formed but might
// arise from model index errors) is repaired by clamping endIndex
// to the next section's startIndex - 1. Sections left with
// endIndex < startIndex are removed.
//
// Parameters:
//   results — array of per-window results ({ window, ok, sections, ... });
//             empty slots and failed results are ignored.
// Returns: array of { startIndex, endIndex, title, summary } sorted by
//   startIndex, with indices relative to the full entries array.
export function stitchAnalysisResults(results) {
  const out = [];
  for (let i = 0; i < results.length; i++) {
    const r = results[i];
    if (!r || !r.ok || !Array.isArray(r.sections)) continue;

    // Boundary = body start of the next window that actually succeeded.
    let nextBody = Infinity;
    for (let j = i + 1; j < results.length; j++) {
      const later = results[j];
      if (later && later.ok && later.window) {
        nextBody = later.window.bodyStartIdx;
        break;
      }
    }

    const offset = r.window.startIdx;
    for (const s of r.sections) {
      if (s === null || typeof s !== "object") continue;
      const globalStart = offset + Number(s.startIndex ?? 0);
      const globalEnd = offset + Number(s.endIndex ?? 0);
      if (!Number.isFinite(globalStart) || !Number.isFinite(globalEnd)) continue;
      // Drop sections that begin in the next successful window's body —
      // that window's analysis is authoritative for the range.
      if (globalStart >= nextBody) continue;
      out.push({
        startIndex: globalStart,
        endIndex: globalEnd,
        title: s.title,
        summary: s.summary,
      });
    }
  }

  // Order + repair overlaps (defensive — shouldn't trigger with
  // well-behaved model output).
  out.sort((a, b) => a.startIndex - b.startIndex);
  for (let i = 0; i < out.length - 1; i++) {
    if (out[i].endIndex >= out[i + 1].startIndex) {
      out[i].endIndex = out[i + 1].startIndex - 1;
    }
  }
  return out.filter((s) => s.endIndex >= s.startIndex);
}

// ── Public entry point ──────────────────────────────────────────────────────
// Runs chunked analysis end-to-end. Returns the same envelope shape
// callers expect from a single-shot analyzer.analyzeText() call:
//   {
//     text: "<JSON string with .sections>",     // for prompt/result parity
//     model: "<which model served the most windows>",
//     cost: { total cost across all windows, summed },
//     usage: null,                              // no aggregate usage
//     attempts: { windows: N, failed: K }       // diagnostic
//   }
// The caller parses .text the same way it parses a single-shot
// response — no changes to the downstream chunk-building code.
//
// Falls back to single-shot if planning produces just one window
// (i.e., content is below the chunking cutoff); in that case the
// analyzer's own result is returned as-is and onWindowComplete is not
// invoked.
//
// Parameters (single options object):
//   entries          — transcript entries (offset/duration in seconds).
//   analyzer         — object exposing async analyzeText(...).
//   fallbackModels   — models to try in order.
//   concurrency      — max in-flight windows (default DEFAULT_CONCURRENCY).
//   onProgress       — (message) => void, defaults to a no-op.
//   onWindowComplete — optional streaming callback, chunked path only.
//   signal           — optional abort signal.
//   jobId            — passed through to the analyzer.
//
// Throws:
//   - Error when `entries` is empty (nothing to analyze);
//   - an Error named "AbortError" when `signal` is aborted by the time
//     the windows settle;
//   - Error when every window failed, so the caller's existing
//     fallback (try next model) kicks in.
export async function runChunkedAnalysis({
  entries,
  analyzer,
  fallbackModels,
  concurrency = DEFAULT_CONCURRENCY,
  onProgress = () => {},
  onWindowComplete = null,
  signal,
  jobId,
}) {
  // Fail with a clear message instead of a TypeError from planning.
  if (entries.length === 0) {
    throw new Error("No transcript entries to analyze");
  }

  // Total audio duration = end of the last entry. Used for the
  // single-shot log message and passed to the per-window prompts so the
  // section-count target (in buildAnalysisPrompt) scales with the FULL
  // video length, not just per-window. Matches recap-relay's
  // per-video-duration target methodology for consistent segmentation
  // density across both pipelines.
  const lastEntry = entries[entries.length - 1];
  const totalSec = (lastEntry.offset || 0) + (lastEntry.duration || 0);

  const windows = planAnalysisWindows(entries);
  if (windows.length === 1) {
    // Single-shot path — same as the legacy code does, but routed
    // through here so callers have one entry point. Log message
    // distinguishes the two reasons we end up here:
    //   (a) totalSec ≤ cutoff — short content, intentionally not chunked
    //   (b) entries are too sparse for multi-window planning — the loop
    //       broke after one window. Surfaces an awkward state that's
    //       usually a sign of bad upstream data (e.g. transcribe emitted
    //       bogus far-future timestamps that the sanity-cap dropped).
    if (totalSec <= CHUNKING_CUTOFF_SECONDS) {
      onProgress(
        `Content ≤${Math.round(CHUNKING_CUTOFF_SECONDS / 60)} min — running single-shot analysis`
      );
    } else {
      onProgress(
        `Single window planned over ${entries.length} entries (last @ ${Math.round(totalSec / 60)} min) — running single-shot analysis`
      );
    }
    return await runSingleShot({
      entries,
      analyzer,
      fallbackModels,
      onProgress,
      signal,
      jobId,
    });
  }

  onProgress(
    `Chunked analysis: ${windows.length} windows of ~${Math.round(WINDOW_BODY_SECONDS / 60)} min each, up to ${concurrency} in parallel`
  );

  const results = await analyzeWindowsInParallel({
    entries,
    windows,
    analyzer,
    fallbackModels,
    concurrency,
    onProgress,
    onWindowComplete,
    signal,
    jobId,
    totalAudioSec: totalSec,
  });

  // If the caller aborted mid-flight, some result slots may be empty.
  // Surface cancellation cleanly to the outer pipeline.
  if (signal?.aborted) {
    const e = new Error("aborted");
    e.name = "AbortError";
    throw e;
  }

  // Work from the non-empty slots only, so an unfilled slot can never
  // be dereferenced below.
  const completed = results.filter(Boolean);
  const failures = completed.filter((r) => !r.ok);
  const successes = completed.filter((r) => r.ok);
  if (successes.length === 0) {
    throw new Error(
      `All ${results.length} analyze windows failed. First error: ${
        failures[0]?.error?.message || "unknown"
      }`
    );
  }

  const stitched = stitchAnalysisResults(results);

  // Aggregate model attribution: pick the most-used successful model.
  // Costs may arrive as numeric strings or numbers; anything that does
  // not parse to a finite number is ignored.
  const modelTally = new Map();
  let totalCost = 0;
  for (const r of successes) {
    modelTally.set(r.model, (modelTally.get(r.model) || 0) + 1);
    const c = typeof r.cost?.totalCost === "string"
      ? parseFloat(r.cost.totalCost)
      : r.cost?.totalCost || 0;
    if (Number.isFinite(c)) totalCost += c;
  }
  const dominantModel = [...modelTally.entries()].sort((a, b) => b[1] - a[1])[0]?.[0] || null;

  onProgress(
    `Chunked analysis complete — ${successes.length}/${results.length} windows succeeded, ${stitched.length} topics`
  );

  return {
    text: JSON.stringify({ sections: stitched }),
    model: dominantModel,
    cost: {
      totalCost: totalCost.toFixed(6),
      // NOTE(review): sub-cent amounts render with both "$" and "¢"
      // (e.g. "$0.500¢"). Kept as-is because other consumers may expect
      // this exact format — confirm before changing.
      totalCostDisplay: totalCost < 0.01
        ? `$${(totalCost * 100).toFixed(3)}¢`
        : `$${totalCost.toFixed(4)}`,
    },
    usage: null,
    attempts: { windows: results.length, failed: failures.length },
  };
}

// Single-shot path: the whole transcript IS the "window".
//
// Builds one prompt over all entries and tries each model in
// `fallbackModels` in order, returning the first analyzer result
// unchanged. totalAudioSec (end of the last entry, 0 for an empty
// transcript) is handed to buildAnalysisPrompt so the section-count
// target picker chooses the right bucket (<30 min → 6 sections,
// 30-60 → 8, etc.).
//
// Throws:
//   - the analyzer's own error immediately when `signal` is aborted
//     (no further fallbacks are attempted);
//   - otherwise the last model's error once every model has failed, or
//     Error("All analysis models failed") when no model was tried.
async function runSingleShot({
  entries,
  analyzer,
  fallbackModels,
  onProgress,
  signal,
  jobId,
}) {
  const lastEntry = entries.length > 0 ? entries[entries.length - 1] : null;
  const totalAudioSec = lastEntry
    ? (lastEntry.offset || 0) + (lastEntry.duration || 0)
    : 0;
  const prompt = buildAnalysisPrompt(entries, { totalAudioSec });

  let lastErr = null;
  for (const tryModel of fallbackModels) {
    try {
      return await analyzer.analyzeText({
        prompt,
        model: tryModel,
        onProgress,
        signal,
        jobId,
      });
    } catch (err) {
      // Cancellation is not a model failure — propagate it untouched.
      if (signal?.aborted) throw err;
      lastErr = err;
    }
  }
  throw lastErr || new Error("All analysis models failed");
}