Files
recap/server/chunked-analyze.js
T
Keysat 0ae59f3550 Add multi-tenant cloud mode: self-serve purchase, credit metering, core-decoupling
Introduces RECAP_MODE=multi alongside single-mode self-host:
- Tenant auth + accounts (magic-link via System SMTP), per-tenant credit pool,
  anonymous trial minting with per-IP/-64 caps
- Self-serve Pro/Max purchase: inline Lightning (BTCPay) + card (Zaprite),
  prepaid 30-day periods, expiry-reminder emails
- Core-decoupling: relay owns cloud tier/expiry keyed by Recaps user-id
- SQLite (better-sqlite3) schema for multi-mode; filesystem unchanged for single
- StartOS actions/versions through 0.2.155
2026-06-13 14:25:05 -05:00

488 lines
19 KiB
JavaScript

// Chunked topic-analysis: split a long transcript into overlapping
// time-windowed slices, analyze each slice in parallel, stitch the
// returned sections back into one coherent list.
//
// Why: a single-shot analyze call against a 2-hour transcript spends
// most of its wall-time on prefill (typically 25K+ tokens). Splitting
// into 18-min slices gives the model a much smaller prompt per call,
// and firing the slices concurrently lets the backend (relay/vLLM or
// Gemini) batch them. End-to-end wall-time drops from minutes to
// tens of seconds for long content, with no quality regression as
// long as the slice boundaries are chosen with overlap and the
// stitcher trusts the second slice for the overlap region.
//
// Public entry point: runChunkedAnalysis().
import { buildAnalysisPrompt } from "./gemini-helpers.js";
// ── Tunables ────────────────────────────────────────────────────────────────
// All durations below are in seconds; this keeps them readable as minutes.
const SECONDS_PER_MINUTE = 60;

// Body: the stretch of a window that "owns" its topic boundaries.
// It doubles as the stride — consecutive windows start one body
// length apart.
const WINDOW_BODY_SECONDS = SECONDS_PER_MINUTE * 18; // 18 min

// Overlap: extra audio attached to a window so a topic straddling a
// boundary is still seen in full by at least one window. The planner
// appends it after the body and also reaches back by the same amount
// before the body (clamped at the start of the transcript), so a
// window can span up to body + 2 × overlap.
const WINDOW_OVERLAP_SECONDS = SECONDS_PER_MINUTE * 2; // 2 min

// Content at or below this duration is not chunked: one analyze call
// over <25 min is fast on its own and skips the stitching complexity
// for the common short-content case.
// Exported so the orchestrator can mirror the decision when choosing
// whether to coalesce: above this duration the chunker handles
// granularity per window, so a pre-chunk coalesce is unnecessary and
// would hurt section-boundary precision.
export const CHUNKING_CUTOFF_SECONDS = SECONDS_PER_MINUTE * 25; // 25 min

