Files
recap-relay/server/chunked-analyze.js

1257 lines
51 KiB
JavaScript
Raw Permalink Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
// Chunked topic-analysis for the relay's test-run worker. Mirrors the
// approach in `/Users/macpro/Projects/recap/server/chunked-analyze.js`
// (Recap app) but adapted for the relay:
//
// 1. Input is the relay's bracketed-text transcript (the same format
// that gets saved to the output store), not a structured `entries`
// array. A small parser converts the bracketed text back into
// { offset, text } entries before windowing.
//
// 2. Each window emits its OWN audit-log row (pipeline="analyze",
// with window_idx + window_count fields), so the Jobs table's
// "AN windows" column reflects real parallel window count and
// `analyze_ms` becomes the sum-of-windows (total backend work).
// Per-window `audio_seconds` = window BODY seconds so per-row
// rate stats (s/audio-min) divide by the right denominator.
//
// 3. Single backend, single model per call. The recap-app version
// walks a model-fallback chain per window; the relay only ever
// runs one model per benchmark permutation, so no fallback loop.
//
// 4. No abort signal / jobId threading — the relay's test-run worker
// manages cancellation at a higher level via job state.
//
// Tunables (window body, overlap, concurrency, cutoff) are passed in
// from the test-run worker, which sources them from the live-reloaded
// /data/config/relay-config.json. No hardcoded defaults here — the
// caller MUST supply explicit values, sourced from one canonical place
// (server/config.js defaultConfig()).
//
// Public entry point: `runChunkedAnalysis()` at the bottom.
import { recordCall } from "./audit-log.js";
import {
sortAndDedupeTranscript,
mergeShortEntries,
} from "./backends/gemini.js";
// ─────────────────────────────────────────────────────────────────────
// Bracketed-transcript → entries parser
// ─────────────────────────────────────────────────────────────────────
// Both the Gemini backend and the Hardware backend produce transcripts
// in the shape:
//
// [MM:SS] First caption line.
// [MM:SS] Next caption line.
// [H:MM:SS] (with hour for >60-min videos)
//
// We parse those bracketed timestamps back into entries so the
// windowing logic (which is offset-aware) can plan time-based windows.
// Lines without a leading bracket are appended to the previous entry's
// text (handles wrapped lines).
const TIMESTAMP_RE = /^\[(\d{1,2}):(\d{2})(?::(\d{2}))?\]\s*(.*)$/;

/**
 * Convert a bracketed transcript ("[MM:SS] text" / "[H:MM:SS] text" per
 * line) into a list of `{ offset, text }` entries, `offset` in seconds.
 *
 * - A line that does not start with a bracketed timestamp is treated as a
 *   wrapped continuation and is appended (space-joined) to the entry
 *   before it.
 * - Such lines appearing before the first timestamped line are dropped.
 *
 * @param {string} text - Raw transcript text.
 * @returns {Array<{offset: number, text: string}>} Entries in input order;
 *   an empty array for empty or non-string input.
 */
export function parseBracketedTranscript(text) {
  if (typeof text !== "string" || text === "") return [];
  const parsed = [];
  text.split(/\r?\n/).forEach((rawLine) => {
    const match = TIMESTAMP_RE.exec(rawLine);
    if (match === null) {
      // No timestamp: fold into the previous entry when there is one.
      const tail = rawLine.trim();
      if (parsed.length === 0 || tail === "") return;
      const current = parsed[parsed.length - 1];
      current.text = current.text ? `${current.text} ${tail}` : tail;
      return;
    }
    const [, first, second, third, body] = match;
    const a = parseInt(first, 10);
    const b = parseInt(second, 10);
    // Three numeric groups means H:MM:SS; two means MM:SS.
    const offset =
      third === undefined ? a * 60 + b : a * 3600 + b * 60 + parseInt(third, 10);
    parsed.push({ offset, text: (body || "").trim() });
  });
  return parsed;
}
// Render a second count as "H:MM:SS" (one hour or more) or "M:SS"
// (under an hour). The result carries no surrounding brackets — the
// caller adds whatever delimiters it wants. Fractions are floored;
// negative, NaN and missing inputs are rendered as "0:00".
function fmtTimestamp(secs) {
  const total = Math.max(0, Math.floor(secs || 0));
  const twoDigit = (value) => String(value).padStart(2, "0");
  const hours = Math.floor(total / 3600);
  const minutes = Math.floor((total - hours * 3600) / 60);
  const seconds = total % 60;
  if (hours > 0) {
    return `${hours}:${twoDigit(minutes)}:${twoDigit(seconds)}`;
  }
  return `${minutes}:${twoDigit(seconds)}`;
}
// ─────────────────────────────────────────────────────────────────────
// Window planning — same algorithm as recap-app's planAnalysisWindows
// ─────────────────────────────────────────────────────────────────────
// Returns array of windows, each: { startIdx, endIdx, bodyStartIdx,
// startSec, bodyStartSec, bodyEndSec }. We carry the second-domain
// values forward (not just entry-index) so the audit row's per-window
// audio_seconds can be set to the body length exactly.
/**
 * Plan time-based analysis windows over a list of transcript entries.
 *
 * When the transcript's total length (last entry's offset + duration) is
 * at most `cutoffSeconds`, a single window spanning every entry is
 * returned. Otherwise the timeline is walked in strides of `bodySeconds`;
 * each window covers its body plus up to `overlapSeconds` of context
 * before and after it.
 *
 * Entries are expected in ascending offset order (the index helpers stop
 * scanning at the first entry at/after the target time).
 *
 * @param {Array<{offset?: number, duration?: number}>} entries
 * @param {{bodySeconds: number, overlapSeconds: number, cutoffSeconds: number}} options
 * @returns {Array<{startIdx: number, endIdx: number, bodyStartIdx: number,
 *   startSec: number, bodyStartSec: number, bodyEndSec: number}>}
 *   Windows in chronological order; empty when `entries` is empty.
 * @throws {RangeError} When windowing is required (total length exceeds
 *   `cutoffSeconds`) but `bodySeconds` is not a positive number. A
 *   zero or negative stride would never advance the body cursor and the
 *   planning loop would not terminate.
 */
