// Chunk-buffer state used by the pipelined-analyze path in
// routes/summarize-url.js. The hardware backend fires
// onChunkComplete(chunkData) as each transcribe chunk finishes;
// this buffer:
//   - drains chunks in INDEX ORDER (chunks may arrive out of order
//     when concurrency > 1; we hold them in `pending` until the
//     next-expected index lands so dedup against the prior chunk's
//     overlap boundary is deterministic)
//   - dedupes each new chunk's segments against the prior chunk's
//     overlapBoundarySec — same logic that runs at end-of-transcribe
//     in hardware.js, but applied incrementally so analyze can read
//     a clean, no-duplicates segment view per window
//   - tracks coveredEndSec (the maximum global timestamp the deduped
//     buffer extends to, considering ONLY in-order chunks)
//   - lets the analyze workers await `waitForTime(targetSec)` and
//     query `getSegments(startSec, endSec)` to build per-window
//     analyze inputs as soon as the required chunks are in
//
// Failure modes:
//   - A chunk fails entirely → its segments are empty / undefined.
//     The buffer still advances nextExpected past it so later chunks
//     aren't stuck behind. The window covering that chunk's range
//     gets a shorter transcript and may yield no sections (or fewer
//     than expected). Downstream stitcher tolerates gaps.
//   - Once every chunk (0 .. totalChunks-1) has drained, no further
//     coverage can arrive, so all waiters are released even if
//     coveredEndSec never reached their target (e.g. the final chunk
//     failed, or the target lies slightly past the end of the audio).
//   - waitForTime can still wait forever if a chunk index never
//     arrives at all. Caller is responsible for racing this against
//     the transcribe Promise so a transcribe failure unblocks all
//     pending waiters via reject (see `fail`).

/**
 * Create an in-order, deduplicating buffer of transcript segments.
 *
 * The returned object's methods rely on `this`, so call them on the
 * buffer itself (`buffer.add(chunk)`), or bind them before passing them
 * around as callbacks.
 *
 * @returns {object} Buffer state plus the methods `add`, `checkWaiters`,
 *   `waitForTime`, `getSegments`, `fail` and `allChunksDrained`.
 */