// Ceiling on analyze calls in flight at once. Gemini paid Tier 1
// allows ~1000 RPM for flash and ~150 RPM for pro, so 12 in flight is
// well under either limit and saturates most operator workloads
// without queueing. Operator hardware (vLLM on a single Spark) tops
// out around 8-12 concurrent for our prompt size, which makes 12 a
// reasonable cross-backend default.
const DEFAULT_CONCURRENCY = 12;
// ── Window planning ─────────────────────────────────────────────────────────
// Plans a set of overlapping windows over the entries array.
//
// Parameters:
//   entries - transcript entries, sorted by `offset` (seconds); each may
//             carry a `duration`. Missing offsets/durations count as 0.
//   opts    - optional overrides: `bodySeconds`, `overlapSeconds`,
//             `cutoffSeconds` (default to the module tunables).
//
// Returns an array of windows, each with:
//   - startIdx, endIdx: inclusive bounds into the entries array
//   - bodyStartIdx:     index where this window's "body" begins (i.e.,
//                       everything before this index is the overlap
//                       with the previous window's tail)
// The first window has bodyStartIdx === startIdx. Windows after the
// first have bodyStartIdx > startIdx by ~overlap seconds.
//
// When the transcript's total duration is at or below the cutoff —
// including an EMPTY transcript, whose duration is treated as 0 — a
// single window spanning everything is returned (endIdx is then
// entries.length - 1, i.e. -1 for an empty array) so callers take
// their single-shot path instead of crashing here.
//
// Throws RangeError if chunking is required but the body length is not
// a positive number: the body is the stride, so the planning loop
// could never advance.
//
// The stitcher uses bodyStartIdx of window N+1 to decide whether a
// section from window N falls in the contested overlap region.
export function planAnalysisWindows(entries, opts = {}) {
  const bodySec = opts.bodySeconds ?? WINDOW_BODY_SECONDS;
  const overlapSec = opts.overlapSeconds ?? WINDOW_OVERLAP_SECONDS;
  const cutoffSec = opts.cutoffSeconds ?? CHUNKING_CUTOFF_SECONDS;

  // Optional chaining keeps an empty transcript from throwing; it simply
  // yields a total of 0 seconds and falls into the single-window case.
  const lastEntry = entries[entries.length - 1];
  const totalSec = (lastEntry?.offset || 0) + (lastEntry?.duration || 0);
  if (totalSec <= cutoffSec) {
    return [{ startIdx: 0, endIdx: entries.length - 1, bodyStartIdx: 0 }];
  }

  if (!(bodySec > 0)) {
    throw new RangeError(
      `bodySeconds must be a positive number of seconds (got ${bodySec})`
    );
  }

  const windows = [];
  let bodyStartSec = 0;
  while (bodyStartSec < totalSec) {
    // The window's covered span ends at body + tail overlap.
    const windowEndSec = bodyStartSec + bodySec + overlapSec;

    // Body start in entry-index space: first entry with offset >= bodyStartSec.
    const bodyStartIdx = firstEntryAtOrAfter(entries, bodyStartSec);
    // No entries at or after bodyStartSec: everything has been consumed.
    if (bodyStartIdx >= entries.length) break;

    // GAP HANDLING: if the next entry lies beyond this window's
    // body + overlap, the transcript timeline has a hole (e.g. valid
    // entries at 0-31 min and 90-94 min with nothing in between).
    // Rather than stopping — which would silently drop everything
    // past the gap — jump the cursor forward to the stride boundary
    // at or before that entry, always advancing by at least one body.
    const nextEntryOffset = entries[bodyStartIdx].offset || 0;
    if (nextEntryOffset >= windowEndSec) {
      bodyStartSec = Math.max(
        bodyStartSec + bodySec,
        Math.floor(nextEntryOffset / bodySec) * bodySec
      );
      continue;
    }

    // Entry range: from the overlap with the prior window
    // (bodyStartSec - overlapSec, clamped at 0) up to windowEndSec.
    const overlapWithPriorSec = Math.max(0, bodyStartSec - overlapSec);
    const startIdx = firstEntryAtOrAfter(entries, overlapWithPriorSec);
    const endIdx = lastEntryBefore(entries, windowEndSec);
    if (endIdx < bodyStartIdx) {
      // Defensive: should not happen given the gap handling above, but
      // if it does, advance the cursor instead of stalling.
      bodyStartSec += bodySec;
      continue;
    }

    windows.push({ startIdx, endIdx, bodyStartIdx });
    // Stop once a window already reaches the last entry.
    if (endIdx >= entries.length - 1) break;
    bodyStartSec += bodySec;
  }
  return windows;
}
// Index of the first entry whose offset (missing offset counts as 0)
// is at or after `sec`; entries.length when there is none. Entries are
// expected to be sorted by offset.
function firstEntryAtOrAfter(entries, sec) {
  const hit = entries.findIndex((entry) => (entry.offset || 0) >= sec);
  return hit === -1 ? entries.length : hit;
}
// Index of the last entry in the leading run of entries whose offset
// (missing offset counts as 0) is strictly below `sec`. Returns -1
// when the very first entry is already at or past `sec` (or the array
// is empty), and entries.length - 1 when every entry qualifies.
function lastEntryBefore(entries, sec) {
  let count = 0;
  while (count < entries.length && (entries[count].offset || 0) < sec) {
    count += 1;
  }
  return count - 1;
}
// ── Parallel analyzer ───────────────────────────────────────────────────────
// Fires the per-window analyze calls with a bounded in-flight count.
// Each call gets its own slice of entries plus a freshly-built prompt.
//
// Resolves to an array parallel to `windows`; each filled slot is
//   { window, ok: true, sections, model, cost }   on success, or
//   { window, ok: false, error }                  on failure.
// Slots stay empty for windows that were never finished because
// `signal` was aborted.
//
// Errors are isolated per window — a single-window failure doesn't
// fail the whole batch. The stitcher gets to decide what to do
// about gaps.
//
// Parameters of note:
//   fallbackModels   - models tried in order for each window.
//   concurrency      - max calls in flight. Values that are not a
//                      number >= 1 are treated as 1 so that every
//                      window is still processed (serially) instead
//                      of silently starting no workers at all.
//   onProgress       - optional; receives human-readable status lines.
//   onWindowComplete - optional async callback for streaming; see below.
//   totalAudioSec    - total audio duration in seconds, passed through
//                      to buildAnalysisPrompt so the section-count
//                      target scales with the full video length (not
//                      just per-window). Recap-relay does the same;
//                      matching here keeps segmentation density
//                      consistent across both pipelines.
async function analyzeWindowsInParallel({
  entries,
  windows,
  analyzer,
  fallbackModels,
  concurrency,
  onProgress,
  onWindowComplete,
  signal,
  jobId,
  totalAudioSec = 0,
}) {
  const results = new Array(windows.length);
  // Shared cursor. Claiming an index is synchronous (no await between
  // the read and the increment), so workers never take the same window.
  let next = 0;

  async function worker() {
    while (true) {
      if (signal?.aborted) return;
      const my = next++;
      if (my >= windows.length) return;
      const w = windows[my];
      const windowEntries = entries.slice(w.startIdx, w.endIdx + 1);
      const prompt = buildAnalysisPrompt(windowEntries, { totalAudioSec });

      // Try the configured model first, then walk fallbacks.
      let lastErr = null;
      let result = null;
      let usedModel = null;
      for (const tryModel of fallbackModels) {
        try {
          result = await analyzer.analyzeText({
            prompt,
            model: tryModel,
            onProgress: () => {}, // suppress per-chunk progress noise
            signal,
            jobId,
          });
          usedModel = tryModel;
          break;
        } catch (err) {
          // On cancellation leave the slot empty and stop this worker.
          if (signal?.aborted) return;
          lastErr = err;
        }
      }
      if (!result) {
        results[my] = { window: w, ok: false, error: lastErr };
        onProgress?.(`Window ${my + 1}/${windows.length} failed: ${lastErr?.message?.slice(0, 100) || "unknown"}`);
        continue;
      }

      const parsed = safeParseSections(result.text);
      if (!parsed) {
        results[my] = { window: w, ok: false, error: new Error("invalid JSON") };
        onProgress?.(`Window ${my + 1}/${windows.length} returned invalid JSON`);
        continue;
      }

      // Keep only object-shaped sections. A stray null/primitive in the
      // model's array would otherwise throw below (outside any try) and
      // reject the whole batch, breaking per-window error isolation.
      const sections = parsed.sections.filter(
        (s) => s !== null && typeof s === "object"
      );
      results[my] = {
        window: w,
        ok: true,
        sections,
        model: usedModel,
        cost: result.cost,
      };
      onProgress?.(`Window ${my + 1}/${windows.length} done (${sections.length} topics)`);

      // Fire the streaming callback with this window's BODY-OWNED
      // sections, computed deterministically per window so the UI can
      // render incrementally as windows arrive (even out of order).
      //
      // Rule: window N owns sections whose globalStart falls before
      // window(N+1).bodyStartIdx. Sections starting at or after the
      // next window's body are deferred to window N+1, which sees them
      // with more downstream context.
      if (onWindowComplete) {
        const nextBody = my + 1 < windows.length
          ? windows[my + 1].bodyStartIdx
          : Infinity;
        const offset = w.startIdx;
        const owned = [];
        for (const s of sections) {
          // Section indices are relative to the window slice; shift
          // them into the full entries array.
          const globalStart = offset + (s.startIndex ?? 0);
          const globalEnd = offset + (s.endIndex ?? 0);
          if (globalStart >= nextBody) continue;
          owned.push({
            startIndex: globalStart,
            endIndex: globalEnd,
            title: s.title,
            summary: s.summary,
          });
        }
        try {
          await onWindowComplete({
            windowIdx: my,
            totalWindows: windows.length,
            ownedSections: owned,
          });
        } catch (cbErr) {
          // Callback errors must not derail the analyze loop —
          // streaming is best-effort and the canonical result still
          // ships at the end.
          console.warn(
            `[chunked-analyze] onWindowComplete callback failed: ${cbErr?.message || cbErr}`
          );
        }
      }
    }
  }

  // Clamp the pool size: at least one worker (so concurrency <= 0 or a
  // non-numeric value cannot leave every window unprocessed) and never
  // more workers than windows.
  const requested = Number(concurrency);
  const limit = Number.isNaN(requested) ? 1 : Math.max(1, Math.floor(requested));
  const workerCount = Math.min(limit, windows.length);
  const workers = Array.from({ length: workerCount }, () => worker());
  await Promise.all(workers);
  return results;
}
// Parses a model response into an object carrying a `.sections` array.
// Returns the parsed object, or null when no usable JSON is found.
// Never throws.
//
// Accepted shapes, tried in order:
//   1. the contents of the first ``` / ```json fenced block if there is
//      one, otherwise the whole trimmed text;
//   2. the span from the first "{" to the last "}" — rescues responses
//      where the model wrapped the JSON object in unfenced prose.
// Non-string input (null, undefined, objects, numbers) yields null.
function safeParseSections(text) {
  if (typeof text !== "string") return null;
  const trimmed = text.trim();
  if (!trimmed) return null;

  const fenced = trimmed.match(/```(?:json)?\s*([\s\S]*?)```/);
  const candidates = [fenced ? fenced[1].trim() : trimmed];

  const open = trimmed.indexOf("{");
  const close = trimmed.lastIndexOf("}");
  if (open !== -1 && close > open) {
    const braced = trimmed.slice(open, close + 1);
    if (!candidates.includes(braced)) candidates.push(braced);
  }

  for (const candidate of candidates) {
    try {
      const parsed = JSON.parse(candidate);
      if (parsed && Array.isArray(parsed.sections)) return parsed;
    } catch {
      // Not valid JSON in this form — fall through to the next candidate.
    }
  }
  return null;
}
// ── Stitcher ────────────────────────────────────────────────────────────────
// Merges per-window section lists into a single ordered list of
// non-overlapping sections referencing entries by their position in
// the FULL (un-chunked) entries array.
//
// `results` is the per-window array produced by the parallel analyzer:
// slots are { window, ok, sections } on success, { window, ok: false }
// on failure, or empty. Returns [{ startIndex, endIndex, title,
// summary }] sorted by startIndex.
//
// The rule: each window N owns sections whose globalStart falls in
// its body (i.e., globalStart < window(N+1).bodyStartIdx). A section
// starting at or after the next window's body boundary is dropped
// because the next window produced its own version of that region
// with more downstream context.
//
// That hand-off only applies when window N+1 actually SUCCEEDED. If
// the successor failed or is missing there is no better version, so
// window N keeps everything it found — including sections in its
// overlap tail — rather than widening the hole left by the failure.
// The last window has no successor, so all its sections are kept.
//
// Malformed sections (non-objects, or indices that do not resolve to
// integers) are skipped. Numeric strings such as "3" are coerced so
// they add to the window offset arithmetically instead of
// concatenating.
//
// After collection, sections are sorted and any residual overlap
// (which shouldn't happen if windows are well-formed but might
// arise from model index errors) is repaired by clamping endIndex
// to the next section's startIndex - 1; sections left empty by the
// clamp are removed.
export function stitchAnalysisResults(results) {
  const out = [];
  for (let i = 0; i < results.length; i++) {
    const r = results[i];
    if (!r || !r.ok) continue;

    const next = results[i + 1];
    const nextBody = next && next.ok && next.window
      ? next.window.bodyStartIdx
      : Infinity;

    const offset = r.window.startIdx;
    const sections = Array.isArray(r.sections) ? r.sections : [];
    for (const s of sections) {
      if (s === null || typeof s !== "object") continue;
      // Section indices are relative to the window slice; shift them
      // into the full entries array.
      const globalStart = offset + Number(s.startIndex ?? 0);
      const globalEnd = offset + Number(s.endIndex ?? 0);
      if (!Number.isInteger(globalStart) || !Number.isInteger(globalEnd)) {
        continue;
      }
      // Drop sections that begin in the next window's body — the
      // next window's analysis is authoritative for that range.
      if (globalStart >= nextBody) continue;
      out.push({
        startIndex: globalStart,
        endIndex: globalEnd,
        title: s.title,
        summary: s.summary,
      });
    }
  }

  // Order + repair overlaps. Because the list is sorted by start,
  // clamping each section against its immediate successor is enough
  // to make the whole list non-overlapping.
  out.sort((a, b) => a.startIndex - b.startIndex);
  for (let i = 0; i < out.length - 1; i++) {
    if (out[i].endIndex >= out[i + 1].startIndex) {
      out[i].endIndex = out[i + 1].startIndex - 1;
    }
  }
  return out.filter((s) => s.endIndex >= s.startIndex);
}
// ── Public entry point ──────────────────────────────────────────────────────
// Runs chunked analysis end-to-end.
//
// Parameters:
//   entries          - transcript entries (sorted by offset, seconds).
//   analyzer         - object exposing analyzeText({ prompt, model,
//                      onProgress, signal, jobId }).
//   fallbackModels   - models to try, in order.
//   concurrency      - max analyze calls in flight.
//   onProgress       - receives human-readable status lines.
//   onWindowComplete - optional streaming callback, forwarded to the
//                      parallel analyzer.
//   signal, jobId    - forwarded to the analyzer; `signal` cancels.
//
// Chunked runs return an envelope with the same top-level fields
// callers read from a single-shot analyzer.analyzeText() call:
//   {
//     text:     "<JSON string with .sections>", // for prompt/result parity
//     model:    "<which model served the most windows>",
//     cost:     { total cost across all windows, summed },
//     usage:    null,                           // no aggregate usage
//     attempts: { windows: N, failed: K }       // diagnostic
//   }
// The caller parses .text the same way it parses a single-shot
// response — no changes to the downstream chunk-building code.
//
// Falls back to single-shot if planning produces just one window
// (i.e., content is below the chunking cutoff); in that case the
// analyzer's own result is returned as-is.
//
// Throws an AbortError-named Error if `signal` was aborted, and a plain
// Error (with the first window failure attached as `cause`) if every
// window failed, so the caller's existing fallback kicks in.
export async function runChunkedAnalysis({
  entries,
  analyzer,
  fallbackModels,
  concurrency = DEFAULT_CONCURRENCY,
  onProgress = () => {},
  onWindowComplete = null,
  signal,
  jobId,
}) {
  const windows = planAnalysisWindows(entries);

  // Total audio duration, from the last entry's offset + duration.
  // Used for the log message below and passed to the per-window
  // prompts so the section-count target (in buildAnalysisPrompt)
  // scales with the FULL video length, not just per-window.
  const lastEntry = entries[entries.length - 1];
  const totalAudioSec = (lastEntry?.offset || 0) + (lastEntry?.duration || 0);

  if (windows.length === 1) {
    // Single-shot path, routed through here so callers have one entry
    // point. The log message distinguishes the two ways to get here:
    //   (a) total ≤ cutoff — short content, intentionally not chunked
    //   (b) the planner produced a single window for longer content —
    //       usually a sign of sparse or odd upstream timestamps.
    if (totalAudioSec <= CHUNKING_CUTOFF_SECONDS) {
      onProgress(
        `Content ≤${Math.round(CHUNKING_CUTOFF_SECONDS / 60)} min — running single-shot analysis`
      );
    } else {
      onProgress(
        `Single window planned over ${entries.length} entries (last @ ${Math.round(totalAudioSec / 60)} min) — running single-shot analysis`
      );
    }
    return await runSingleShot({
      entries,
      analyzer,
      fallbackModels,
      onProgress,
      signal,
      jobId,
    });
  }

  // Window length comes from the tunable rather than a literal so the
  // message cannot drift if WINDOW_BODY_SECONDS is retuned.
  onProgress(
    `Chunked analysis: ${windows.length} windows of ~${Math.round(WINDOW_BODY_SECONDS / 60)} min each, up to ${concurrency} in parallel`
  );

  const results = await analyzeWindowsInParallel({
    entries,
    windows,
    analyzer,
    fallbackModels,
    concurrency,
    onProgress,
    onWindowComplete,
    signal,
    jobId,
    totalAudioSec,
  });

  // If the caller aborted mid-flight, some result slots may be empty.
  // Surface cancellation cleanly to the outer pipeline.
  if (signal?.aborted) {
    const e = new Error("aborted");
    e.name = "AbortError";
    throw e;
  }

  const completed = results.filter(Boolean);
  const failures = completed.filter((r) => !r.ok);
  if (completed.length === 0 || failures.length === completed.length) {
    const firstError = failures[0]?.error;
    throw new Error(
      `All ${results.length} analyze windows failed. First error: ${
        firstError?.message || "unknown"
      }`,
      // Keep the underlying error reachable for logging/diagnostics.
      firstError ? { cause: firstError } : undefined
    );
  }

  const stitched = stitchAnalysisResults(results);

  // Aggregate model attribution (most-used successful model) and sum
  // the per-window costs. Iterates `completed` so an empty slot can
  // never be dereferenced.
  const modelTally = new Map();
  let totalCost = 0;
  for (const r of completed) {
    if (!r.ok) continue;
    modelTally.set(r.model, (modelTally.get(r.model) || 0) + 1);
    // totalCost may arrive as a string or a number; anything
    // non-finite is ignored.
    const c = typeof r.cost?.totalCost === "string"
      ? Number.parseFloat(r.cost.totalCost)
      : r.cost?.totalCost || 0;
    if (Number.isFinite(c)) totalCost += c;
  }
  const dominantModel = [...modelTally.entries()].sort((a, b) => b[1] - a[1])[0]?.[0] || null;

  onProgress(
    `Chunked analysis complete — ${results.length - failures.length}/${results.length} windows succeeded, ${stitched.length} topics`
  );

  return {
    text: JSON.stringify({ sections: stitched }),
    model: dominantModel,
    cost: {
      totalCost: totalCost.toFixed(6),
      // NOTE(review): the sub-cent form renders as "$0.250¢" (both a
      // dollar sign and a cent sign). Left unchanged in case it mirrors
      // the single-shot analyzer's display format — confirm and fix
      // both together if it is unintended.
      totalCostDisplay: totalCost < 0.01
        ? `$${(totalCost * 100).toFixed(3)}¢`
        : `$${totalCost.toFixed(4)}`,
    },
    usage: null,
    attempts: { windows: results.length, failed: failures.length },
  };
}
// Analyzes the whole transcript in one call, treating it as a single
// "window". Models in `fallbackModels` are tried in order and the
// first successful analyzer result is returned unchanged.
//
// Throws the analyzer's error immediately if `signal` has been
// aborted; otherwise throws the last model's error once every model
// has failed (or a generic Error when there were no models to try).
async function runSingleShot({
  entries,
  analyzer,
  fallbackModels,
  onProgress,
  signal,
  jobId,
}) {
  // Total duration = last entry's offset + duration (0 for an empty
  // transcript). Handed to the prompt builder so its section-count
  // target picker chooses the right bucket for the content length.
  let totalAudioSec = 0;
  if (entries.length > 0) {
    const tail = entries[entries.length - 1];
    totalAudioSec = (tail.offset || 0) + (tail.duration || 0);
  }
  const prompt = buildAnalysisPrompt(entries, { totalAudioSec });

  let failure = null;
  for (const candidate of fallbackModels) {
    try {
      return await analyzer.analyzeText({
        prompt,
        model: candidate,
        onProgress,
        signal,
        jobId,
      });
    } catch (err) {
      // Cancellation is not a model failure — do not try the next model.
      if (signal?.aborted) throw err;
      failure = err;
    }
  }
  throw failure || new Error("All analysis models failed");
}