export function planAnalysisWindows(entries, { bodySeconds, overlapSeconds, cutoffSeconds }) {
  if (!entries.length) return [];
  const lastEntry = entries[entries.length - 1];
  const totalSec = (lastEntry.offset || 0) + (lastEntry.duration || 0);
  if (totalSec <= cutoffSeconds) {
    // Single-shot fast path: one window owns the whole transcript.
    return [
      {
        startIdx: 0,
        endIdx: entries.length - 1,
        bodyStartIdx: 0,
        startSec: 0,
        bodyStartSec: 0,
        bodyEndSec: totalSec,
      },
    ];
  }
  // The loop below advances by bodySeconds each iteration; refuse a
  // stride that cannot make progress instead of spinning forever.
  if (!(bodySeconds > 0)) {
    throw new RangeError(
      `planAnalysisWindows: bodySeconds must be a positive number (got ${bodySeconds})`
    );
  }
  const windows = [];
  let bodyStartSec = 0;
  while (bodyStartSec < totalSec) {
    const bodyEndSec = Math.min(totalSec, bodyStartSec + bodySeconds);
    const windowEndSec = bodyStartSec + bodySeconds + overlapSeconds;
    const bodyStartIdx = firstEntryAtOrAfter(entries, bodyStartSec);
    // No more entries — done.
    if (bodyStartIdx >= entries.length) break;
    // Gap handling: if the next entry past bodyStartSec sits beyond
    // this window's end (a hole in the timeline), don't stop — jump the
    // body cursor forward to the stride boundary at or before that entry
    // so the entries on the other side of the gap still get a window.
    const nextEntryOffset = entries[bodyStartIdx].offset || 0;
    if (nextEntryOffset >= windowEndSec) {
      bodyStartSec = Math.max(
        bodyStartSec + bodySeconds,
        Math.floor(nextEntryOffset / bodySeconds) * bodySeconds
      );
      continue;
    }
    const overlapWithPriorSec = Math.max(0, bodyStartSec - overlapSeconds);
    const startIdx = firstEntryAtOrAfter(entries, overlapWithPriorSec);
    const endIdx = lastEntryBefore(entries, windowEndSec);
    if (endIdx < bodyStartIdx) {
      // Defensive — the gap check above should prevent this. If it is
      // still hit, advance and continue rather than break.
      bodyStartSec += bodySeconds;
      continue;
    }
    windows.push({
      startIdx,
      endIdx,
      bodyStartIdx,
      startSec: overlapWithPriorSec,
      bodyStartSec,
      bodyEndSec,
    });
    // This window already reaches the final entry — nothing left to plan.
    if (endIdx >= entries.length - 1) break;
    bodyStartSec += bodySeconds;
  }
  return windows;
}
/**
 * Index of the first entry whose offset is at or after `sec`.
 * A missing/falsy offset counts as 0.
 *
 * @param {Array<{offset?: number}>} entries
 * @param {number} sec - Target time in seconds.
 * @returns {number} The matching index, or `entries.length` when no
 *   entry qualifies.
 */
export function firstEntryAtOrAfter(entries, sec) {
  const hit = entries.findIndex((entry) => (entry.offset || 0) >= sec);
  return hit === -1 ? entries.length : hit;
}
/**
 * Index of the last entry in the leading run of entries whose offset is
 * strictly before `sec`. Scanning stops at the first entry at/after
 * `sec`, so later entries are never considered. A missing/falsy offset
 * counts as 0.
 *
 * @param {Array<{offset?: number}>} entries
 * @param {number} sec - Exclusive upper bound in seconds.
 * @returns {number} The matching index, or -1 when the first entry is
 *   already at/after `sec` (or `entries` is empty).
 */
export function lastEntryBefore(entries, sec) {
  let scanned = 0;
  while (scanned < entries.length && (entries[scanned].offset || 0) < sec) {
    scanned += 1;
  }
  return scanned - 1;
}
// Find the canonical entry index whose [offset, offset+duration]
// range contains the given time (in seconds). Falls back to the
// nearest entry by offset distance when no containing entry exists.
// Returns -1 only when `entries` is empty.
/**
 * Map a time (seconds) to an entry index.
 *
 * Walks the entries in order and returns the first one whose
 * [offset, offset + duration] range contains `sec`. If an entry starting
 * after `sec` is reached first, the nearer of that entry and its
 * predecessor (by offset distance, ties going to the predecessor) wins.
 * Times past every entry clamp to the last index.
 *
 * @param {Array<{offset?: number, duration?: number}>} entries
 * @param {number} sec
 * @returns {number} Entry index, or -1 when `entries` is empty.
 */
export function canonicalIndexForOffset(entries, sec) {
  const count = entries.length;
  if (!count) return -1;
  for (const [idx, entry] of entries.entries()) {
    const begin = entry.offset || 0;
    const finish = begin + (entry.duration || 0);
    if (begin <= sec && sec <= finish) {
      return idx;
    }
    if (begin > sec) {
      // Walked past `sec` without a containing entry.
      if (idx === 0) {
        return 0;
      }
      const gapBefore = sec - (entries[idx - 1].offset || 0);
      const gapAfter = begin - sec;
      return gapBefore <= gapAfter ? idx - 1 : idx;
    }
  }
  // `sec` lies beyond the final entry.
  return count - 1;
}
// ─────────────────────────────────────────────────────────────────────
// Per-window prompt builder
// ─────────────────────────────────────────────────────────────────────
// Same shape as the test-run worker's original single-shot prompt,
// but emitted per-window slice. Includes the entry's offset prefix so
// the model can use timestamps when picking section boundaries.
// Default analyze-prompt template. Four template variables get
// interpolated at request time (by buildWindowPrompt in this file):
// {{windowMin}} — window length in minutes (derived from entries)
// {{targetSections}} — count target string ("1 section" / "1-2 sections" / ...)
// {{maxIndex}} — windowEntries.length - 1, used in the completeness
// constraint so the model is told the exact valid
// index range it must cover.
// {{transcript}} — the numbered + timestamped transcript text for the
// window, formatted as `[N] (MM:SS) text` per line
// (was `[MM:SS] text` before; switched to match the
// Recap-app's direct-to-Gemini path which produces
// much better section coverage because the model
// can read indices off the input rather than count
// bracketed lines internally).
// Operator-editable via the dashboard's Settings tab. The variable
// names MUST be preserved across edits; the validation in
// /admin/settings PUT checks that the prompt still contains
// {{transcript}} and the "JSON" output instruction.
// NOTE: everything between the backticks is sent to the model verbatim
// (after interpolation), so whitespace and wording changes in the
// literal are behavior changes, not formatting.
export const DEFAULT_ANALYZE_PROMPT_TEMPLATE = `You are analyzing a ~{{windowMin}}-minute section of a longer transcript. Your job is to identify natural topic boundaries and group the transcript into discussion-based sections — aim for {{targetSections}}.
TRANSCRIPT (each line is numbered with a timestamp):
{{transcript}}
INSTRUCTIONS:
1. Read the entire transcript carefully.
2. Identify where the discussion naturally shifts from one topic to another.
3. Group consecutive transcript segments by topic. Some sections may be short (a quick aside) and some may be long (an extended deep-dive). Let the content dictate the length.
4. For each section, write:
- A short, specific topic title (3-8 words)
- A 1-3 sentence summary of what's discussed
- The start and end segment indices (inclusive), counted as the bracketed [N] number at the start of each transcript line above.
IMPORTANT:
- Sections must be chronological and non-overlapping.
- Every segment index from 0 to {{maxIndex}} must belong to exactly one section.
- startIndex of section N+1 must equal endIndex of section N plus 1.
- Create as many or as few sections as the content naturally requires — but lean toward broad, substantive topics rather than minute-by-minute breakdowns. A natural topic that spans several minutes of dialogue should be one section, not several.
- Titles should be descriptive and specific, not generic like "Introduction" unless it truly is one.
Respond with ONLY valid JSON in this exact format, no other text:
{
"sections": [
{
"title": "Brief Topic Title",
"summary": "1-3 sentence summary of this discussion section.",
"startIndex": 0,
"endIndex": 15
}
]
}`;
// Pick the target-total-sections value for a given total audio
// duration. The buckets match the Settings UI rows exactly so what
// the operator sees in their preview table is what the prompt
// receives at request time. When a bucket's setting is missing,
// non-numeric, or not positive, that bucket's hardcoded default
// (6/8/9/10/11/12/12) is used instead.
function pickTargetTotalSections(totalAudioSec, totalsByBucket) {
  const totalMin = (totalAudioSec || 0) / 60;
  const pick = (k, fallback) => {
    const v = Number(totalsByBucket?.[k]);
    return Number.isFinite(v) && v > 0 ? v : fallback;
  };
  if (totalMin < 30) return pick("under_30", 6);
  if (totalMin < 60) return pick("30_60", 8);
  if (totalMin < 90) return pick("60_90", 9);
  if (totalMin < 120) return pick("90_120", 10);
  if (totalMin < 150) return pick("120_150", 11);
  if (totalMin < 180) return pick("150_180", 12);
  return pick("over_180", 12);
}