export function createChunkBuffer() {
  return {
    // Sparse staging area for chunks that arrived out of index order.
    pending: new Map(),
    // Drained, deduped segments in drain order. Append-only.
    segments: [],
    // Index of the next chunk we're waiting on to drain.
    nextExpected: 0,
    // Total chunk count, populated from the first chunk that reports an
    // integer `totalChunks`.
    totalChunks: null,
    // Greatest global end-time covered by drained chunks. NOT just
    // max(pending) — out-of-order pending chunks don't count until
    // their predecessors land, so dedup is consistent.
    coveredEndSec: 0,
    // The previous chunk's overlap boundary in GLOBAL seconds.
    // Segments in the next chunk with start < this are duplicates of
    // segments already in the prior chunk's tail and get dropped.
    prevOverlapBoundary: 0,
    // Async waiters: { targetSec, resolve, reject }
    waiters: [],
    // Set true on terminal failure so future waiters reject immediately
    // instead of hanging.
    failed: false,
    failedReason: null,

    /**
     * True once the chunk count is known and every chunk has been
     * drained, i.e. coveredEndSec can no longer grow.
     *
     * @returns {boolean}
     */
    allChunksDrained() {
      return (
        Number.isInteger(this.totalChunks) &&
        this.totalChunks > 0 &&
        this.nextExpected >= this.totalChunks
      );
    },

    /**
     * Accept one finished chunk and drain every consecutive chunk that
     * is now available, then wake any waiters that became satisfied.
     * No-op once the buffer has failed or when `chunkData` is nullish.
     *
     * @param {object} chunkData Reads `chunkIndex`, `totalChunks`,
     *   `segments`, `startSeconds`, `durationSeconds`,
     *   `overlapBoundarySec`.
     */
    add(chunkData) {
      if (this.failed) return;
      if (chunkData == null) return;

      if (this.totalChunks == null && Number.isInteger(chunkData.totalChunks)) {
        this.totalChunks = chunkData.totalChunks;
      }

      // Only stage indices that can still drain. A non-integer index can
      // never match `nextExpected`, and an index below it was already
      // drained (duplicate callback); staging either would just leak an
      // entry in `pending` forever.
      const index = chunkData.chunkIndex;
      if (Number.isInteger(index) && index >= this.nextExpected) {
        this.pending.set(index, chunkData);
      }

      // Drain consecutive chunks starting from nextExpected.
      while (this.pending.has(this.nextExpected)) {
        const c = this.pending.get(this.nextExpected);
        this.pending.delete(this.nextExpected);

        const segs = Array.isArray(c.segments) ? c.segments : [];
        // Dedup against the global overlap boundary set by the prior
        // chunk. Same predicate hardware.js uses at end-of-transcribe
        // for the global stitch: `seg.start >= prevOverlapBoundary`.
        for (const s of segs) {
          // Skip holes in the segment list: throwing here would leave
          // the chunk removed from `pending` without advancing
          // `nextExpected`, stalling the buffer permanently.
          if (s == null) continue;
          if ((s.start || 0) >= this.prevOverlapBoundary) {
            this.segments.push(s);
          }
        }

        // overlapBoundarySec from audio-meta.js is ALREADY a global
        // timestamp (= startSec + overlapSeconds at chunking time),
        // NOT a chunk-relative offset. The earlier `c.startSeconds +
        // c.overlapBoundarySec` double-counted: chunk 1 ended up
        // with prevOverlapBoundary=570 instead of 300, chunk 2
        // 1110 instead of 570, and by chunk 3+ the boundary had
        // outrun every subsequent chunk's segments — all dropped.
        // Symptom: window 1 received only ~30% of the segments it
        // should have, windows 2-6 received zero. Matches the
        // formula hardware.js uses at end-of-transcribe (with
        // offsetSeconds=0 for summarize-url callers).
        // NOTE(review): the numbers above suggest chunk k's
        // overlapBoundarySec is chunk k's own start + overlap; if so,
        // applying it to chunk k+1 may never drop anything — confirm
        // against hardware.js / audio-meta.js.
        this.prevOverlapBoundary = c.overlapBoundarySec || 0;

        const endHere = (c.startSeconds || 0) + (c.durationSeconds || 0);
        if (endHere > this.coveredEndSec) this.coveredEndSec = endHere;

        this.nextExpected += 1;
      }

      this.checkWaiters();
    },

    /**
     * Resolve every waiter whose target is now covered — or all of them
     * once every chunk has drained — and keep the rest queued.
     */
    checkWaiters() {
      const drained = this.allChunksDrained();
      const stillWaiting = [];
      for (const w of this.waiters) {
        if (drained || this.coveredEndSec >= w.targetSec) {
          w.resolve();
        } else {
          stillWaiting.push(w);
        }
      }
      this.waiters = stillWaiting;
    },

    /**
     * Block until coveredEndSec reaches targetSec, or until every chunk
     * has drained (after which coverage cannot grow any further).
     *
     * @param {number} targetSec Global timestamp, in seconds, to wait for.
     * @returns {Promise<void>} Rejects with `failedReason` if the buffer
     *   has been (or later gets) poisoned via `fail`.
     */
    waitForTime(targetSec) {
      if (this.failed) return Promise.reject(this.failedReason);
      if (this.coveredEndSec >= targetSec || this.allChunksDrained()) {
        return Promise.resolve();
      }
      return new Promise((resolve, reject) => {
        this.waiters.push({ targetSec, resolve, reject });
      });
    },

    /**
     * Snapshot the segments whose start falls in [startSec, endSec).
     *
     * @param {number} startSec Inclusive lower bound (global seconds).
     * @param {number} endSec Exclusive upper bound (global seconds).
     * @returns {Array<object>} A fresh array (safe to mutate) holding the
     *   same segment objects the buffer stores.
     */
    getSegments(startSec, endSec) {
      return this.segments.filter((s) => {
        const t = s.start || 0;
        return t >= startSec && t < endSec;
      });
    },

    /**
     * Mark the buffer dead so all current + future waiters reject.
     * Called when transcribe throws — without this, runPipelinedAnalysis
     * workers would hang forever waiting for a window that'll never
     * become ready.
     *
     * @param {*} reason An Error is used as-is; anything else is wrapped
     *   in a new Error (falsy values become "transcribe failed").
     */
    fail(reason) {
      this.failed = true;
      this.failedReason =
        reason instanceof Error
          ? reason
          : new Error(String(reason || "transcribe failed"));
      for (const w of this.waiters) {
        try {
          w.reject(this.failedReason);
        } catch {
          // Deliberate best-effort: `waiters` is a public array, so one
          // malformed entry must not prevent the remaining waiters from
          // being rejected.
        }
      }
      this.waiters = [];
    },
  };
}