/**
 * Compute the per-window section target from the whole-video target and
 * the window/audio geometry.
 *
 * The whole-video target is divided by the effective number of windows
 * (`totalAudioSec / windowSec`, with `windowSec` floored at 60 and the
 * quotient clamped to at least 1 so a single-window run keeps the full
 * target). The prompt-facing label uses round numbers or a short range so
 * the model is never handed a fractional instruction:
 *   - average <= 1.2            → "1 section"
 *   - whole number (e.g. 3.0)   → "around 3 sections"
 *   - fractional (e.g. 2.5)     → "2-3 sections"
 *
 * @param {{totalAudioSec: number, windowSec: number, totalsByBucket: object}} params
 * @returns {{average: number, label: string, target_total: number}}
 *   For empty/missing audio: `{ average: 1, label: "1 section", ... }`.
 */
export function computePerWindowTarget({ totalAudioSec, windowSec, totalsByBucket }) {
  const target = pickTargetTotalSections(totalAudioSec, totalsByBucket);
  if (!totalAudioSec || totalAudioSec <= 0) {
    return { average: 1, label: "1 section", target_total: target };
  }
  // Effective number of windows the audio spans, never below 1.
  const numWindows = Math.max(1, totalAudioSec / Math.max(60, windowSec || 60));
  const avg = target / numWindows;
  let label;
  if (avg <= 1.2) {
    label = "1 section";
  } else {
    const lo = Math.max(1, Math.floor(avg));
    const hi = Math.max(lo, Math.ceil(avg));
    // The range needs an explicit separator: joining lo and hi directly
    // turns "2 to 3" into "23 sections", a wildly different instruction.
    label = lo === hi ? `around ${lo} sections` : `${lo}-${hi} sections`;
  }
  return { average: avg, label, target_total: target };
}
/**
 * Build the analyze prompt for one window's slice of entries.
 *
 * @param {Array<{offset?: number, text: string}>} windowEntries - The
 *   entries belonging to this window (body + overlap).
 * @param {string} promptOverride - Operator-supplied template; a
 *   blank/non-string value selects DEFAULT_ANALYZE_PROMPT_TEMPLATE.
 * @param {object|null} targetTotalsByBucket - Whole-video section targets
 *   keyed by duration bucket (see computePerWindowTarget).
 * @param {number} totalAudioSec - Total audio duration in seconds; when
 *   falsy the section target degrades to "1 section".
 * @returns {string} The template with {{windowMin}}, {{targetSections}},
 *   {{maxIndex}} and {{transcript}} substituted.
 */
function buildWindowPrompt(windowEntries, promptOverride, targetTotalsByBucket, totalAudioSec) {
  // Numbered + timestamped transcript format. Each line is
  //   [N] (MM:SS) text
  // where N is the entry's position within THIS window (not its
  // global position in the full transcript — the stitcher translates
  // window-local indices to global at result time). Showing the
  // indices lets the model read startIndex/endIndex off the input
  // instead of counting lines, which it does unreliably.
  const transcriptText = windowEntries
    .map((e, i) => `[${i}] (${fmtTimestamp(e.offset)}) ${e.text}`)
    .join("\n");
  // Window length in seconds (last entry's offset minus first's).
  const windowSec = windowEntries.length > 1
    ? (windowEntries[windowEntries.length - 1].offset || 0) - (windowEntries[0].offset || 0)
    : 0;
  const windowMin = Math.max(1, Math.round(windowSec / 60));
  // Max valid index = last entry's position in this window's slice,
  // substituted into the prompt's completeness clause.
  const maxIndex = Math.max(0, windowEntries.length - 1);
  // Per-window section target: the whole-video target (bucketed by
  // TOTAL audio duration) divided by the effective number of windows,
  // formatted as a human-readable string for {{targetSections}}.
  const targetCalc = computePerWindowTarget({
    totalAudioSec,
    windowSec,
    totalsByBucket: targetTotalsByBucket || {},
  });
  const targetSections = targetCalc.label;
  const template = (typeof promptOverride === "string" && promptOverride.trim())
    ? promptOverride
    : DEFAULT_ANALYZE_PROMPT_TEMPLATE;
  // Simple {{var}} interpolation, dependency-free. Unknown {{var}}
  // tokens pass through as-is so an operator typo is visible in the
  // model's output.
  //
  // Replacements are supplied as FUNCTIONS, not strings: a string
  // replacement is scanned for the special patterns `$$`, `$&`, `` $` ``
  // and `$'`, so transcript text such as "$$" would be silently
  // rewritten (or have chunks of the template spliced into it). A
  // replacer function's return value is inserted verbatim.
  // {{transcript}} is substituted last so tokens that happen to appear
  // inside the transcript text are never themselves interpolated.
  return template
    .replaceAll("{{windowMin}}", () => String(windowMin))
    .replaceAll("{{targetSections}}", () => targetSections)
    .replaceAll("{{maxIndex}}", () => String(maxIndex))
    .replaceAll("{{transcript}}", () => transcriptText);
}
// Strip code fences + parse a JSON-formatted analyze response into
// `{ sections: [...] }`. Returns null on parse failure.
/**
 * Decode a model response into `{ sections: [...] }`.
 *
 * If the response contains a Markdown code fence (optionally tagged
 * `json`), only the first fence's contents are parsed; otherwise the
 * whole trimmed response is.
 *
 * @param {string} text - Raw model output.
 * @returns {object|null} The parsed object when it has an array-valued
 *   `sections` property; null for non-string/empty input, invalid JSON,
 *   or a payload without a `sections` array.
 */
function safeParseSections(text) {
  if (typeof text !== "string" || text.length === 0) return null;
  const trimmed = text.trim();
  const fenced = /```(?:json)?\s*([\s\S]*?)```/.exec(trimmed);
  const candidate = fenced === null ? trimmed : fenced[1].trim();
  let decoded;
  try {
    decoded = JSON.parse(candidate);
  } catch {
    return null;
  }
  if (decoded && Array.isArray(decoded.sections)) {
    return decoded;
  }
  return null;
}
// ─────────────────────────────────────────────────────────────────────
// Stitcher
// ─────────────────────────────────────────────────────────────────────
// Combines per-window section lists into a single ordered list of
// non-overlapping sections referencing entries by their position in
// the FULL entries array. Same algorithm as recap-app's stitcher:
// each window N owns sections whose globalStart falls before
// window(N+1).bodyStartIdx. Last window has no successor → keep all.
/**
 * Merge per-window section lists into one ordered, non-overlapping list
 * whose indices refer to the FULL entries array.
 *
 * Ownership: window N keeps sections whose global start falls before
 * window N+1's `bodyStartIdx`; the last window keeps everything.
 *
 * Model output is not trusted:
 *   - section elements that are not objects are skipped;
 *   - indices are coerced to integers (so "5" is 5, not a string that
 *     gets concatenated onto the window offset) and sections whose
 *     indices are not numeric are skipped;
 *   - the start is clamped to the window's first entry and the end to
 *     its last, so a hallucinated index cannot reach outside the
 *     transcript range the window was actually given.
 *
 * @param {Array<{ok: boolean, window: {startIdx: number, endIdx: number,
 *   bodyStartIdx: number}, sections?: Array<object>}>} results - One slot
 *   per planned window, in window order; failed/missing slots are skipped.
 * @returns {Array<{startIndex: number, endIndex: number, title: *, summary: *}>}
 */
export function stitchAnalysisResults(results) {
  // null/undefined → 0 (matches a missing field); non-numeric → NaN.
  const toIndex = (value) => Math.trunc(Number(value ?? 0));
  const out = [];
  for (let i = 0; i < results.length; i++) {
    const r = results[i];
    if (!r || !r.ok) continue;
    const next = results[i + 1];
    const nextBody =
      next && next.window ? next.window.bodyStartIdx : Infinity;
    const offset = r.window.startIdx;
    // Last entry index this window's transcript actually contains.
    const windowMaxEndIdx = r.window.endIdx;
    for (const s of r.sections) {
      if (s === null || typeof s !== "object") continue;
      const localStart = toIndex(s.startIndex);
      const localEnd = toIndex(s.endIndex);
      if (Number.isNaN(localStart) || Number.isNaN(localEnd)) continue;
      const globalStart = offset + Math.max(0, localStart);
      const globalEnd = Math.min(offset + localEnd, windowMaxEndIdx);
      // Starts inside the next window's body — that window owns it.
      if (globalStart >= nextBody) continue;
      // Degenerate (start > end, possibly after clamping) — unrecoverable.
      if (globalEnd < globalStart) continue;
      out.push({
        startIndex: globalStart,
        endIndex: globalEnd,
        title: s.title,
        summary: s.summary,
      });
    }
  }
  // Dedup with containment handling:
  //   1. Sort by startIndex ASC, then endIndex ASC, so of two sections
  //      sharing a start the smaller comes first and is trimmed away
  //      by the larger one in step 3.
  //   2. Walk left to right tracking the largest endIndex accepted so
  //      far; a section ending at or before it is fully contained in
  //      something already accepted — drop it.
  //   3. On partial overlap, trim the previously accepted section to
  //      end one entry before the current one starts; if that leaves
  //      it empty, remove it.
  out.sort(
    (a, b) => a.startIndex - b.startIndex || a.endIndex - b.endIndex
  );
  const deduped = [];
  let maxEndSeen = -1;
  for (const s of out) {
    if (s.endIndex < s.startIndex) continue;
    if (s.endIndex <= maxEndSeen) continue;
    const prev = deduped[deduped.length - 1];
    if (prev && prev.endIndex >= s.startIndex) {
      prev.endIndex = s.startIndex - 1;
      if (prev.endIndex < prev.startIndex) {
        deduped.pop();
      }
    }
    deduped.push(s);
    if (s.endIndex > maxEndSeen) maxEndSeen = s.endIndex;
  }
  return deduped;
}
// ─────────────────────────────────────────────────────────────────────
// Public entry point
// ─────────────────────────────────────────────────────────────────────
// Runs chunked analysis end-to-end:
//
// transcriptText (bracketed [MM:SS] format)
// → entries[]
// → windows[]
// → analyze each window in parallel (bounded concurrency)
// → audit per-window
// → stitch sections into final list
//
// Each window emits an audit row with pipeline="analyze",
// audio_seconds=windowBodySeconds, window_idx=N, window_count=K,
// duration_ms=this window's analyze wall time, status="success"|"error".
// On error, the per-window row records the error message AND the
// overall run continues — failed windows are dropped from stitching
// (with a warning) rather than aborting the whole job.
//
// Returns:
// {
// text: "<JSON string with .sections>", // for downstream parity
// model: dominantModelName,
// usage: null,
// attempts: { windows: N, failed: K },
// }
// Throws when:
// - the transcript parses to zero entries (one error audit row is
// written first),
// - window planning yields no windows,
// - ALL windows failed.
export async function runChunkedAnalysis({
transcriptText,
backend,
// Audit-row metadata (passed through to each per-window recordCall):
pipelineBackend, // "gemini" | "hardware"
jobId,
batchId,
mediaUrl,
title,
installId,
// Audit-only — paired with install_id on every per-window recordCall
// so license-pool aggregations on the dashboard see this work. Pure
// forensic field; credits.js isn't called from here.
licenseFingerprint = null,
source,
// Cost helper — caller passes a function (model, usage) → costDetails
// because gemini.js's cost calculator is gemini-specific. Returns
// { input_tokens, output_tokens, thinking_tokens, cost_usd }; hardware
// path passes a no-op helper that returns zeros.
computeCostDetails,
// Tunables (all required, no hardcoded defaults here):
bodySeconds,
overlapSeconds,
concurrency,
cutoffSeconds,
// Operator-editable analyze prompt template (Settings tab).
// Empty/missing falls back to DEFAULT_ANALYZE_PROMPT_TEMPLATE.
// Same applies to both Gemini and operator-hardware analyze paths.
analyzePromptOverride = "",
// Operator-editable section-count target totals per VIDEO duration
// bucket. Shape: { under_30, "30_60", "60_90", "90_120", "120_150",
// "150_180", over_180 } — each is a NUMBER (target total sections
// for a video in that duration bucket). The relay computes a
// per-window average by dividing by num_windows and formats it
// into {{targetSections}}. See computePerWindowTarget() in this
// file for the math + label formatting. Missing/non-numeric
// buckets fall back to hardcoded defaults (6/8/9/10/11/12/12).
targetTotalsByBucket = null,
// Total audio duration in seconds — required for the per-window
// target math. Pulled by the worker from getAudioDurationSeconds()
// and threaded through. If omitted (legacy callers), the prompt
// emits "1 section" as a defensive fallback.
totalAudioSec = 0,
// Optional: called once per window AS SOON AS its sections are
// available (out of order, since windows resolve in parallel). The
// callback receives the window's BODY-OWNED sections — the ones
// the final stitcher will keep. Each section carries GLOBAL indices
// (into the full entries array) so the caller can render them in
// place without further translation. Used by /relay/summarize-url
// to push window_complete SSE events to the connected Recap client
// as windows arrive, so the user sees topics appearing in real time
// instead of waiting for the whole pipeline to finish.
onWindowComplete = null,
}) {
const entries = parseBracketedTranscript(transcriptText);
if (entries.length === 0) {
// Edge case: empty transcript. Record one failed analyze row so
// the Jobs table shows what happened, then throw.
await recordCall({
install_id: installId,
license_fingerprint: licenseFingerprint,
tier: "core",
pipeline: "analyze",
backend: pipelineBackend,
model: null,
status: "error",
duration_ms: 0,
audio_seconds: 0,
cost_usd: 0,
job_id: jobId,
batch_id: batchId,
source,
media_url: mediaUrl,
title: title || null,
error: "transcript empty — nothing to analyze",
window_idx: 0,
window_count: 1,
});
throw new Error("transcript empty — nothing to analyze");
}
const windows = planAnalysisWindows(entries, {
bodySeconds,
overlapSeconds,
cutoffSeconds,
});
if (windows.length === 0) {
throw new Error("planAnalysisWindows produced no windows (unexpected)");
}
// Each window's audio_seconds = its BODY duration. This matches what
// the per-row rate stats SHOULD divide by, since a window analyzes a
// body+overlap span but only the body contributes to the stitched
// output. Last window's body is clamped to the transcript end.
//
// `results` has one slot per window, filled in by whichever worker
// claims that window; `nextIdx` is the shared cursor the workers pull
// window indices from.
const results = new Array(windows.length);
let nextIdx = 0;
// One worker = a loop that keeps claiming the next unprocessed window
// until none are left. Several run concurrently (see below).
async function worker() {
while (true) {
const my = nextIdx++;
if (my >= windows.length) return;
const w = windows[my];
const windowEntries = entries.slice(w.startIdx, w.endIdx + 1);
const prompt = buildWindowPrompt(
windowEntries,
analyzePromptOverride,
targetTotalsByBucket,
totalAudioSec
);
const bodySec = Math.max(0, w.bodyEndSec - w.bodyStartSec);
const winStart = Date.now();
try {
// Per-window analyze with up to 3 attempts on invalid-JSON /
// exception. Analyze is by far the cheapest pipeline phase
// (a few seconds per window vs 30+ seconds for transcribe),
// so being aggressive about retries is essentially free —
// worst-case wall time for a failing window goes from ~5s to
// ~15s, which is still trivial compared to the 30+ minute
// transcribe phase for long content.
//
// The relay also now passes `responseMimeType:
// "application/json"` on the analyze call (see gemini.js
// analyzeText), so Gemini decoder-enforces valid JSON output
// — invalid-JSON failures from prose preamble / markdown
// fence wrapping / truncated brace should be eliminated
// entirely on Gemini. The retry loop here is now mostly
// defense for the hardware (Gemma) path which has no
// equivalent decoder-side guarantee, plus capacity blips.
const MAX_ATTEMPTS = 3;
let r = null;
let parsed = null;
let lastAttemptErr = null;
for (let attempt = 0; attempt < MAX_ATTEMPTS; attempt++) {
try {
r = await backend.analyzeText({ prompt });
parsed = safeParseSections(r.text);
if (parsed) {
lastAttemptErr = null;
break;
}
lastAttemptErr = "invalid JSON in window response";
} catch (innerErr) {
// Backend threw — discard any partial state from this attempt.
r = null;
parsed = null;
lastAttemptErr = (innerErr?.message || String(innerErr)).slice(0, 280);
}
// Only log "retrying" when another attempt will actually follow.
if (attempt < MAX_ATTEMPTS - 1) {
console.warn(
`[chunked-analyze] window ${my + 1}/${windows.length} attempt ${attempt + 1} failed (${lastAttemptErr}) — retrying`
);
}
}
// `dur` spans every attempt for this window, not just the last one.
const dur = Date.now() - winStart;
if (!parsed) {
// Every attempt (MAX_ATTEMPTS) failed — record one audit row,
// drop from stitching, continue the run. The stitcher will skip
// this window's body region; the summarize-url credit gate will
// refuse to charge because anyWindowFailed becomes true.
results[my] = { window: w, ok: false, error: new Error(lastAttemptErr || "analyze failed") };
await recordCall({
install_id: installId,
license_fingerprint: licenseFingerprint,
tier: "core",
pipeline: "analyze",
backend: pipelineBackend,
model: r?.model || null,
status: "error",
duration_ms: dur,
audio_seconds: bodySec,
cost_usd: 0,
job_id: jobId,
batch_id: batchId,
source,
media_url: mediaUrl,
title: title || null,
error: (lastAttemptErr || "analyze failed") + " (after " + MAX_ATTEMPTS + " attempts)",
window_idx: my,
window_count: windows.length,
window_body_seconds: bodySec,
});
continue;
}
const costDetails = computeCostDetails(r.model, r.usage);
results[my] = {
window: w,
ok: true,
sections: parsed.sections,
model: r.model,
};
await recordCall({
install_id: installId,
license_fingerprint: licenseFingerprint,
tier: "core",
pipeline: "analyze",
backend: pipelineBackend,
model: r.model || null,
status: "success",
duration_ms: dur,
audio_seconds: bodySec,
job_id: jobId,
batch_id: batchId,
source,
media_url: mediaUrl,
title: title || null,
window_idx: my,
window_count: windows.length,
window_body_seconds: bodySec,
...costDetails,
});
// Streaming callback: emit this window's body-owned sections
// as soon as they arrive. Uses the same ownership rule as the
// final stitcher (window N owns sections starting before
// window N+1's body), but computed locally with just this
// window's planned position — the next window's bodyStartIdx
// is known from the plan even if that window hasn't finished.
//
// NOTE(review): the ownership loop below runs inside the outer
// try but outside the callback's own try/catch. If it throws
// (e.g. a section element that is null), the outer catch marks
// this window failed and writes a second, "error" audit row
// after the "success" row above — confirm that is acceptable.
if (onWindowComplete) {
const next = windows[my + 1];
const nextBodyIdx = next ? next.bodyStartIdx : Infinity;
const offset = w.startIdx;
// Same endIndex clamp as stitchAnalysisResults — a model
// that emits an out-of-range endIndex would otherwise
// produce a streaming partial-section spanning way past
// this window's transcript range. The final stitch
// re-clamps, but if the SSE client renders partials
// incrementally it'd briefly show the inflated bounds.
const windowMaxEndIdx = w.endIdx;
const owned = [];
for (const s of parsed.sections) {
const globalStart = offset + (s.startIndex ?? 0);
const rawGlobalEnd = offset + (s.endIndex ?? 0);
const globalEnd = Math.min(rawGlobalEnd, windowMaxEndIdx);
if (globalStart >= nextBodyIdx) continue;
if (globalEnd < globalStart) continue;
owned.push({
startIndex: globalStart,
endIndex: globalEnd,
title: s.title,
summary: s.summary,
});
}
try {
onWindowComplete({
windowIdx: my,
totalWindows: windows.length,
ownedSections: owned,
windowBodySeconds: bodySec,
model: r.model || null,
durationMs: dur,
});
} catch (cbErr) {
// Streaming is best-effort — don't fail the analyze loop
// if a callback throws (e.g. the SSE client disconnected
// mid-window).
console.warn(
`[chunked-analyze] onWindowComplete callback failed: ${cbErr?.message || cbErr}`
);
}
}
} catch (err) {
// Anything thrown outside the retry loop's own catch lands here
// (cost helper, audit write, section post-processing): mark the
// window failed and record an error row.
const dur = Date.now() - winStart;
results[my] = { window: w, ok: false, error: err };
await recordCall({
install_id: installId,
license_fingerprint: licenseFingerprint,
tier: "core",
pipeline: "analyze",
backend: pipelineBackend,
model: null,
status: "error",
duration_ms: dur,
audio_seconds: bodySec,
cost_usd: 0,
job_id: jobId,
batch_id: batchId,
source,
media_url: mediaUrl,
title: title || null,
error: (err?.message || String(err)).slice(0, 400),
window_idx: my,
window_count: windows.length,
window_body_seconds: bodySec,
});
}
}
}
// Start min(concurrency, windows.length) workers and wait for all of
// them to drain the window queue.
// NOTE(review): a concurrency of 0, a negative number, or NaN/undefined
// makes the computed length 0, so no worker starts, `results` stays
// empty, and the function throws "All N analyze windows failed. First
// error: unknown" below — confirm callers always pass a positive value.
const workerPromises = Array.from(
{ length: Math.min(concurrency, windows.length) },
worker
);
await Promise.all(workerPromises);
const succeeded = results.filter((r) => r && r.ok);
const failed = results.filter((r) => r && !r.ok);
if (succeeded.length === 0) {
throw new Error(
`All ${results.length} analyze windows failed. First error: ${
failed[0]?.error?.message || "unknown"
}`
);
}
const stitched = stitchAnalysisResults(results);
// Aggregate model attribution: dominant successful model (the model
// name reported by the most successful windows).
const modelTally = new Map();
for (const r of results) {
if (!r.ok || !r.model) continue;
modelTally.set(r.model, (modelTally.get(r.model) || 0) + 1);
}
const dominantModel =
[...modelTally.entries()].sort((a, b) => b[1] - a[1])[0]?.[0] || null;
return {
text: JSON.stringify({ sections: stitched }),
model: dominantModel,
usage: null,
attempts: { windows: results.length, failed: failed.length },
};
}
// ─────────────────────────────────────────────────────────────────────
// Pipelined analyze (Phase 2 of the streaming UX work).
//
// Lets analyze windows fire AS SOON AS their required transcribe
// chunks have completed, in parallel with later chunks still being
// transcribed. The wall-time savings are modest for short content
// where transcribe dwarfs analyze (a 94-min video here: ~12s of
// analyze that can hide under transcribe), but the user-perceived
// improvement is dramatic — first topics render at T=~80s instead
// of T=~160s, because they don't have to wait for transcribe of
// minute 80-94 to start drawing the first window's topics.
//
// Design:
// 1. Caller pre-plans windows by AUDIO TIME (not entries — entries
// don't exist yet).
// 2. Caller provides a `getReadySegments(startSec, endSec)` function
// that returns the transcribe segments currently buffered for
// that time range (each { start, end, text } with global
// timestamps). It is called per window only AFTER `waitForTime`
// has resolved for that window's end time.
// 3. Caller provides a `waitForTime(targetEndSec) → Promise<void>`
// that resolves when transcribe has covered up to targetEndSec.
// 4. Each window's worker awaits its required time then calls
// analyzeOneWindow with the window's text. Workers run as a
// bounded concurrent pool same as runChunkedAnalysis.
// 5. Section indices that come back from each window are LOCAL to
// that window's entry slice. The caller is responsible for
// mapping them to GLOBAL entry indices after transcribe fully
// completes (when the canonical entries array exists).
// ─────────────────────────────────────────────────────────────────────
// Pre-plan analyze windows from total audio duration alone. Same
// math as planAnalysisWindows but returns ONLY time fields (no
// entry indices) — entries aren't built yet during pipelining. The
// caller assembles entries per-window when firing.
// Parameters (all in seconds):
//   totalAudioSec  — full audio duration
//   bodySeconds    — length of each window's body region
//   overlapSeconds — extra context added before and after each body
//   cutoffSeconds  — at or below this duration one window covers all
//
// Returns an array of { idx, startSec, bodyStartSec, bodyEndSec,
// windowEndSec }, or [] when totalAudioSec is missing or not positive.
//
// Throws RangeError when windowing is required but bodySeconds is not
// a positive number: with 0 / negative / null the loop below would
// never advance, and with undefined / NaN every bound would be NaN.
export function planWindowsByDuration({
  totalAudioSec,
  bodySeconds,
  overlapSeconds,
  cutoffSeconds,
}) {
  if (!totalAudioSec || totalAudioSec <= 0) return [];
  // Single-shot fast path for short audio — same threshold as
  // planAnalysisWindows. bodySeconds is not consulted here, so it is
  // deliberately validated only after this point.
  if (totalAudioSec <= cutoffSeconds) {
    return [
      {
        idx: 0,
        startSec: 0,
        bodyStartSec: 0,
        bodyEndSec: totalAudioSec,
        windowEndSec: totalAudioSec,
      },
    ];
  }
  if (typeof bodySeconds !== "number" || !(bodySeconds > 0)) {
    throw new RangeError(
      `planWindowsByDuration requires bodySeconds to be a positive number (got ${bodySeconds})`
    );
  }
  const windows = [];
  // Bodies tile the audio back-to-back; each window additionally
  // reaches overlapSeconds into its neighbours, clamped to the audio.
  for (
    let bodyStartSec = 0;
    bodyStartSec < totalAudioSec;
    bodyStartSec += bodySeconds
  ) {
    windows.push({
      idx: windows.length,
      startSec: Math.max(0, bodyStartSec - overlapSeconds),
      bodyStartSec,
      bodyEndSec: Math.min(totalAudioSec, bodyStartSec + bodySeconds),
      windowEndSec: Math.min(
        totalAudioSec,
        bodyStartSec + bodySeconds + overlapSeconds
      ),
    });
  }
  return windows;
}
// Single-window analyze with the same retry + audit semantics
// runChunkedAnalysis uses internally. Extracted so the pipelined
// path can call it per-window without duplicating the logic.
//
// Returns: { ok: true, sections, model } or { ok: false, error }.
// Audit row is recorded inside.
// Parameters of note:
//   windowEntries — entries handed to buildWindowPrompt; an empty or
//                   missing list fails the window without calling
//                   the backend.
//   backend       — object exposing analyzeText({ prompt }); its
//                   result is read for .text, .model and .usage.
//   computeCostDetails(model, usage) — supplies the token / cost
//                   fields written to the success audit row.
//   windowStartSec / windowEndSec — accepted but not read here.
//
// Every return path, including the empty-window failure, writes
// exactly one audit row via recordCall. On failure after a thrown
// backend error, the returned Error carries that error as `cause`.
export async function analyzeOneWindow({
  windowEntries,
  windowIdx,
  windowCount,
  windowBodySec,
  windowStartSec,
  windowEndSec,
  backend,
  // Audit / accounting params:
  pipelineBackend,
  jobId,
  batchId,
  mediaUrl,
  title,
  installId,
  licenseFingerprint = null,
  source,
  computeCostDetails,
  // Prompt params:
  analyzePromptOverride = "",
  targetTotalsByBucket = null,
  totalAudioSec = 0,
}) {
  const MAX_ATTEMPTS = 3;
  // Fields shared by every audit row this window can emit.
  const auditRow = (durationMs) => ({
    install_id: installId,
    license_fingerprint: licenseFingerprint,
    tier: "core",
    pipeline: "analyze",
    backend: pipelineBackend,
    duration_ms: durationMs,
    audio_seconds: windowBodySec,
    job_id: jobId,
    batch_id: batchId,
    source,
    media_url: mediaUrl,
    title: title || null,
    window_idx: windowIdx,
    window_count: windowCount,
    window_body_seconds: windowBodySec,
  });

  if (!windowEntries || windowEntries.length === 0) {
    const message = `analyzeOneWindow window ${windowIdx + 1} has no entries — audio gap or pre-cutoff window`;
    // Record the failure so this window is not missing from the audit
    // log; no backend work happened, hence zero duration and cost.
    await recordCall({
      ...auditRow(0),
      model: null,
      status: "error",
      cost_usd: 0,
      error: message,
    });
    return { ok: false, error: new Error(message) };
  }

  const prompt = buildWindowPrompt(
    windowEntries,
    analyzePromptOverride,
    targetTotalsByBucket,
    totalAudioSec
  );
  const winStart = Date.now();
  let r = null;
  let parsed = null;
  let lastAttemptErr = null;
  let lastAttemptCause = null;
  for (let attempt = 0; attempt < MAX_ATTEMPTS; attempt++) {
    try {
      r = await backend.analyzeText({ prompt });
      parsed = safeParseSections(r.text);
      if (parsed) {
        lastAttemptErr = null;
        lastAttemptCause = null;
        break;
      }
      // A falsy parse result is treated as a retryable failure.
      lastAttemptErr = "invalid JSON in window response";
      lastAttemptCause = null;
    } catch (innerErr) {
      r = null;
      parsed = null;
      lastAttemptErr = (innerErr?.message || String(innerErr)).slice(0, 280);
      lastAttemptCause = innerErr;
    }
    if (attempt < MAX_ATTEMPTS - 1) {
      console.warn(
        `[chunked-analyze] window ${windowIdx + 1}/${windowCount} attempt ${attempt + 1} failed (${lastAttemptErr}) — retrying`
      );
    }
  }
  // Duration spans all attempts, successful or not.
  const dur = Date.now() - winStart;

  if (!parsed) {
    const reason = lastAttemptErr || "analyze failed";
    await recordCall({
      ...auditRow(dur),
      model: r?.model || null,
      status: "error",
      cost_usd: 0,
      error: `${reason} (after ${MAX_ATTEMPTS} attempts)`,
    });
    const failure = lastAttemptCause
      ? new Error(reason, { cause: lastAttemptCause })
      : new Error(reason);
    return { ok: false, error: failure };
  }

  const costDetails = computeCostDetails(r.model, r.usage);
  await recordCall({
    ...auditRow(dur),
    model: r.model || null,
    status: "success",
    input_tokens: costDetails.input_tokens || 0,
    output_tokens: costDetails.output_tokens || 0,
    thinking_tokens: costDetails.thinking_tokens || 0,
    cost_usd: costDetails.cost_usd || 0,
  });
  return {
    ok: true,
    sections: parsed.sections,
    model: r.model,
    durationMs: dur,
  };
}
// Run analyze in pipelined mode, fired window-by-window as transcribe
// chunks become available. See block comment above for the design.
//
// Inputs:
// audioDurationSec — total audio seconds (used to pre-plan windows)
// waitForTime(sec) — async; resolves when transcribe has covered
// the audio up through `sec`
// getReadySegments(startSec, endSec) — returns the deduped segments
// currently buffered for that time range (each
// { start, end, text } with global timestamps).
// Called per window AFTER waitForTime resolves.
// concurrency — max simultaneous analyze windows
// onWindowComplete — fired per window (same shape as
// runChunkedAnalysis's callback) with WINDOW-
// LOCAL section indices. Caller maps to global
// indices at end of pipeline.
// ... — all the audit + tunable params analyzeOneWindow
// needs
//
// Returns:
// {
// windowResults: [{ window, ok, sections, model, windowEntries }],
// dominantModel,
// attempts: { windows, failed },
// }
// where each windowEntries is the entries slice that window analyzed
// (for the caller's later index-remapping).
// Robustness notes:
//   - `concurrency` is floored and clamped to at least one worker. A
//     value of 0 / NaN / undefined would otherwise start no workers
//     and return an all-empty windowResults that reports 0 failures.
//   - A null/undefined return from getReadySegments is tolerated: the
//     window ends up with no entries and fails individually instead
//     of throwing out of the worker.
// Throws when no windows can be planned. Rejections from waitForTime
// or analyzeOneWindow are not caught here and propagate to the caller.
export async function runPipelinedAnalysis({
  audioDurationSec,
  waitForTime,
  getReadySegments,
  bodySeconds,
  overlapSeconds,
  cutoffSeconds,
  concurrency,
  onWindowComplete,
  // Pass-through to analyzeOneWindow:
  backend,
  pipelineBackend,
  jobId,
  batchId,
  mediaUrl,
  title,
  installId,
  licenseFingerprint = null,
  source,
  computeCostDetails,
  analyzePromptOverride = "",
  targetTotalsByBucket = null,
}) {
  const windows = planWindowsByDuration({
    totalAudioSec: audioDurationSec,
    bodySeconds,
    overlapSeconds,
    cutoffSeconds,
  });
  if (windows.length === 0) {
    throw new Error(
      "planWindowsByDuration produced no windows — audioDurationSec missing or zero"
    );
  }

  const requestedWorkers = Math.floor(Number(concurrency));
  const hasValidConcurrency = requestedWorkers >= 1;
  if (!hasValidConcurrency) {
    console.warn(
      `[pipelined-analyze] invalid concurrency (${concurrency}) — falling back to 1 worker`
    );
  }
  const workerCount = Math.min(
    hasValidConcurrency ? requestedWorkers : 1,
    windows.length
  );

  const results = new Array(windows.length);
  // Shared cursor: each worker claims the next unclaimed window index.
  let nextIdx = 0;
  async function worker() {
    while (true) {
      const my = nextIdx++;
      if (my >= windows.length) return;
      const w = windows[my];
      // Wait until transcribe has covered this window's full time
      // range (including the overlap tail).
      await waitForTime(w.windowEndSec);
      // Extract segments in [w.startSec, w.windowEndSec). The caller
      // is responsible for deduping at addChunk time so we don't
      // see duplicates here.
      const segments = getReadySegments(w.startSec, w.windowEndSec);
      const segmentCount = segments?.length ?? 0;
      // Two-pass cleanup matching what hardware.js applies to the
      // FULL stitched transcript: sortAndDedupeTranscript +
      // mergeShortEntries collapses adjacent short Parakeet
      // segments into thought-sized lines.
      const rawBracketed = segmentsToBracketedText(segments);
      const mergedBracketed = mergeShortEntries(
        sortAndDedupeTranscript(rawBracketed)
      );
      const windowEntries = parseBracketedTranscript(mergedBracketed);
      // Diagnostic so we can see per-window readiness in operator
      // logs WITHOUT depending on audit-row inspection. Reveals
      // empty-window or huge-entry-count issues.
      console.log(
        `[pipelined-analyze] window ${my + 1}/${windows.length}: ${segmentCount} segments → ${windowEntries.length} merged entries · time [${w.startSec}s-${w.windowEndSec}s]`
      );
      const bodySec = Math.max(0, w.bodyEndSec - w.bodyStartSec);
      const winResult = await analyzeOneWindow({
        windowEntries,
        windowIdx: my,
        windowCount: windows.length,
        windowBodySec: bodySec,
        windowStartSec: w.startSec,
        windowEndSec: w.windowEndSec,
        backend,
        pipelineBackend,
        jobId,
        batchId,
        mediaUrl,
        title,
        installId,
        licenseFingerprint,
        source,
        computeCostDetails,
        analyzePromptOverride,
        targetTotalsByBucket,
        totalAudioSec: audioDurationSec,
      });
      results[my] = { window: w, windowEntries, ...winResult };
      // Per-window outcome log so operator sees what actually
      // happened beyond "X/Y windows failed" rollup. Failure log
      // includes the underlying error so we can diagnose JSON
      // validation, model 4xx, network, etc. without grepping
      // the audit DB.
      if (winResult.ok) {
        const dur = ((winResult.durationMs || 0) / 1000).toFixed(1);
        console.log(
          `[pipelined-analyze] window ${my + 1}/${windows.length} ok in ${dur}s — ${(winResult.sections || []).length} sections`
        );
      } else {
        const errMsg = winResult.error?.message || String(winResult.error || "unknown");
        console.warn(
          `[pipelined-analyze] window ${my + 1}/${windows.length} FAILED — ${errMsg.slice(0, 280)}`
        );
      }
      // Emit per-window streaming event with WINDOW-LOCAL indices.
      // The caller (summarize-url) will re-emit as a global-indexed
      // event after the post-transcribe canonical-entries build.
      //
      // OWNED-SECTION FILTERING: each window's transcript range
      // [startSec, windowEndSec] OVERLAPS BOTH the prior AND the
      // next window. Without symmetric filtering, sections in the
      // prior-overlap region would be emitted by this window as well
      // as by the window that owns that body region → overlapping
      // timestamps in the streaming UI. So emit ONLY sections whose
      // start time falls in this window's EXCLUSIVE body region
      // [bodyStartSec, nextBodyStartSec).
      // First window: bodyStartSec=0 → no lower bound.
      // Last window: no next → no upper bound.
      // results[my] keeps the FULL unfiltered section set, so the
      // final stitcher still sees topics that cross a body boundary.
      const nextWindow = windows[my + 1];
      const ownedFloorSec = w.bodyStartSec || 0;
      const ownedCutoffSec = nextWindow
        ? nextWindow.bodyStartSec
        : Infinity;
      const ownedSections = winResult.ok
        ? (winResult.sections || []).filter((s) => {
            const startEntry = windowEntries[s.startIndex];
            if (!startEntry) return false;
            const startSec = startEntry.offset || 0;
            return startSec >= ownedFloorSec && startSec < ownedCutoffSec;
          })
        : [];
      if (onWindowComplete && winResult.ok) {
        // A throwing callback must not take the analysis down with it.
        try {
          onWindowComplete({
            windowIdx: my,
            totalWindows: windows.length,
            window: w,
            windowEntries,
            ownedSections,
            windowBodySeconds: bodySec,
            model: winResult.model || null,
            durationMs: winResult.durationMs || 0,
          });
        } catch (cbErr) {
          console.warn(
            `[pipelined-analyze] onWindowComplete callback failed: ${cbErr?.message || cbErr}`
          );
        }
      }
    }
  }
  const workers = Array.from({ length: workerCount }, worker);
  await Promise.all(workers);
  const failed = results.filter((r) => r && !r.ok);
  // Dominant model = the model reported by the most successful windows.
  const dominantModel = (() => {
    const tally = new Map();
    for (const r of results) {
      if (!r || !r.ok || !r.model) continue;
      tally.set(r.model, (tally.get(r.model) || 0) + 1);
    }
    return [...tally.entries()].sort((a, b) => b[1] - a[1])[0]?.[0] || null;
  })();
  return {
    windowResults: results,
    dominantModel,
    attempts: { windows: results.length, failed: failed.length },
  };
}
// Helper: convert an array of { start, end, text } segments into
// the bracketed [MM:SS] text format the analyze prompt + parser
// expect. Identical formatting to what hardware.js emits as its
// stitched transcript text.
// Input: array of { start, end, text } (start in seconds); null,
// undefined or an empty array yields "". Output: one "[stamp] text"
// line per segment, ordered by start, joined with "\n". Segments whose
// text is missing or whitespace-only are skipped so no bare "[m:ss] "
// lines are emitted. The input array is not mutated.
function segmentsToBracketedText(segments) {
  if (!segments || segments.length === 0) return "";
  const pad = (n) => n.toString().padStart(2, "0");
  // filter() returns a fresh array, so the sort below (defensive, in
  // case the caller passed unsorted segments) never touches the input.
  const ordered = segments
    .filter((seg) => seg && (seg.text || "").trim().length > 0)
    .sort((a, b) => (a.start || 0) - (b.start || 0));
  const lines = ordered.map((seg) => {
    // Use H:MM:SS when the segment's time is in the 2nd hour or
    // beyond. Unconditional [MM:SS] produced [106:30] for a 1:46:30
    // timestamp — 3-digit minutes are REJECTED by
    // parseBracketedTranscript's regex (/^\[(\d{1,2}):(\d{2}).../),
    // so every transcript line past minute 100 was silently dropped
    // and later analyze windows failed with "no entries". H:MM:SS
    // matches what mergeShortEntries emits and what the parser's
    // optional 3rd capture group accepts.
    const totalSec = Math.floor(seg.start || 0);
    const hours = Math.floor(totalSec / 3600);
    const minutes = Math.floor((totalSec % 3600) / 60);
    const seconds = totalSec % 60;
    const stamp =
      hours > 0
        ? `${hours}:${pad(minutes)}:${pad(seconds)}`
        : `${minutes}:${pad(seconds)}`;
    return `[${stamp}] ${seg.text.trim()}`;
  });
  return lines.join("\n");
}