From 705807e2866021b88c027c935fc20df01bb20e54 Mon Sep 17 00:00:00 2001 From: Keysat Date: Sat, 13 Jun 2026 13:35:53 -0500 Subject: [PATCH] Add internal-meetings pipeline and post-hoc speaker tools --- public/job-output-view.html | 330 +++ server/audio-meta.js | 171 ++ server/chunk-buffer.js | 142 ++ server/chunked-analyze.js | 1256 ++++++++++++ server/job-stats.js | 323 +++ server/jobs.js | 219 ++ server/meeting-extras.js | 376 ++++ server/meeting-speaker-edits.js | 359 ++++ server/output-store.js | 163 ++ server/post-cluster-polish.js | 655 ++++++ server/routes/internal-meetings.js | 2225 +++++++++++++++++++++ server/speaker-clustering.js | 624 ++++++ server/test/meeting-speaker-edits.test.js | 269 +++ server/test/polish-speaker-labels.test.js | 60 + server/test/speaker-clustering.test.js | 203 ++ 15 files changed, 7375 insertions(+) create mode 100644 public/job-output-view.html create mode 100644 server/audio-meta.js create mode 100644 server/chunk-buffer.js create mode 100644 server/chunked-analyze.js create mode 100644 server/job-stats.js create mode 100644 server/jobs.js create mode 100644 server/meeting-extras.js create mode 100644 server/meeting-speaker-edits.js create mode 100644 server/output-store.js create mode 100644 server/post-cluster-polish.js create mode 100644 server/routes/internal-meetings.js create mode 100644 server/speaker-clustering.js create mode 100644 server/test/meeting-speaker-edits.test.js create mode 100644 server/test/polish-speaker-labels.test.js create mode 100644 server/test/speaker-clustering.test.js diff --git a/public/job-output-view.html b/public/job-output-view.html new file mode 100644 index 0000000..11c712e --- /dev/null +++ b/public/job-output-view.html @@ -0,0 +1,330 @@ + + + + + + Job output — Recap Relay + + + + +
+
Loading job output…
+
+ + + + diff --git a/server/audio-meta.js b/server/audio-meta.js new file mode 100644 index 0000000..7c7ed68 --- /dev/null +++ b/server/audio-meta.js @@ -0,0 +1,171 @@ +// Wrapper around ffprobe for getting the playable duration of an +// audio file. Used by the transcribe routes to record audio_seconds +// alongside each audit entry, so the dashboard can normalize wall- +// clock duration to "ms per minute of audio" — a backend-agnostic +// speed benchmark. +// +// Returns the duration in seconds (float), or null if ffprobe fails +// or the file isn't probeable. Never throws — best-effort metadata +// shouldn't break the request that needs it. + +import { execFile } from "child_process"; +import { promisify } from "util"; +import fs from "fs/promises"; +import os from "os"; +import path from "path"; + +const execFileAsync = promisify(execFile); + +// NOTE: there is intentionally NO default chunk size export here. +// The canonical default lives in server/config.js +// (`relay_hardware_tx_chunk_minutes` and `relay_gemini_tx_chunk_minutes`) +// and flows down through createHardwareBackend / createGeminiBackend +// to splitAudioFile. Removed in v0.2.32 so there's exactly one place +// to change the default — the Settings tab in the dashboard. + +// Runs ffprobe on a file path. Returns seconds, or null on any failure. +export async function getAudioDurationSeconds(filePath) { + if (!filePath) return null; + try { + // -v error: silence everything except hard errors + // -show_entries format=duration: just the duration float + // -of default=noprint_wrappers=1:nokey=1: bare number, no labels + const { stdout } = await execFileAsync( + "ffprobe", + [ + "-v", + "error", + "-show_entries", + "format=duration", + "-of", + "default=noprint_wrappers=1:nokey=1", + filePath, + ], + { timeout: 10_000 } + ); + const seconds = parseFloat(stdout.trim()); + if (!Number.isFinite(seconds) || seconds <= 0) return null; + return seconds; + } catch { + return null; + } +} + +// Split an audio file into fixed-length chunks via ffmpeg. Returns +// an array of { filePath, startSeconds, durationSeconds, index } +// ordered by startSeconds. Uses -acodec copy so it's lossless and +// fast (no re-encoding pass). Returns an empty array if the audio +// is shorter than chunkSeconds — caller should just send the +// original file in that case. +// +// Used by the hardware backend to keep Parakeet calls within memory +// limits on long audio. The relay's audit log later records audio_seconds +// for the WHOLE file (not per-chunk) so the dashboard's +// "ms per minute of audio" benchmark stays meaningful. +export async function splitAudioFile({ + inputPath, + outputDir, + chunkSeconds, + overlapSeconds = 0, +}) { + if (!Number.isFinite(chunkSeconds) || chunkSeconds <= 0) { + throw new Error("splitAudioFile: chunkSeconds is required (no default — pass an explicit value from config)"); + } + if ( + !Number.isFinite(overlapSeconds) || + overlapSeconds < 0 || + overlapSeconds >= chunkSeconds + ) { + // Overlap must be smaller than chunk size or the loop never + // advances. 0 is fine (no overlap, original behavior). + overlapSeconds = 0; + } + const duration = await getAudioDurationSeconds(inputPath); + if (!duration || duration <= chunkSeconds) return []; + const chunks = []; + let startSec = 0; + let i = 0; + const ext = path.extname(inputPath).replace(/^\./, "") || "mp3"; + // Advance step = chunkSeconds - overlap. Each chunk still has + // length up to chunkSeconds; consecutive chunks share `overlap` + // seconds at their boundary. The caller's stitching code dedupes + // by dropping the overlapping prefix from chunk N+1 (and all + // subsequent chunks). + const advanceStep = chunkSeconds - overlapSeconds; + while (startSec < duration) { + const chunkPath = path.join(outputDir, `chunk_${i}.${ext}`); + const segLen = Math.min(chunkSeconds, duration - startSec); + try { + await execFileAsync( + "ffmpeg", + [ + "-y", + "-i", + inputPath, + "-ss", + String(startSec), + "-t", + String(segLen), + "-acodec", + "copy", + chunkPath, + ], + { timeout: 120_000 } + ); + } catch (err) { + // `-acodec copy` fails on some containers/streams that don't + // start on a keyframe at the cut point. Retry with re-encoding, + // which always works at the cost of CPU time. + await execFileAsync( + "ffmpeg", + [ + "-y", + "-i", + inputPath, + "-ss", + String(startSec), + "-t", + String(segLen), + chunkPath, + ], + { timeout: 180_000 } + ); + } + chunks.push({ + filePath: chunkPath, + startSeconds: startSec, + durationSeconds: segLen, + // Boundary marker: timestamps strictly less than this value + // are duplicates of the prior chunk's tail (overlap region). + // Caller dedupes by dropping output before this boundary. + // For chunk 0 this equals startSec (no prior chunk), so the + // boundary check is a no-op. + overlapBoundarySec: i === 0 ? startSec : startSec + overlapSeconds, + index: i, + }); + startSec += advanceStep; + i++; + } + return chunks; +} + +// Convenience wrapper for callers holding the audio in memory (the +// /relay/transcribe route receives multipart uploads as buffers). +// Writes a temp file, probes, cleans up. Cheaper than re-streaming +// through ffprobe's stdin which doesn't always handle every format +// reliably. +export async function getAudioDurationSecondsFromBuffer(buffer) { + if (!buffer || !buffer.length) return null; + const tmpFile = path.join( + os.tmpdir(), + `relay-probe-${Date.now()}-${Math.random().toString(36).slice(2, 8)}` + ); + try { + await fs.writeFile(tmpFile, buffer); + return await getAudioDurationSeconds(tmpFile); + } catch { + return null; + } finally { + fs.unlink(tmpFile).catch(() => {}); + } +} diff --git a/server/chunk-buffer.js b/server/chunk-buffer.js new file mode 100644 index 0000000..685b65f --- /dev/null +++ b/server/chunk-buffer.js @@ -0,0 +1,142 @@ +// Chunk-buffer state used by the pipelined-analyze path in +// routes/summarize-url.js. The hardware backend fires +// onChunkComplete(chunkData) as each transcribe chunk finishes; +// this buffer: +// - drains chunks in INDEX ORDER (chunks may arrive out of order +// when concurrency > 1; we hold them in `pending` until the +// next-expected index lands so dedup against the prior chunk's +// overlap boundary is deterministic) +// - dedupes each new chunk's segments against the prior chunk's +// overlapBoundarySec — same logic that runs at end-of-transcribe +// in hardware.js, but applied incrementally so analyze can read +// a clean, no-duplicates segment view per window +// - tracks coveredEndSec (the maximum global timestamp the deduped +// buffer extends to, considering ONLY in-order chunks) +// - lets the analyze workers await `waitForTime(targetSec)` and +// query `getSegments(startSec, endSec)` to build per-window +// analyze inputs as soon as the required chunks are in +// +// Failure modes: +// - A chunk fails entirely → its segments are empty / undefined. +// The buffer still advances nextExpected past it so later chunks +// aren't stuck behind. The window covering that chunk's range +// gets a shorter transcript and may yield no sections (or fewer +// than expected). Downstream stitcher tolerates gaps. +// - waitForTime can wait forever if the relevant chunk index +// never arrives. Caller is responsible for racing this against +// the transcribe Promise so a transcribe failure unblocks all +// pending waiters via reject. + +export function createChunkBuffer() { + return { + // Sparse staging area for chunks that arrived out of index order. + pending: new Map(), + // Drained, deduped, sorted-by-start segments. Append-only. + segments: [], + // Index of the next chunk we're waiting on to drain. + nextExpected: 0, + // Total chunk count, populated on the first onChunkComplete call. + totalChunks: null, + // Greatest global end-time covered by drained chunks. NOT just + // max(pending) — out-of-order pending chunks don't count until + // their predecessors land, so dedup is consistent. + coveredEndSec: 0, + // The previous chunk's overlap boundary in GLOBAL seconds. + // Segments in the next chunk with start < this are duplicates of + // segments already in the prior chunk's tail and get dropped. + prevOverlapBoundary: 0, + // Async waiters: { targetSec, resolve, reject } + waiters: [], + // Set true on terminal failure so future waiters reject immediately + // instead of hanging. + failed: false, + failedReason: null, + + add(chunkData) { + if (this.failed) return; + if (chunkData == null) return; + if (this.totalChunks == null && Number.isInteger(chunkData.totalChunks)) { + this.totalChunks = chunkData.totalChunks; + } + this.pending.set(chunkData.chunkIndex, chunkData); + // Drain consecutive chunks starting from nextExpected + while (this.pending.has(this.nextExpected)) { + const c = this.pending.get(this.nextExpected); + this.pending.delete(this.nextExpected); + const segs = Array.isArray(c.segments) ? c.segments : []; + // Dedup against the global overlap boundary set by the prior + // chunk. Same predicate hardware.js uses at end-of-transcribe + // for the global stitch: `seg.start >= prevOverlapBoundary`. + for (const s of segs) { + if ((s.start || 0) >= this.prevOverlapBoundary) { + this.segments.push(s); + } + } + // overlapBoundarySec from audio-meta.js is ALREADY a global + // timestamp (= startSec + overlapSeconds at chunking time), + // NOT a chunk-relative offset. The earlier `c.startSeconds + + // c.overlapBoundarySec` double-counted: chunk 1 ended up + // with prevOverlapBoundary=570 instead of 300, chunk 2 + // 1110 instead of 570, and by chunk 3+ the boundary had + // outrun every subsequent chunk's segments — all dropped. + // Symptom: window 1 received only ~30% of the segments it + // should have, windows 2-6 received zero. Matches the + // formula hardware.js uses at end-of-transcribe (with + // offsetSeconds=0 for summarize-url callers). + this.prevOverlapBoundary = c.overlapBoundarySec || 0; + const endHere = (c.startSeconds || 0) + (c.durationSeconds || 0); + if (endHere > this.coveredEndSec) this.coveredEndSec = endHere; + this.nextExpected += 1; + } + this.checkWaiters(); + }, + + checkWaiters() { + const stillWaiting = []; + for (const w of this.waiters) { + if (this.coveredEndSec >= w.targetSec) { + w.resolve(); + } else { + stillWaiting.push(w); + } + } + this.waiters = stillWaiting; + }, + + // Block until coveredEndSec reaches targetSec. Rejects with the + // failedReason if the buffer is poisoned by a transcribe failure. + waitForTime(targetSec) { + if (this.failed) return Promise.reject(this.failedReason); + if (this.coveredEndSec >= targetSec) return Promise.resolve(); + return new Promise((resolve, reject) => + this.waiters.push({ targetSec, resolve, reject }) + ); + }, + + // Snapshot the segments covering [startSec, endSec). Caller gets + // a fresh array safe to mutate. + getSegments(startSec, endSec) { + const out = []; + for (const s of this.segments) { + const t = s.start || 0; + if (t >= startSec && t < endSec) out.push(s); + } + return out; + }, + + // Mark the buffer dead so all current + future waiters reject. + // Called when transcribe throws — without this, runPipelinedAnalysis + // workers would hang forever waiting for a window that'll never + // become ready. + fail(reason) { + this.failed = true; + this.failedReason = reason instanceof Error + ? reason + : new Error(String(reason || "transcribe failed")); + for (const w of this.waiters) { + try { w.reject(this.failedReason); } catch {} + } + this.waiters = []; + }, + }; +} diff --git a/server/chunked-analyze.js b/server/chunked-analyze.js new file mode 100644 index 0000000..7af2493 --- /dev/null +++ b/server/chunked-analyze.js @@ -0,0 +1,1256 @@ +// Chunked topic-analysis for the relay's test-run worker. Mirrors the +// approach in `/Users/macpro/Projects/recap/server/chunked-analyze.js` +// (Recap app) but adapted for the relay: +// +// 1. Input is the relay's bracketed-text transcript (the same format +// that gets saved to the output store), not a structured `entries` +// array. A small parser converts the bracketed text back into +// { offset, text } entries before windowing. +// +// 2. Each window emits its OWN audit-log row (pipeline="analyze", +// with window_idx + window_count fields), so the Jobs table's +// "AN windows" column reflects real parallel window count and +// `analyze_ms` becomes the sum-of-windows (total backend work). +// Per-window `audio_seconds` = window BODY seconds so per-row +// rate stats (s/audio-min) divide by the right denominator. +// +// 3. Single backend, single model per call. The recap-app version +// walks a model-fallback chain per window; the relay only ever +// runs one model per benchmark permutation, so no fallback loop. +// +// 4. No abort signal / jobId threading — the relay's test-run worker +// manages cancellation at a higher level via job state. +// +// Tunables (window body, overlap, concurrency, cutoff) are passed in +// from the test-run worker, which sources them from the live-reloaded +// /data/config/relay-config.json. No hardcoded defaults here — the +// caller MUST supply explicit values, sourced from one canonical place +// (server/config.js defaultConfig()). +// +// Public entry point: `runChunkedAnalysis()` at the bottom. + +import { recordCall } from "./audit-log.js"; +import { + sortAndDedupeTranscript, + mergeShortEntries, +} from "./backends/gemini.js"; + +// ───────────────────────────────────────────────────────────────────── +// Bracketed-transcript → entries parser +// ───────────────────────────────────────────────────────────────────── +// Both the Gemini backend and the Hardware backend produce transcripts +// in the shape: +// +// [MM:SS] First caption line. +// [MM:SS] Next caption line. +// [H:MM:SS] (with hour for >60-min videos) +// +// We parse those bracketed timestamps back into entries so the +// windowing logic (which is offset-aware) can plan time-based windows. +// Lines without a leading bracket are appended to the previous entry's +// text (handles wrapped lines). + +const TIMESTAMP_RE = /^\[(\d{1,2}):(\d{2})(?::(\d{2}))?\]\s*(.*)$/; + +export function parseBracketedTranscript(text) { + if (!text || typeof text !== "string") return []; + const lines = text.split(/\r?\n/); + const entries = []; + for (const line of lines) { + const m = line.match(TIMESTAMP_RE); + if (m) { + let offset; + if (m[3] !== undefined) { + // [H:MM:SS] — h, mm, ss in groups 1/2/3 + offset = parseInt(m[1], 10) * 3600 + parseInt(m[2], 10) * 60 + parseInt(m[3], 10); + } else { + // [MM:SS] — mm, ss in groups 1/2 + offset = parseInt(m[1], 10) * 60 + parseInt(m[2], 10); + } + entries.push({ offset, text: (m[4] || "").trim() }); + } else if (entries.length > 0) { + // Continuation line — append to the previous entry's text. + const last = entries[entries.length - 1]; + const extra = line.trim(); + if (extra) last.text = last.text ? last.text + " " + extra : extra; + } + // Lines before the first bracket (e.g. a header) are discarded. + } + return entries; +} + +// Format seconds → "[H:MM:SS]" or "[MM:SS]" matching the relay's +// bracketed style. +function fmtTimestamp(secs) { + const s = Math.max(0, Math.floor(secs || 0)); + const h = Math.floor(s / 3600); + const m = Math.floor((s % 3600) / 60); + const sec = s % 60; + const pad = (n) => n.toString().padStart(2, "0"); + return h > 0 ? `${h}:${pad(m)}:${pad(sec)}` : `${m}:${pad(sec)}`; +} + +// ───────────────────────────────────────────────────────────────────── +// Window planning — same algorithm as recap-app's planAnalysisWindows +// ───────────────────────────────────────────────────────────────────── +// Returns array of windows, each: { startIdx, endIdx, bodyStartIdx, +// startSec, bodyStartSec, bodyEndSec }. We carry the second-domain +// values forward (not just entry-index) so the audit row's per-window +// audio_seconds can be set to the body length exactly. + +export function planAnalysisWindows(entries, { bodySeconds, overlapSeconds, cutoffSeconds }) { + if (!entries.length) return []; + const totalSec = + (entries[entries.length - 1].offset || 0) + + (entries[entries.length - 1].duration || 0); + if (totalSec <= cutoffSeconds) { + return [ + { + startIdx: 0, + endIdx: entries.length - 1, + bodyStartIdx: 0, + startSec: 0, + bodyStartSec: 0, + bodyEndSec: totalSec, + }, + ]; + } + const windows = []; + let bodyStartSec = 0; + while (bodyStartSec < totalSec) { + const bodyEndSec = Math.min(totalSec, bodyStartSec + bodySeconds); + const windowEndSec = bodyStartSec + bodySeconds + overlapSeconds; + const bodyStartIdx = firstEntryAtOrAfter(entries, bodyStartSec); + // No more entries — done. + if (bodyStartIdx >= entries.length) break; + // Gap handling: if the next entry past bodyStartSec sits beyond + // this window's end (e.g., a middle TX chunk was truncated and + // left a hole in the timeline), don't stop the loop — jump the + // body cursor forward to the next entry's body-stride boundary + // so the entries on the other side of the gap still get a window. + const nextEntryOffset = entries[bodyStartIdx].offset || 0; + if (nextEntryOffset >= windowEndSec) { + bodyStartSec = Math.max( + bodyStartSec + bodySeconds, + Math.floor(nextEntryOffset / bodySeconds) * bodySeconds + ); + continue; + } + const overlapWithPriorSec = Math.max(0, bodyStartSec - overlapSeconds); + const startIdx = firstEntryAtOrAfter(entries, overlapWithPriorSec); + const endIdx = lastEntryBefore(entries, windowEndSec); + if (endIdx < bodyStartIdx) { + // Defensive — the gap check above should prevent this. If we + // still hit it, advance and continue rather than break. + bodyStartSec += bodySeconds; + continue; + } + windows.push({ + startIdx, + endIdx, + bodyStartIdx, + startSec: overlapWithPriorSec, + bodyStartSec, + bodyEndSec, + }); + if (endIdx >= entries.length - 1) break; + bodyStartSec += bodySeconds; + } + return windows; +} + +export function firstEntryAtOrAfter(entries, sec) { + for (let i = 0; i < entries.length; i++) { + if ((entries[i].offset || 0) >= sec) return i; + } + return entries.length; +} + +export function lastEntryBefore(entries, sec) { + let ans = -1; + for (let i = 0; i < entries.length; i++) { + if ((entries[i].offset || 0) < sec) ans = i; + else break; + } + return ans; +} + +// Find the canonical entry index whose [offset, offset+duration] +// range contains the given time (in seconds). Falls back to the +// nearest entry by offset distance when no containing entry exists. +// Returns -1 only when `entries` is empty. +export function canonicalIndexForOffset(entries, sec) { + if (!entries.length) return -1; + // Exact-match-or-containing pass + for (let i = 0; i < entries.length; i++) { + const start = entries[i].offset || 0; + const dur = entries[i].duration || 0; + if (start <= sec && sec <= start + dur) return i; + if (start > sec) { + // Past it without a containing match — return the closer of + // entries[i-1] vs entries[i] by offset distance. + if (i === 0) return 0; + const prevDist = sec - (entries[i - 1].offset || 0); + const nextDist = (entries[i].offset || 0) - sec; + return prevDist <= nextDist ? i - 1 : i; + } + } + // Past the last entry — clamp to end. + return entries.length - 1; +} + +// ───────────────────────────────────────────────────────────────────── +// Per-window prompt builder +// ───────────────────────────────────────────────────────────────────── +// Same shape as the test-run worker's original single-shot prompt, +// but emitted per-window slice. Includes the entry's offset prefix so +// the model can use timestamps when picking section boundaries. + +// Default analyze-prompt template. Three template variables get +// interpolated at request time: +// {{windowMin}} — window length in minutes (derived from entries) +// {{targetSections}} — count target string ("1 section" / "1-2 sections" / ...) +// {{maxIndex}} — windowEntries.length - 1, used in the completeness +// constraint so the model is told the exact valid +// index range it must cover. NEW in this version +// alongside the numbered-line transcript format. +// {{transcript}} — the numbered + timestamped transcript text for the +// window, formatted as `[N] (MM:SS) text` per line +// (was `[MM:SS] text` before; switched to match the +// Recap-app's direct-to-Gemini path which produces +// much better section coverage because the model +// can read indices off the input rather than count +// bracketed lines internally). +// Operator-editable via the dashboard's Settings tab. The variable +// names MUST be preserved across edits; the validation in +// /admin/settings PUT checks that the prompt still contains +// {{transcript}} and the "JSON" output instruction. +export const DEFAULT_ANALYZE_PROMPT_TEMPLATE = `You are analyzing a ~{{windowMin}}-minute section of a longer transcript. Your job is to identify natural topic boundaries and group the transcript into discussion-based sections — aim for {{targetSections}}. + +TRANSCRIPT (each line is numbered with a timestamp): +{{transcript}} + +INSTRUCTIONS: +1. Read the entire transcript carefully. +2. Identify where the discussion naturally shifts from one topic to another. +3. Group consecutive transcript segments by topic. Some sections may be short (a quick aside) and some may be long (an extended deep-dive). Let the content dictate the length. +4. For each section, write: + - A short, specific topic title (3-8 words) + - A 1-3 sentence summary of what's discussed + - The start and end segment indices (inclusive), counted as the bracketed [N] number at the start of each transcript line above. + +IMPORTANT: +- Sections must be chronological and non-overlapping. +- Every segment index from 0 to {{maxIndex}} must belong to exactly one section. +- startIndex of section N+1 must equal endIndex of section N plus 1. +- Create as many or as few sections as the content naturally requires — but lean toward broad, substantive topics rather than minute-by-minute breakdowns. A natural topic that spans several minutes of dialogue should be one section, not several. +- Titles should be descriptive and specific, not generic like "Introduction" unless it truly is one. + +Respond with ONLY valid JSON in this exact format, no other text: +{ + "sections": [ + { + "title": "Brief Topic Title", + "summary": "1-3 sentence summary of this discussion section.", + "startIndex": 0, + "endIndex": 15 + } + ] +}`; + +// Pick the target-total-sections value for a given total audio +// duration. The buckets match the Settings UI rows exactly so what +// the operator sees in their preview table is what the prompt +// receives at request time. Falls back to the next-larger bucket\'s +// default when a setting is missing/non-numeric. +function pickTargetTotalSections(totalAudioSec, totalsByBucket) { + const totalMin = (totalAudioSec || 0) / 60; + const pick = (k, fallback) => { + const v = Number(totalsByBucket?.[k]); + return Number.isFinite(v) && v > 0 ? v : fallback; + }; + if (totalMin < 30) return pick("under_30", 6); + if (totalMin < 60) return pick("30_60", 8); + if (totalMin < 90) return pick("60_90", 9); + if (totalMin < 120) return pick("90_120", 10); + if (totalMin < 150) return pick("120_150", 11); + if (totalMin < 180) return pick("150_180", 12); + return pick("over_180", 12); +} + +// Compute the per-window target sections from the total-target + +// window/audio geometry. Returns BOTH a numeric average (for the +// dashboard preview) and a free-form string to splice into the +// prompt. The string is intentionally round numbers / short ranges +// the model can interpret literally — fractional averages get +// expressed as "N–M sections" so the model is given latitude rather +// than a confusing "2.5 sections" instruction. +// +// Edge cases: +// - Empty audio → returns 1 section (defensive). +// - Single-window job (windowSec >= totalAudioSec) → returns +// full target total (no division by less-than-one num-windows). +// - Fractional result (e.g. 2.5) → returns "2–3 sections". +// - Integer result (e.g. 3.0) → returns "around 3 sections". +export function computePerWindowTarget({ totalAudioSec, windowSec, totalsByBucket }) { + const target = pickTargetTotalSections(totalAudioSec, totalsByBucket); + if (!totalAudioSec || totalAudioSec <= 0) { + return { average: 1, label: "1 section", target_total: target }; + } + // Effective number of windows the audio spans. Clamped to 1 so a + // single-shot run (audio <= analyze_cutoff) gets the full target. + const numWindows = Math.max(1, totalAudioSec / Math.max(60, windowSec || 60)); + const avg = target / numWindows; + // Build the prompt-facing label. + let label; + if (avg <= 1.2) { + label = "1 section"; + } else { + const lo = Math.max(1, Math.floor(avg)); + const hi = Math.max(lo, Math.ceil(avg)); + if (lo === hi) { + label = "around " + lo + " sections"; + } else { + label = lo + "–" + hi + " sections"; + } + } + return { average: avg, label, target_total: target }; +} + +function buildWindowPrompt(windowEntries, promptOverride, targetTotalsByBucket, totalAudioSec) { + // Numbered + timestamped transcript format. Each line is + // [N] (MM:SS) text + // where N is the entry's position within THIS window (not its + // global position in the full transcript — the stitcher + // translates window-local indices to global at result time). + // + // Why numbered: the model is asked to emit startIndex/endIndex + // for each section. If indices aren't shown in the input, the + // model has to count bracketed-timestamp lines and frequently + // hallucinates out-of-range values (e.g., emitting endIndex=9999 + // for a 200-entry window). Showing indices makes the contract + // explicit and dramatically reduces hallucination. + const transcriptText = windowEntries + .map((e, i) => `[${i}] (${fmtTimestamp(e.offset)}) ${e.text}`) + .join("\n"); + // Window length in seconds (last entry's offset minus first's), + // used to scale the suggested section count. The actual per-window + // section target also depends on the TOTAL video duration, not + // just this window's slice — see computePerWindowTarget(). + const windowSec = windowEntries.length > 1 + ? (windowEntries[windowEntries.length - 1].offset || 0) - (windowEntries[0].offset || 0) + : 0; + const windowMin = Math.max(1, Math.round(windowSec / 60)); + // Max valid index = last entry's position in this window's slice. + // Substituted into the prompt's completeness clause so the model + // is told the exact range it must cover (0 through maxIndex + // inclusive). + const maxIndex = Math.max(0, windowEntries.length - 1); + // Section-count target: the new model is total-sections-per-video + // bucketed by TOTAL audio duration. The relay divides by the + // effective number of windows ((totalAudioSec / windowSec), clamped + // to >=1) to produce the per-window average, then formats it as a + // human-readable string for {{targetSections}}. + // + // `totalAudioSec` is passed in from the caller (runChunkedAnalysis + // receives it from summarize-url.js' worker). When unavailable + // (e.g. legacy callers), computePerWindowTarget falls back to "1 + // section" — defensive, won't crash but won't be useful either. + const targetCalc = computePerWindowTarget({ + totalAudioSec, + windowSec, + totalsByBucket: targetTotalsByBucket || {}, + }); + const targetSections = targetCalc.label; + const template = (typeof promptOverride === "string" && promptOverride.trim()) + ? promptOverride + : DEFAULT_ANALYZE_PROMPT_TEMPLATE; + // Simple {{var}} interpolation. We DON'T use a templating library + // to keep this dependency-free and predictable. Variables are + // replaced literally; unknown {{var}} tokens pass through as-is + // (helpful if the operator typos a variable — they'll see it in + // the model's output and know to fix it). + return template + .replaceAll("{{windowMin}}", String(windowMin)) + .replaceAll("{{targetSections}}", targetSections) + .replaceAll("{{maxIndex}}", String(maxIndex)) + .replaceAll("{{transcript}}", transcriptText); +} + +// Strip code fences + parse a JSON-formatted analyze response into +// `{ sections: [...] }`. Returns null on parse failure. +function safeParseSections(text) { + if (!text || typeof text !== "string") return null; + let jsonStr = text.trim(); + const cb = jsonStr.match(/```(?:json)?\s*([\s\S]*?)```/); + if (cb) jsonStr = cb[1].trim(); + try { + const parsed = JSON.parse(jsonStr); + return parsed && Array.isArray(parsed.sections) ? parsed : null; + } catch { + return null; + } +} + +// ───────────────────────────────────────────────────────────────────── +// Stitcher +// ───────────────────────────────────────────────────────────────────── +// Combines per-window section lists into a single ordered list of +// non-overlapping sections referencing entries by their position in +// the FULL entries array. Same algorithm as recap-app's stitcher: +// each window N owns sections whose globalStart falls before +// window(N+1).bodyStartIdx. Last window has no successor → keep all. + +export function stitchAnalysisResults(results) { + const out = []; + for (let i = 0; i < results.length; i++) { + const r = results[i]; + if (!r || !r.ok) continue; + const next = results[i + 1]; + const nextBody = + next && next.window ? next.window.bodyStartIdx : Infinity; + const offset = r.window.startIdx; + // Cap each section's endIndex to this window's own range. Without + // this clamp, a model that emits a section with an out-of-range + // endIndex (e.g., endIndex: 9999 — happens when the LLM + // hallucinates a value past the window it was given) ends up + // with a globalEnd extending way past the window's actual end. + // Symptom: an analyze window covers 18 minutes of transcript but + // produces a section spanning 50+ minutes in the final output, + // because the inflated endIndex propagates all the way to the + // UI. The clamp uses r.window.endIdx, which is the last entry + // index this window's transcript actually contains. + const windowMaxEndIdx = r.window.endIdx; + for (const s of r.sections) { + const globalStart = offset + (s.startIndex ?? 0); + const rawGlobalEnd = offset + (s.endIndex ?? 0); + const globalEnd = Math.min(rawGlobalEnd, windowMaxEndIdx); + if (globalStart >= nextBody) continue; + // Skip sections that the clamp pushed below startIndex — + // these were degenerate to begin with (the model emitted + // start > end) and the clamp can't recover them. + if (globalEnd < globalStart) continue; + out.push({ + startIndex: globalStart, + endIndex: globalEnd, + title: s.title, + summary: s.summary, + }); + } + } + // Dedup with proper containment handling. The earlier implementation + // (sort + trim-on-next-start) handled simple partial overlaps but + // silently let TWO sections survive when one fully contained the + // other (e.g., model emits both "Systemic Critique 1:10-1:23" AND + // "Decentralizing 1:10-1:12" in the same window output). Symptom: + // the UI shows two topics starting at the same timestamp with one's + // range fully inside the other's. Algorithm: + // 1. Sort by startIndex ASC. Tiebreak by endIndex ASC so the + // SMALLER (more specific) section comes first when two share + // a start — the cross-section trim below then drops the + // small one into a degenerate range, and we filter it out. + // 2. Walk left-to-right, tracking the largest endIndex seen so + // far ("maxEndSeen"). A section whose endIndex is already + // covered (<= maxEndSeen) is FULLY contained in something we + // already accepted — drop it. + // 3. For partial overlap (current.startIndex <= previous.endIndex), + // trim the previous section's endIndex back to current.startIndex - 1. + // If that makes the previous degenerate, pop it. + out.sort( + (a, b) => a.startIndex - b.startIndex || a.endIndex - b.endIndex + ); + const deduped = []; + let maxEndSeen = -1; + for (const s of out) { + if (s.endIndex < s.startIndex) continue; + // Fully contained in something already accepted — drop. + if (s.endIndex <= maxEndSeen) continue; + // Partial overlap with the prior accepted section — trim the + // prior to end one entry before this one starts. + const prev = deduped[deduped.length - 1]; + if (prev && prev.endIndex >= s.startIndex) { + prev.endIndex = s.startIndex - 1; + if (prev.endIndex < prev.startIndex) { + deduped.pop(); + } + } + deduped.push(s); + if (s.endIndex > maxEndSeen) maxEndSeen = s.endIndex; + } + return deduped; +} + +// ───────────────────────────────────────────────────────────────────── +// Public entry point +// ───────────────────────────────────────────────────────────────────── +// Runs chunked analysis end-to-end: +// +// transcriptText (bracketed [MM:SS] format) +// → entries[] +// → windows[] +// → analyze each window in parallel (bounded concurrency) +// → audit per-window +// → stitch sections into final list +// +// Each window emits an audit row with pipeline="analyze", +// audio_seconds=windowBodySeconds, window_idx=N, window_count=K, +// duration_ms=this window's analyze wall time, status="success"|"error". +// On error, the per-window row records the error message AND the +// overall run continues — failed windows are dropped from stitching +// (with a warning) rather than aborting the whole job. +// +// Returns: +// { +// text: "", // for downstream parity +// model: dominantModelName, +// usage: null, +// attempts: { windows: N, failed: K }, +// } +// or throws when ALL windows failed. + +export async function runChunkedAnalysis({ + transcriptText, + backend, + // Audit-row metadata (passed through to each per-window recordCall): + pipelineBackend, // "gemini" | "hardware" + jobId, + batchId, + mediaUrl, + title, + installId, + // Audit-only — paired with install_id on every per-window recordCall + // so license-pool aggregations on the dashboard see this work. Pure + // forensic field; credits.js isn't called from here. + licenseFingerprint = null, + source, + // Cost helper — caller passes a function (model, usage) → costDetails + // because gemini.js's cost calculator is gemini-specific. Returns + // { input_tokens, output_tokens, thinking_tokens, cost_usd }; hardware + // path passes a no-op helper that returns zeros. + computeCostDetails, + // Tunables (all required, no hardcoded defaults here): + bodySeconds, + overlapSeconds, + concurrency, + cutoffSeconds, + // Operator-editable analyze prompt template (Settings tab). + // Empty/missing falls back to DEFAULT_ANALYZE_PROMPT_TEMPLATE. + // Same applies to both Gemini and operator-hardware analyze paths. + analyzePromptOverride = "", + // Operator-editable section-count target totals per VIDEO duration + // bucket. Shape: { under_30, "30_60", "60_90", "90_120", "120_150", + // "150_180", over_180 } — each is a NUMBER (target total sections + // for a video in that duration bucket). The relay computes a + // per-window average by dividing by num_windows and formats it + // into {{targetSections}}. See computePerWindowTarget() in this + // file for the math + label formatting. Missing/non-numeric + // buckets fall back to hardcoded defaults (6/8/9/10/11/12/12). + targetTotalsByBucket = null, + // Total audio duration in seconds — required for the per-window + // target math. Pulled by the worker from getAudioDurationSeconds() + // and threaded through. If omitted (legacy callers), the prompt + // emits "1 section" as a defensive fallback. + totalAudioSec = 0, + // Optional: called once per window AS SOON AS its sections are + // available (out of order, since windows resolve in parallel). The + // callback receives the window's BODY-OWNED sections — the ones + // the final stitcher will keep. Each section carries GLOBAL indices + // (into the full entries array) so the caller can render them in + // place without further translation. Used by /relay/summarize-url + // to push window_complete SSE events to the connected Recap client + // as windows arrive, so the user sees topics appearing in real time + // instead of waiting for the whole pipeline to finish. + onWindowComplete = null, +}) { + const entries = parseBracketedTranscript(transcriptText); + if (entries.length === 0) { + // Edge case: empty transcript. Record one failed analyze row so + // the Jobs table shows what happened, then throw. + await recordCall({ + install_id: installId, + license_fingerprint: licenseFingerprint, + tier: "core", + pipeline: "analyze", + backend: pipelineBackend, + model: null, + status: "error", + duration_ms: 0, + audio_seconds: 0, + cost_usd: 0, + job_id: jobId, + batch_id: batchId, + source, + media_url: mediaUrl, + title: title || null, + error: "transcript empty — nothing to analyze", + window_idx: 0, + window_count: 1, + }); + throw new Error("transcript empty — nothing to analyze"); + } + const windows = planAnalysisWindows(entries, { + bodySeconds, + overlapSeconds, + cutoffSeconds, + }); + if (windows.length === 0) { + throw new Error("planAnalysisWindows produced no windows (unexpected)"); + } + + // Each window's audio_seconds = its BODY duration. This matches what + // the per-row rate stats SHOULD divide by, since a window analyzes a + // body+overlap span but only the body contributes to the stitched + // output. Last window's body is clamped to the transcript end. + const results = new Array(windows.length); + let nextIdx = 0; + + async function worker() { + while (true) { + const my = nextIdx++; + if (my >= windows.length) return; + const w = windows[my]; + const windowEntries = entries.slice(w.startIdx, w.endIdx + 1); + const prompt = buildWindowPrompt( + windowEntries, + analyzePromptOverride, + targetTotalsByBucket, + totalAudioSec + ); + const bodySec = Math.max(0, w.bodyEndSec - w.bodyStartSec); + const winStart = Date.now(); + try { + // Per-window analyze with up to 3 attempts on invalid-JSON / + // exception. Analyze is by far the cheapest pipeline phase + // (a few seconds per window vs 30+ seconds for transcribe), + // so being aggressive about retries is essentially free — + // worst-case wall time for a failing window goes from ~5s to + // ~15s, which is still trivial compared to the 30+ minute + // transcribe phase for long content. + // + // The relay also now passes `responseMimeType: + // "application/json"` on the analyze call (see gemini.js + // analyzeText), so Gemini decoder-enforces valid JSON output + // — invalid-JSON failures from prose preamble / markdown + // fence wrapping / truncated brace should be eliminated + // entirely on Gemini. The retry loop here is now mostly + // defense for the hardware (Gemma) path which has no + // equivalent decoder-side guarantee, plus capacity blips. + const MAX_ATTEMPTS = 3; + let r = null; + let parsed = null; + let lastAttemptErr = null; + for (let attempt = 0; attempt < MAX_ATTEMPTS; attempt++) { + try { + r = await backend.analyzeText({ prompt }); + parsed = safeParseSections(r.text); + if (parsed) { + lastAttemptErr = null; + break; + } + lastAttemptErr = "invalid JSON in window response"; + } catch (innerErr) { + r = null; + parsed = null; + lastAttemptErr = (innerErr?.message || String(innerErr)).slice(0, 280); + } + if (attempt < MAX_ATTEMPTS - 1) { + console.warn( + `[chunked-analyze] window ${my + 1}/${windows.length} attempt ${attempt + 1} failed (${lastAttemptErr}) — retrying` + ); + } + } + const dur = Date.now() - winStart; + if (!parsed) { + // Both attempts failed — record one audit row, drop from + // stitching, continue the run. The stitcher will skip this + // window's body region; the summarize-url credit gate will + // refuse to charge because anyWindowFailed becomes true. + results[my] = { window: w, ok: false, error: new Error(lastAttemptErr || "analyze failed") }; + await recordCall({ + install_id: installId, + license_fingerprint: licenseFingerprint, + tier: "core", + pipeline: "analyze", + backend: pipelineBackend, + model: r?.model || null, + status: "error", + duration_ms: dur, + audio_seconds: bodySec, + cost_usd: 0, + job_id: jobId, + batch_id: batchId, + source, + media_url: mediaUrl, + title: title || null, + error: (lastAttemptErr || "analyze failed") + " (after " + MAX_ATTEMPTS + " attempts)", + window_idx: my, + window_count: windows.length, + window_body_seconds: bodySec, + }); + continue; + } + const costDetails = computeCostDetails(r.model, r.usage); + results[my] = { + window: w, + ok: true, + sections: parsed.sections, + model: r.model, + }; + await recordCall({ + install_id: installId, + license_fingerprint: licenseFingerprint, + tier: "core", + pipeline: "analyze", + backend: pipelineBackend, + model: r.model || null, + status: "success", + duration_ms: dur, + audio_seconds: bodySec, + job_id: jobId, + batch_id: batchId, + source, + media_url: mediaUrl, + title: title || null, + window_idx: my, + window_count: windows.length, + window_body_seconds: bodySec, + ...costDetails, + }); + // Streaming callback: emit this window's body-owned sections + // as soon as they arrive. Uses the same ownership rule as the + // final stitcher (window N owns sections starting before + // window N+1's body), but computed locally with just this + // window's planned position — the next window's bodyStartIdx + // is known from the plan even if that window hasn't finished. + if (onWindowComplete) { + const next = windows[my + 1]; + const nextBodyIdx = next ? next.bodyStartIdx : Infinity; + const offset = w.startIdx; + // Same endIndex clamp as stitchAnalysisResults — a model + // that emits an out-of-range endIndex would otherwise + // produce a streaming partial-section spanning way past + // this window's transcript range. The final stitch + // re-clamps, but if the SSE client renders partials + // incrementally it'd briefly show the inflated bounds. + const windowMaxEndIdx = w.endIdx; + const owned = []; + for (const s of parsed.sections) { + const globalStart = offset + (s.startIndex ?? 0); + const rawGlobalEnd = offset + (s.endIndex ?? 0); + const globalEnd = Math.min(rawGlobalEnd, windowMaxEndIdx); + if (globalStart >= nextBodyIdx) continue; + if (globalEnd < globalStart) continue; + owned.push({ + startIndex: globalStart, + endIndex: globalEnd, + title: s.title, + summary: s.summary, + }); + } + try { + onWindowComplete({ + windowIdx: my, + totalWindows: windows.length, + ownedSections: owned, + windowBodySeconds: bodySec, + model: r.model || null, + durationMs: dur, + }); + } catch (cbErr) { + // Streaming is best-effort — don't fail the analyze loop + // if a callback throws (e.g. the SSE client disconnected + // mid-window). + console.warn( + `[chunked-analyze] onWindowComplete callback failed: ${cbErr?.message || cbErr}` + ); + } + } + } catch (err) { + const dur = Date.now() - winStart; + results[my] = { window: w, ok: false, error: err }; + await recordCall({ + install_id: installId, + license_fingerprint: licenseFingerprint, + tier: "core", + pipeline: "analyze", + backend: pipelineBackend, + model: null, + status: "error", + duration_ms: dur, + audio_seconds: bodySec, + cost_usd: 0, + job_id: jobId, + batch_id: batchId, + source, + media_url: mediaUrl, + title: title || null, + error: (err?.message || String(err)).slice(0, 400), + window_idx: my, + window_count: windows.length, + window_body_seconds: bodySec, + }); + } + } + } + + const workerPromises = Array.from( + { length: Math.min(concurrency, windows.length) }, + worker + ); + await Promise.all(workerPromises); + + const succeeded = results.filter((r) => r && r.ok); + const failed = results.filter((r) => r && !r.ok); + if (succeeded.length === 0) { + throw new Error( + `All ${results.length} analyze windows failed. First error: ${ + failed[0]?.error?.message || "unknown" + }` + ); + } + + const stitched = stitchAnalysisResults(results); + // Aggregate model attribution: dominant successful model. + const modelTally = new Map(); + for (const r of results) { + if (!r.ok || !r.model) continue; + modelTally.set(r.model, (modelTally.get(r.model) || 0) + 1); + } + const dominantModel = + [...modelTally.entries()].sort((a, b) => b[1] - a[1])[0]?.[0] || null; + + return { + text: JSON.stringify({ sections: stitched }), + model: dominantModel, + usage: null, + attempts: { windows: results.length, failed: failed.length }, + }; +} + +// ───────────────────────────────────────────────────────────────────── +// Pipelined analyze (Phase 2 of the streaming UX work). +// +// Lets analyze windows fire AS SOON AS their required transcribe +// chunks have completed, in parallel with later chunks still being +// transcribed. The wall-time savings are modest for short content +// where transcribe dwarfs analyze (a 94-min video here: ~12s of +// analyze that can hide under transcribe), but the user-perceived +// improvement is dramatic — first topics render at T=~80s instead +// of T=~160s, because they don't have to wait for transcribe of +// minute 80-94 to start drawing the first window's topics. +// +// Design: +// 1. Caller pre-plans windows by AUDIO TIME (not entries — entries +// don't exist yet). +// 2. Caller provides a `getReadyText(startSec, endSec) → string` +// function that returns the current transcribe output covering +// that time range (bracketed MM:SS lines). It throws or returns +// null if not enough chunks have completed yet. +// 3. Caller provides a `waitForTime(targetEndSec) → Promise` +// that resolves when transcribe has covered up to targetEndSec. +// 4. Each window's worker awaits its required time then calls +// analyzeOneWindow with the window's text. Workers run as a +// bounded concurrent pool same as runChunkedAnalysis. +// 5. Section indices that come back from each window are LOCAL to +// that window's entry slice. The caller is responsible for +// mapping them to GLOBAL entry indices after transcribe fully +// completes (when the canonical entries array exists). +// ───────────────────────────────────────────────────────────────────── + +// Pre-plan analyze windows from total audio duration alone. Same +// math as planAnalysisWindows but returns ONLY time fields (no +// entry indices) — entries aren't built yet during pipelining. The +// caller assembles entries per-window when firing. +export function planWindowsByDuration({ + totalAudioSec, + bodySeconds, + overlapSeconds, + cutoffSeconds, +}) { + if (!totalAudioSec || totalAudioSec <= 0) return []; + // Single-shot fast path for short audio — same threshold as + // planAnalysisWindows. + if (totalAudioSec <= cutoffSeconds) { + return [ + { + idx: 0, + startSec: 0, + bodyStartSec: 0, + bodyEndSec: totalAudioSec, + windowEndSec: totalAudioSec, + }, + ]; + } + const windows = []; + let bodyStartSec = 0; + let idx = 0; + while (bodyStartSec < totalAudioSec) { + const bodyEndSec = Math.min(totalAudioSec, bodyStartSec + bodySeconds); + const windowEndSec = Math.min( + totalAudioSec, + bodyStartSec + bodySeconds + overlapSeconds + ); + const overlapWithPriorSec = Math.max(0, bodyStartSec - overlapSeconds); + windows.push({ + idx, + startSec: overlapWithPriorSec, + bodyStartSec, + bodyEndSec, + windowEndSec, + }); + idx += 1; + if (bodyEndSec >= totalAudioSec) break; + bodyStartSec += bodySeconds; + } + return windows; +} + +// Single-window analyze with the same retry + audit semantics +// runChunkedAnalysis uses internally. Extracted so the pipelined +// path can call it per-window without duplicating the logic. +// +// Returns: { ok: true, sections, model } or { ok: false, error }. +// Audit row is recorded inside. +export async function analyzeOneWindow({ + windowEntries, + windowIdx, + windowCount, + windowBodySec, + windowStartSec, + windowEndSec, + backend, + // Audit / accounting params: + pipelineBackend, + jobId, + batchId, + mediaUrl, + title, + installId, + licenseFingerprint = null, + source, + computeCostDetails, + // Prompt params: + analyzePromptOverride = "", + targetTotalsByBucket = null, + totalAudioSec = 0, +}) { + if (!windowEntries || windowEntries.length === 0) { + const err = new Error( + `analyzeOneWindow window ${windowIdx + 1} has no entries — audio gap or pre-cutoff window` + ); + return { ok: false, error: err }; + } + const prompt = buildWindowPrompt( + windowEntries, + analyzePromptOverride, + targetTotalsByBucket, + totalAudioSec + ); + const winStart = Date.now(); + const MAX_ATTEMPTS = 3; + let r = null; + let parsed = null; + let lastAttemptErr = null; + for (let attempt = 0; attempt < MAX_ATTEMPTS; attempt++) { + try { + r = await backend.analyzeText({ prompt }); + parsed = safeParseSections(r.text); + if (parsed) { + lastAttemptErr = null; + break; + } + lastAttemptErr = "invalid JSON in window response"; + } catch (innerErr) { + r = null; + parsed = null; + lastAttemptErr = (innerErr?.message || String(innerErr)).slice(0, 280); + } + if (attempt < MAX_ATTEMPTS - 1) { + console.warn( + `[chunked-analyze] window ${windowIdx + 1}/${windowCount} attempt ${attempt + 1} failed (${lastAttemptErr}) — retrying` + ); + } + } + const dur = Date.now() - winStart; + const auditCommon = { + install_id: installId, + license_fingerprint: licenseFingerprint, + tier: "core", + pipeline: "analyze", + backend: pipelineBackend, + duration_ms: dur, + audio_seconds: windowBodySec, + job_id: jobId, + batch_id: batchId, + source, + media_url: mediaUrl, + title: title || null, + window_idx: windowIdx, + window_count: windowCount, + window_body_seconds: windowBodySec, + }; + if (!parsed) { + await recordCall({ + ...auditCommon, + model: r?.model || null, + status: "error", + cost_usd: 0, + error: (lastAttemptErr || "analyze failed") + " (after " + MAX_ATTEMPTS + " attempts)", + }); + return { ok: false, error: new Error(lastAttemptErr || "analyze failed") }; + } + const costDetails = computeCostDetails(r.model, r.usage); + await recordCall({ + ...auditCommon, + model: r.model || null, + status: "success", + input_tokens: costDetails.input_tokens || 0, + output_tokens: costDetails.output_tokens || 0, + thinking_tokens: costDetails.thinking_tokens || 0, + cost_usd: costDetails.cost_usd || 0, + }); + return { + ok: true, + sections: parsed.sections, + model: r.model, + durationMs: dur, + }; +} + +// Run analyze in pipelined mode, fired window-by-window as transcribe +// chunks become available. See block comment above for the design. +// +// Inputs: +// audioDurationSec — total audio seconds (used to pre-plan windows) +// waitForTime(sec) — async; resolves when transcribe has covered +// the audio up through `sec` +// getReadySegments(startSec, endSec) — returns the deduped segments +// currently buffered for that time range (each +// { start, end, text } with global timestamps). +// Called per window AFTER waitForTime resolves. +// concurrency — max simultaneous analyze windows +// onWindowComplete — fired per window (same shape as +// runChunkedAnalysis's callback) with WINDOW- +// LOCAL section indices. Caller maps to global +// indices at end of pipeline. +// ... — all the audit + tunable params analyzeOneWindow +// needs +// +// Returns: +// { +// windowResults: [{ window, ok, sections, model, windowEntries }], +// dominantModel, +// attempts: { windows, failed }, +// } +// where each windowEntries is the entries slice that window analyzed +// (for the caller's later index-remapping). +export async function runPipelinedAnalysis({ + audioDurationSec, + waitForTime, + getReadySegments, + bodySeconds, + overlapSeconds, + cutoffSeconds, + concurrency, + onWindowComplete, + // Pass-through to analyzeOneWindow: + backend, + pipelineBackend, + jobId, + batchId, + mediaUrl, + title, + installId, + licenseFingerprint = null, + source, + computeCostDetails, + analyzePromptOverride = "", + targetTotalsByBucket = null, +}) { + const windows = planWindowsByDuration({ + totalAudioSec: audioDurationSec, + bodySeconds, + overlapSeconds, + cutoffSeconds, + }); + if (windows.length === 0) { + throw new Error( + "planWindowsByDuration produced no windows — audioDurationSec missing or zero" + ); + } + + const results = new Array(windows.length); + let nextIdx = 0; + async function worker() { + while (true) { + const my = nextIdx++; + if (my >= windows.length) return; + const w = windows[my]; + // Wait until transcribe has covered this window's full time + // range (including the overlap tail). + await waitForTime(w.windowEndSec); + // Extract segments in [w.startSec, w.windowEndSec). The caller + // is responsible for deduping at addChunk time so we don't + // see duplicates here. + const segments = getReadySegments(w.startSec, w.windowEndSec); + // Two-pass cleanup matching what hardware.js applies to the + // FULL stitched transcript: sortAndDedupeTranscript + + // mergeShortEntries collapses adjacent short Parakeet + // segments into thought-sized lines. + const rawBracketed = segmentsToBracketedText(segments); + const mergedBracketed = mergeShortEntries( + sortAndDedupeTranscript(rawBracketed) + ); + const windowEntries = parseBracketedTranscript(mergedBracketed); + // Diagnostic so we can see per-window readiness in operator + // logs WITHOUT depending on audit-row inspection. Reveals + // empty-window or huge-entry-count issues that cause silent + // analyzeOneWindow failures. + console.log( + `[pipelined-analyze] window ${my + 1}/${windows.length}: ${segments.length} segments → ${windowEntries.length} merged entries · time [${w.startSec}s-${w.windowEndSec}s]` + ); + const bodySec = Math.max(0, w.bodyEndSec - w.bodyStartSec); + const winResult = await analyzeOneWindow({ + windowEntries, + windowIdx: my, + windowCount: windows.length, + windowBodySec: bodySec, + windowStartSec: w.startSec, + windowEndSec: w.windowEndSec, + backend, + pipelineBackend, + jobId, + batchId, + mediaUrl, + title, + installId, + licenseFingerprint, + source, + computeCostDetails, + analyzePromptOverride, + targetTotalsByBucket, + totalAudioSec: audioDurationSec, + }); + results[my] = { window: w, windowEntries, ...winResult }; + // Per-window outcome log so operator sees what actually + // happened beyond "X/Y windows failed" rollup. Failure log + // includes the underlying error so we can diagnose JSON + // validation, model 4xx, network, etc. without grepping + // the audit DB. + if (winResult.ok) { + const dur = ((winResult.durationMs || 0) / 1000).toFixed(1); + console.log( + `[pipelined-analyze] window ${my + 1}/${windows.length} ok in ${dur}s — ${(winResult.sections || []).length} sections` + ); + } else { + const errMsg = winResult.error?.message || String(winResult.error || "unknown"); + console.warn( + `[pipelined-analyze] window ${my + 1}/${windows.length} FAILED — ${errMsg.slice(0, 280)}` + ); + } + // Emit per-window streaming event with WINDOW-LOCAL indices. + // The caller (summarize-url) will re-emit as a global-indexed + // event after the post-transcribe canonical-entries build. + // + // OWNED-SECTION FILTERING: each window's transcript range + // [startSec, windowEndSec] OVERLAPS BOTH the prior AND the + // next window. Without symmetric filtering, sections in the + // prior-overlap region (this window's view picked them up + // even though the prior window owns that body region) emit + // alongside the prior window's version → overlapping + // timestamps in the streaming UI. Symmetric filter: emit + // ONLY sections whose start time falls in this window's + // EXCLUSIVE body region [bodyStartSec, nextBodyStartSec). + // First window: bodyStartSec=0 → no lower bound. + // Last window: no next → no upper bound. The final stitcher + // still sees the FULL unfiltered section set (results[my] + // keeps everything), so any topic that crosses a body + // boundary and is missed by both adjacent windows in their + // body region still surfaces at result-event time. + const nextWindow = windows[my + 1]; + const ownedFloorSec = w.bodyStartSec || 0; + const ownedCutoffSec = nextWindow + ? nextWindow.bodyStartSec + : Infinity; + const ownedSections = winResult.ok + ? (winResult.sections || []).filter((s) => { + const startEntry = windowEntries[s.startIndex]; + if (!startEntry) return false; + const startSec = startEntry.offset || 0; + return startSec >= ownedFloorSec && startSec < ownedCutoffSec; + }) + : []; + if (onWindowComplete && winResult.ok) { + try { + onWindowComplete({ + windowIdx: my, + totalWindows: windows.length, + window: w, + windowEntries, + ownedSections, + windowBodySeconds: bodySec, + model: winResult.model || null, + durationMs: winResult.durationMs || 0, + }); + } catch (cbErr) { + console.warn( + `[pipelined-analyze] onWindowComplete callback failed: ${cbErr?.message || cbErr}` + ); + } + } + } + } + const workers = Array.from( + { length: Math.min(concurrency, windows.length) }, + worker + ); + await Promise.all(workers); + + const failed = results.filter((r) => r && !r.ok); + const dominantModel = (() => { + const tally = new Map(); + for (const r of results) { + if (!r || !r.ok || !r.model) continue; + tally.set(r.model, (tally.get(r.model) || 0) + 1); + } + return [...tally.entries()].sort((a, b) => b[1] - a[1])[0]?.[0] || null; + })(); + return { + windowResults: results, + dominantModel, + attempts: { windows: results.length, failed: failed.length }, + }; +} + +// Helper: convert an array of { start, end, text } segments into +// the bracketed [MM:SS] text format the analyze prompt + parser +// expect. Identical formatting to what hardware.js emits as its +// stitched transcript text. +function segmentsToBracketedText(segments) { + if (!segments || segments.length === 0) return ""; + // Sort by start in case caller passed unsorted (defensive). + const sorted = segments.slice().sort((a, b) => (a.start || 0) - (b.start || 0)); + return sorted + .map((s) => { + // Use H:MM:SS when the segment's time is in the 2nd hour or + // beyond. Previous code unconditionally used [MM:SS] which + // produced [106:30] for a 1:46:30 timestamp — 3-digit minutes + // are REJECTED by parseBracketedTranscript's regex + // (/^\[(\d{1,2}):(\d{2}).../), causing every transcript line + // past minute 100 to be silently dropped. Symptom on a 2h53m + // video: analyze windows 7-10 (covering 106+ min) all received + // 0 entries after the merge step and failed with "no entries — + // audio gap or pre-cutoff window". The H:MM:SS format matches + // what mergeShortEntries already emits and what the parser's + // optional 3rd capture group accepts. + const sec = Math.floor(s.start || 0); + const h = Math.floor(sec / 3600); + const m = Math.floor((sec % 3600) / 60); + const ss = sec % 60; + const pad = (n) => n.toString().padStart(2, "0"); + const stamp = h > 0 ? `${h}:${pad(m)}:${pad(ss)}` : `${m}:${pad(ss)}`; + return `[${stamp}] ${(s.text || "").trim()}`; + }) + .filter((l) => l.length > 0) + .join("\n"); +} diff --git a/server/job-stats.js b/server/job-stats.js new file mode 100644 index 0000000..ddc0833 --- /dev/null +++ b/server/job-stats.js @@ -0,0 +1,323 @@ +// Per-job aggregation over the relay's audit log. The audit log +// records ONE row per relay call (transcribe or analyze); a single +// summary job typically produces 1 transcribe row + N analyze rows +// (one per chunked-analyze window). This module groups those rows by +// X-Recap-Job-Id and computes per-video stats the dashboard renders +// as a sortable / filterable table. +// +// The aggregation is computed on the fly from the in-memory entries +// array — no separate persistence. A typical 30-day window has a few +// thousand audit rows; grouping is O(n) and well under 10 ms. +// +// Output row shape (one per job_id, plus a synthetic row for +// orphaned entries with no job_id): +// { +// job_id: string | null +// started_at: ms-epoch (earliest ts across the job's rows) +// completed_at: ms-epoch (latest ts) +// install_id: short string +// tier: "core" | "pro" | "max" | null +// media_url: string | null // from the transcribe row +// title: string | null // ditto +// audio_seconds: number | null // from transcribe row +// audio_bytes: number | null // ditto (bytes downloaded +// by the relay for transcribe-url) +// transcribe_status: "success" | "error" | "refused" | "missing" +// transcribe_backend: "gemini" | "hardware" | null +// transcribe_model: string | null +// transcribe_ms: number | null +// download_ms: number | null +// chunk_count: number | null // transcribe-side audio chunks +// analyze_windows_total: number // count of analyze rows +// analyze_windows_success: number +// analyze_windows_failed: number +// analyze_backend: string | null // dominant backend across analyze rows +// analyze_model: string | null // dominant model across analyze rows +// analyze_ms: number // sum of analyze duration_ms +// overall_status: "success" | "partial" | "failed" +// wall_time_ms: completed_at - started_at +// cost_usd: number (sum across all rows) +// errors: string[] // concatenated short error strings +// // Derived metrics — pre-computed so the UI can sort by them: +// transcribe_ms_per_min: number | null // transcribe_ms / (audio_seconds/60) +// transcribe_ms_per_mb: number | null // transcribe_ms / (audio_bytes / 1MB) +// download_ms_per_mb: number | null +// analyze_ms_per_min: number | null +// analyze_ms_per_mb: number | null +// } + +const MB = 1024 * 1024; + +export function aggregateJobs(entries, opts = {}) { + // Group by job_id. Entries without job_id become singleton groups + // keyed by their ts so they still appear in the table (helpful for + // debugging orphan calls). + const groups = new Map(); + for (const e of entries) { + const key = e.job_id || `_orphan_${e.ts}`; + if (!groups.has(key)) groups.set(key, []); + groups.get(key).push(e); + } + + // Optional set of job_ids that have stored output JSONs — passed + // in from the route layer so the aggregator doesn't have to hit + // the filesystem itself. Used to set the has_output flag the + // dashboard reads to show/hide the "View" link. + const outputIdSet = opts.outputIdSet instanceof Set ? opts.outputIdSet : null; + + const out = []; + for (const [key, rows] of groups) { + const row = aggregateOne(key, rows); + row.has_output = outputIdSet ? outputIdSet.has(row.job_id) : false; + out.push(row); + } + // Newest first by started_at. + out.sort((a, b) => b.started_at - a.started_at); + return out; +} + +function aggregateOne(key, rows) { + rows.sort((a, b) => a.ts - b.ts); + // ts in each audit row is when recordCall() fired — i.e., when the + // work for that row COMPLETED, not when it started. To recover the + // user-POV "job start" timestamp we work backwards from the first + // row's end-time using its duration_ms AND download_ms fields. + // + // TX row layout: + // ts = download_end + tx_work_end + // duration_ms = tx_work_duration (NOT including download) + // download_ms = download_duration + // + // So: job_start = ts - duration_ms - download_ms. + // + // Without including download_ms here, wall_time misses the + // download phase (which can be 30-60s on a long YouTube fetch). + // Including it makes wall_time match the operator's intuitive + // formula: WALL ≈ DL + TX + AN_wall. + const firstRowDur = Number(rows[0].duration_ms) || 0; + const firstRowDownload = Number(rows[0].download_ms) || 0; + const startedAt = rows[0].ts - firstRowDur - firstRowDownload; + const completedAt = rows[rows.length - 1].ts; + + const tx = rows.find((r) => r.pipeline === "transcribe"); + const analyzeRows = rows.filter((r) => r.pipeline === "analyze"); + + const analyzeSuccess = analyzeRows.filter((r) => r.status === "success"); + const analyzeFailed = analyzeRows.filter((r) => r.status !== "success"); + const analyzeMs = analyzeRows.reduce( + (s, r) => s + (Number(r.duration_ms) || 0), + 0 + ); + // Analyze wall time: elapsed clock time from when the FIRST window + // started to when the LAST window finished. For a 1-batch parallel + // analyze (all N windows fire concurrently), this ≈ the slowest + // single window's duration. For multi-batch (N > concurrency, e.g. + // 10 windows at concurrency 8 → 2 sequential batches), this spans + // both batches including any gap. Computed from end-ts minus + // start-ts (where start-ts = row.ts - row.duration_ms) so it's + // an accurate measured value, not a predicted one. + let analyzeWallMs = null; + if (analyzeRows.length > 0) { + let minStart = Infinity; + let maxEnd = -Infinity; + for (const r of analyzeRows) { + const end = Number(r.ts) || 0; + const dur = Number(r.duration_ms) || 0; + const start = end - dur; + if (start < minStart) minStart = start; + if (end > maxEnd) maxEnd = end; + } + analyzeWallMs = maxEnd - minStart; + } + const analyzeDominantBackend = dominant( + analyzeSuccess.map((r) => r.backend) + ); + const analyzeDominantModel = dominant(analyzeSuccess.map((r) => r.model)); + + const errors = rows + .filter((r) => r.error) + .map((r) => `${r.pipeline}: ${String(r.error).slice(0, 160)}`); + + const txStatus = tx ? tx.status : "missing"; + let overall; + if (txStatus === "error" || txStatus === "refused" || txStatus === "missing") { + overall = "failed"; + } else if (txStatus === "partial") { + // TX produced a truncated transcript (chunks hit the output-token + // cap). Mark the whole job partial regardless of analyze status — + // the analysis was performed against incomplete input, so even + // a "success" on analyze rows is misleading. + overall = "partial"; + } else if (analyzeRows.length === 0) { + // Transcribe succeeded but no analyze rows — could be in flight, + // or the client never called /relay/analyze (uses local model). + overall = "success"; + } else if (analyzeSuccess.length === analyzeRows.length) { + overall = "success"; + } else if (analyzeSuccess.length > 0) { + overall = "partial"; + } else { + overall = "failed"; + } + + const cost = rows.reduce((s, r) => s + (Number(r.cost_usd) || 0), 0); + + // Use `??` (nullish-coalesce) — NOT `||` — so a legitimate 0 isn't + // treated as missing data. The test-run worker writes duration_ms=0 + // historically (pre-fix) on cache-hit siblings; even though the new + // worker writes a non-zero shared wall-time, old audit rows from + // earlier benchmark batches still live in the NDJSON and we want + // those rendered correctly rather than collapsed to "—". + const audioSec = tx?.audio_seconds ?? null; + const audioBytes = tx?.audio_bytes ?? null; + const txMs = tx?.duration_ms ?? null; + const downloadMs = tx?.download_ms ?? null; + // TX backend compute time = sum of per-chunk wall-times. Distinct + // from txMs which is the outer parallel-fan-out wall-time. + // single-chunk: txMsSum ≈ txMs (one chunk, one duration) + // N-chunks at concurrency C: txMsSum ≈ N × per-chunk-duration + // txMs ≈ ⌈N/C⌉ × per-chunk-duration + // Falls back to txMs (the wall-time) when chunk_durations_ms is + // absent — old audit rows from before v0.2.41 don't have it. + const chunkDurationsArr = Array.isArray(tx?.chunk_durations_ms) ? tx.chunk_durations_ms : null; + const txMsSum = chunkDurationsArr + ? chunkDurationsArr.reduce((s, d) => s + (Number(d) || 0), 0) + : txMs; + + const audioMinutes = audioSec ? audioSec / 60 : null; + const audioMb = audioBytes ? audioBytes / MB : null; + + // batch_id and source are stamped per audit row by the test-run + // path; use the first non-null we see so dashboard filters work + // regardless of which row gets read first in a multi-row job. + const batchId = rows.find((r) => r.batch_id)?.batch_id || null; + const source = rows.find((r) => r.source)?.source || null; + + return { + job_id: key.startsWith("_orphan_") ? null : key, + started_at: startedAt, + completed_at: completedAt, + install_id: tx?.install_id || rows[0].install_id || null, + tier: tx?.tier || rows[0].tier || null, + media_url: tx?.media_url || null, + title: tx?.title || null, + batch_id: batchId, + source: source, + audio_seconds: audioSec, + audio_bytes: audioBytes, + transcribe_status: txStatus, + transcribe_backend: tx?.backend || null, + transcribe_model: tx?.model || null, + // transcribe_ms = outer wall-time of the whole TX phase (the + // value the operator perceives as "how long did transcribe + // take"). transcribe_ms_sum = total backend compute across all + // chunks (drives cost; equals N × wall when N chunks run truly + // sequentially, equals wall when single-chunk). For Gemini at + // concurrency 12 over 3 chunks: wall ≈ 60s, sum ≈ 180s. + transcribe_ms: txMs, + transcribe_ms_sum: txMsSum, + download_ms: downloadMs, + chunk_count: tx?.chunk_count ?? null, + analyze_windows_total: analyzeRows.length, + analyze_windows_success: analyzeSuccess.length, + analyze_windows_failed: analyzeFailed.length, + analyze_backend: analyzeDominantBackend, + analyze_model: analyzeDominantModel, + // analyze_ms = SUM of per-window durations (total backend compute, + // useful for cost). analyze_wall_time_ms = ELAPSED time from + // first window start to last window end (the time a user actually + // waits for the analyze phase). The two diverge when N windows + // run in parallel: a 10-window 100s-per-window job has analyze_ms + // = 1000s but analyze_wall_time_ms ≈ 100s (single batch) or + // ≈ 200s (two sequential batches at concurrency 5). + analyze_ms: analyzeMs, + analyze_wall_time_ms: analyzeWallMs, + overall_status: overall, + wall_time_ms: completedAt - startedAt, + cost_usd: cost, + errors, + // Derived rate metrics: + transcribe_ms_per_min: audioMinutes && txMs ? txMs / audioMinutes : null, + transcribe_ms_per_mb: audioMb && txMs ? txMs / audioMb : null, + download_ms_per_mb: audioMb && downloadMs ? downloadMs / audioMb : null, + analyze_ms_per_min: audioMinutes && analyzeMs ? analyzeMs / audioMinutes : null, + analyze_wall_ms_per_min: audioMinutes && analyzeWallMs ? analyzeWallMs / audioMinutes : null, + analyze_ms_per_mb: audioMb && analyzeMs ? analyzeMs / audioMb : null, + }; +} + +// Pick the most frequent string in a list (ties broken by first +// occurrence). Used to attribute a backend/model to a job when its +// rows might disagree (e.g. some analyze windows hit gemini and +// fallback chain walked to hardware on others). +function dominant(values) { + const counts = new Map(); + for (const v of values) { + if (!v) continue; + counts.set(v, (counts.get(v) || 0) + 1); + } + let best = null; + let bestCount = 0; + for (const [v, c] of counts) { + if (c > bestCount) { + best = v; + bestCount = c; + } + } + return best; +} + +// Compute summary statistics across all aggregated jobs. Returned to +// the dashboard's top-of-page cards: success rate, total processing +// time, average wall-time per video, etc. +export function summarizeJobs(jobs) { + const total = jobs.length; + if (total === 0) { + return { + total: 0, + success: 0, + partial: 0, + failed: 0, + success_rate: 1, + median_wall_time_ms: null, + median_transcribe_ms_per_min: null, + median_analyze_ms_per_min: null, + total_cost_usd: 0, + total_audio_hours: 0, + }; + } + const success = jobs.filter((j) => j.overall_status === "success").length; + const partial = jobs.filter((j) => j.overall_status === "partial").length; + const failed = jobs.filter((j) => j.overall_status === "failed").length; + const totalCost = jobs.reduce((s, j) => s + (j.cost_usd || 0), 0); + const totalAudioSec = jobs.reduce( + (s, j) => s + (j.audio_seconds || 0), + 0 + ); + + return { + total, + success, + partial, + failed, + success_rate: (success + partial) / total, + median_wall_time_ms: median(jobs.map((j) => j.wall_time_ms).filter(Number.isFinite)), + median_transcribe_ms_per_min: median( + jobs.map((j) => j.transcribe_ms_per_min).filter(Number.isFinite) + ), + median_analyze_ms_per_min: median( + jobs.map((j) => j.analyze_ms_per_min).filter(Number.isFinite) + ), + total_cost_usd: totalCost, + total_audio_hours: totalAudioSec / 3600, + }; +} + +function median(arr) { + if (!arr.length) return null; + const sorted = [...arr].sort((a, b) => a - b); + const mid = Math.floor(sorted.length / 2); + return sorted.length % 2 === 0 + ? (sorted[mid - 1] + sorted[mid]) / 2 + : sorted[mid]; +} diff --git a/server/jobs.js b/server/jobs.js new file mode 100644 index 0000000..7ef5d61 --- /dev/null +++ b/server/jobs.js @@ -0,0 +1,219 @@ +// In-memory background-job tracker. Used by /relay/transcribe-url +// (and any future long-running endpoint) so the request that kicks +// off the work returns immediately with a job_id, and the client +// polls /relay/jobs/{id} to find out when it's done. +// +// Rationale: synchronous HTTP responses for multi-minute transcribes +// are fragile. Any intermediate proxy / load balancer / NAT in the +// path will drop the connection after some idle/total timeout (often +// 100s–10min), failing the whole job mid-flight even though the +// relay backend is working fine. Async jobs sidestep all of that: +// the long-running work happens off the request path and the client +// polls short, cheap requests until done. +// +// Storage is in-process memory. Restart-survivability is a known +// gap — a relay restart mid-job loses that job's state, and the +// client will re-poll forever until it gives up. Acceptable for v1 +// at small relay scale; the audit log already captures every +// completed call so the operator has a paper trail either way. +// Migrate to SQLite if/when restart-resilience becomes important. +// +// Each job is { id, kind, install_id, status, started_at, updated_at, +// completed_at?, progress?, result?, error? } +// status: "queued" | "running" | "complete" | "failed" + +import { randomUUID } from "crypto"; +import { sanitizeErrorForClient } from "./sanitize-error.js"; + +// All in-memory; lost on restart. +const jobs = new Map(); + +// Cap how long completed jobs hang around so the map doesn't grow +// unbounded. Once a client has polled and seen "complete", it'll +// stop polling — keeping the record 24h gives slow / retried clients +// a generous window without exhausting memory. +const RETENTION_MS = 24 * 60 * 60 * 1000; + +export function createJob({ kind, installId, metadata = {} }) { + pruneExpired(); + const id = randomUUID(); + const now = Date.now(); + const job = { + id, + kind, + install_id: installId, + status: "queued", + started_at: now, + updated_at: now, + completed_at: null, + progress: null, + result: null, + error: null, + metadata, + // Event log + live subscriber list. Used by jobs that stream + // incremental results via SSE (e.g., /relay/summarize-url + // dispatches transcribe_progress, transcribe_complete, + // window_complete, done, error events). Each event is + // { type, data, ts } and gets BOTH appended to the log (so a + // late SSE-connecting client can replay missed events) and + // pushed to any currently-subscribed callbacks. `subscribers` + // is intentionally non-enumerable / non-serialized so it never + // leaks into snapshotJobs() or HTTP responses. + events: [], + }; + Object.defineProperty(job, "subscribers", { + value: new Set(), + enumerable: false, + writable: false, + }); + jobs.set(id, job); + return job; +} + +// Append an event to a job's log AND notify any live SSE +// subscribers. Used by /relay/summarize-url's background worker to +// emit per-window progress as it streams in from runChunkedAnalysis. +// Event shape: +// { type: "window_complete"|"transcribe_complete"|"done"|"error"|"progress", +// data: , +// ts: ms-epoch } +// Subscriber callbacks receive ONLY the new event (not the full log); +// new subscribers should replay the log themselves on connect. +export function appendEvent(jobId, type, data) { + const job = jobs.get(jobId); + if (!job) return; + const event = { type, data, ts: Date.now() }; + job.events.push(event); + job.updated_at = event.ts; + // Cap the log so a runaway job doesn't blow memory. 1000 events + // is far beyond any plausible window count (typical: 10-20). + if (job.events.length > 1000) job.events.shift(); + for (const cb of job.subscribers) { + try { + cb(event); + } catch (err) { + console.warn(`[jobs] subscriber callback failed: ${err?.message || err}`); + } + } +} + +// Subscribe to live events from a job. Returns an unsubscribe +// function the caller MUST call (e.g., on SSE connection close) +// or the job state will leak the callback closure forever. +// Returns null when the job no longer exists. +export function subscribeToJob(jobId, callback) { + const job = jobs.get(jobId); + if (!job) return null; + job.subscribers.add(callback); + return () => { + job.subscribers.delete(callback); + }; +} + +export function getJob(jobId) { + pruneExpired(); + return jobs.get(jobId) || null; +} + +export function markRunning(jobId) { + const job = jobs.get(jobId); + if (!job) return; + job.status = "running"; + job.updated_at = Date.now(); +} + +export function setProgress(jobId, message) { + const job = jobs.get(jobId); + if (!job) return; + job.progress = String(message).slice(0, 200); + job.updated_at = Date.now(); +} + +export function markComplete(jobId, envelope) { + const job = jobs.get(jobId); + if (!job) return; + job.status = "complete"; + // Keep the full envelope shape on the job (caller decides what to + // pass — typically { result: {...inner...}, credit_charged, tier }). + // Internal consumers that read job.result directly still see the + // wrapped form. + job.result = envelope; + job.completed_at = Date.now(); + job.updated_at = job.completed_at; + // SSE "done" event: emit the INNER result directly so subscribers + // can read fields off `data.result.title` (or `.transcript`, + // `.analysis`, etc.) instead of a confusing `data.result.result.title`. + // The wrapped form (envelope.result) is unwrapped here; if the + // caller passed a flat result without an inner `.result` key we + // just pass it through unchanged. credit_charged + tier travel + // alongside as siblings so the SSE consumer can update its + // balance display without digging into the result body. + // + // Why this matters: Recap-app's SSE handler does + // `finalResult = data.result`, then reads `finalResult.title`. + // Before this fix, that landed on the wrapping envelope and every + // title came back undefined — library entries persisted as + // "Untitled" despite the relay correctly extracting the real title + // via yt-dlp. The audit log was unaffected (it reads the local + // `title` variable directly) which made the bug look like a + // Recap-side issue. It wasn't. + const inner = envelope && typeof envelope === "object" && "result" in envelope + ? envelope.result + : envelope; + appendEvent(jobId, "done", { + result: inner, + credit_charged: envelope?.credit_charged, + tier: envelope?.tier, + }); +} + +export function markFailed(jobId, errorMessage) { + const job = jobs.get(jobId); + if (!job) return; + job.status = "failed"; + // Sanitize at the source so EVERY downstream surface that reads + // job.error (SSE error event, the per-job GET endpoints, etc.) + // gets the client-safe wording, without having to remember to + // sanitize at every call site. The raw operator-internal message + // stays available on job.error_internal for the admin dashboard + + // audit log (snapshotJobs exposes both fields). + const raw = String(errorMessage || "unknown error").slice(0, 500); + job.error_internal = raw; + job.error = sanitizeErrorForClient(raw).slice(0, 500); + job.completed_at = Date.now(); + job.updated_at = job.completed_at; + // Same terminal event for failures — SSE clients close on this + // and surface the error to the user. + appendEvent(jobId, "error", { error: job.error }); +} + +export function snapshotJobs() { + pruneExpired(); + return Array.from(jobs.values()).map((j) => ({ + id: j.id, + kind: j.kind, + install_id: j.install_id, + status: j.status, + started_at: j.started_at, + updated_at: j.updated_at, + completed_at: j.completed_at, + progress: j.progress, + has_result: j.result != null, + // Both error variants exposed — the admin dashboard consumes + // snapshotJobs and can prefer error_internal for operator + // diagnosis (full backend / spark-control wording intact). + // External callers should always read `error` (sanitized). + error: j.error, + error_internal: j.error_internal || j.error, + })); +} + +function pruneExpired() { + const cutoff = Date.now() - RETENTION_MS; + for (const [id, job] of jobs) { + const ref = job.completed_at || job.updated_at || job.started_at; + if (ref && ref < cutoff) { + jobs.delete(id); + } + } +} diff --git a/server/meeting-extras.js b/server/meeting-extras.js new file mode 100644 index 0000000..cfc5c90 --- /dev/null +++ b/server/meeting-extras.js @@ -0,0 +1,376 @@ +// Phase 2 of Path 2A — meeting extras analysis. +// +// Runs a single LLM pass AFTER transcribe → diarize → cluster → +// analyze → name-inference → summary-polish complete. Pulls out four +// categories of structured information that operators consistently +// want at the top of an internal meeting recap: +// +// - decisions : what was agreed on (with the offset where it was settled) +// - action_items : who owes what, by when (best-effort due_hint) +// - open_questions : questions raised that didn't get resolved +// - key_quotes : notable statements worth surfacing verbatim +// +// Each item carries a `supporting_offset` (or `offset`) in seconds so +// the dashboard can render the timestamp as a clickable jump to the +// corresponding transcript line. Each item also carries speaker IDs +// (cluster ids like Speaker_A) so the renderer can show the speaker's +// colored chip + display name, and so an operator-rename or per-line +// override propagates here too. +// +// Returns: +// { +// decisions: [{ statement, agreed_by[], supporting_offset }], +// action_items: [{ description, owner, due_hint, supporting_offset }], +// open_questions: [{ question, raised_by, answered }], +// key_quotes: [{ speaker, offset, quote, why_notable }], +// } +// +// or null on total failure. Failure is non-fatal — the meeting still +// saves with rec.extras = null and the dashboard just hides the +// extras section. + +import { recordCall } from "./audit-log.js"; + +const EXTRAS_MAX_ATTEMPTS = 3; + +export const DEFAULT_MEETING_EXTRAS_PROMPT_TEMPLATE = `You are extracting structured information from an internal team meeting transcript. The transcript below has been pre-tagged with speaker labels like [A], [B], [C] (anonymous voice-clustering labels) and inferred real names where available. + +MEETING METADATA: +- Title: {{title}} +- Duration: {{duration}} + +{{operatorContext}}SPEAKERS (from voice clustering, with operator-confirmed names where present): +{{speakerRoster}} + +TOPIC SUMMARIES (already produced — for context only, do not duplicate): +{{topics}} + +TRANSCRIPT (each line is "[ ] text"): +{{transcript}} + +INSTRUCTIONS: +Extract FIVE categories of information from the meeting. Return EMPTY ARRAYS for categories that don't apply — do NOT invent items. + +1. TLDR — A 2-4 sentence executive summary of the entire meeting: what it was about, the key discussion arc, and the bottom-line outcome. Write in past tense, third person. Keep it dense — every clause should carry information. Skip pleasantries and procedural opening/closing chatter. If a meeting was genuinely substanceless (a 3-minute check-in, audio test, etc.), write one factual descriptor sentence instead of padding. This is the only required category — even the most trivial meeting gets a one-sentence TLDR. + - summary: the 2-4 sentence executive summary + - primary_speakers: array of Speaker_X ids who drove the conversation (the 1-3 people most central to the discussion, in rough order of contribution). Empty array if unclear. + +2. DECISIONS — Things explicitly decided / agreed during the meeting. Include only clear commitments ("we will do X", "let's go with Y"), not casual mentions. For each: + - statement: the decision in one sentence + - agreed_by: array of Speaker_X ids who explicitly agreed (use the chip-letter notation, e.g. ["Speaker_A", "Speaker_C"]). Empty array if unclear. + - supporting_offset: integer SECONDS where this decision was made (use the [ ] timestamp from the most relevant transcript line — convert MM:SS to total seconds) + +3. ACTION_ITEMS — Specific commitments where someone said they would do something. Include only explicit ownership ("I'll send the doc", "Matt will follow up"), not vague "someone should...". For each: + - description: the action in imperative form + - owner: the Speaker_X id of the person taking it on (e.g. "Speaker_A"), or null if unclear + - due_hint: the deadline as a string if mentioned ("by Friday", "end of week", "before next call"), or null + - supporting_offset: integer seconds where the commitment was made + +4. OPEN_QUESTIONS — Questions raised that were NOT clearly answered during the meeting. Skip rhetorical questions and questions that got direct answers. For each: + - question: the question, rephrased to be self-contained + - raised_by: the Speaker_X id who asked (or null if unclear) + - answered: false (always — if it was answered, don't include it) + +5. KEY_QUOTES — Statements worth surfacing verbatim because they are pivotal, particularly insightful, or capture a strong opinion. Limit to 3-6 quotes max. Skip filler and conversational text. For each: + - speaker: the Speaker_X id of the speaker + - offset: integer seconds where the quote occurs + - quote: the verbatim quote (trim to the substantive sentence, 4-30 words) + - why_notable: one short clause on why this is worth surfacing + +Be conservative across all five. Better to return an empty array (or for TLDR, a single factual sentence) than to fabricate. A 5-minute small-talk call may legitimately have 0 decisions, 0 action items, 0 open questions, 0 key quotes — but it still gets a TLDR. + +Respond with ONLY valid JSON in this exact shape, no other text: +{ + "tldr": {"summary": "...", "primary_speakers": ["Speaker_A", "Speaker_B"]}, + "decisions": [{"statement": "...", "agreed_by": ["Speaker_A"], "supporting_offset": 123}], + "action_items": [{"description": "...", "owner": "Speaker_B", "due_hint": "by Friday", "supporting_offset": 234}], + "open_questions": [{"question": "...", "raised_by": "Speaker_C", "answered": false}], + "key_quotes": [{"speaker": "Speaker_A", "offset": 345, "quote": "...", "why_notable": "..."}] +}`; + +function fillTemplate(template, vars) { + return String(template || "").replace(/\{\{\s*(\w+)\s*\}\}/g, (_match, key) => { + return key in vars ? String(vars[key]) : `{{${key}}}`; + }); +} + +function formatDuration(seconds) { + const s = Math.max(0, Math.floor(seconds || 0)); + const h = Math.floor(s / 3600); + const m = Math.floor((s % 3600) / 60); + const sec = s % 60; + if (h > 0) return `${h}h ${m}m ${sec}s`; + if (m > 0) return `${m}m ${sec}s`; + return `${sec}s`; +} + +function formatLabeledTranscript(segments) { + if (!Array.isArray(segments) || segments.length === 0) return ""; + const lines = []; + for (const seg of segments) { + const text = (seg.text || "").trim(); + if (!text) continue; + const t = seg.start || 0; + let letter = "?"; + const m = String(seg.speaker || "").match(/^Speaker_([A-Z]+)$/); + if (m) letter = m[1]; + const secInt = Math.floor(t); + const mm = Math.floor(secInt / 60); + const ss = secInt % 60; + lines.push(`[${letter} ${mm}:${String(ss).padStart(2, "0")}] ${text}`); + } + return lines.join("\n"); +} + +// Trim a too-large transcript by keeping the head and tail. Keeps +// the meeting's opening (introductions, agenda) AND closing (wrap-up, +// next steps) which are where most extras-worthy content lives. +function capTranscript(text, maxChars) { + if (text.length <= maxChars) return text; + const half = Math.floor(maxChars / 2) - 50; + return ( + text.slice(0, half) + + "\n\n…[middle truncated for prompt length]…\n\n" + + text.slice(-half) + ); +} + +function safeParseExtras(text) { + if (!text || typeof text !== "string") return null; + let s = text.trim(); + const fence = s.match(/```(?:json)?\s*([\s\S]*?)```/); + if (fence) s = fence[1].trim(); + let parsed; + try { + parsed = JSON.parse(s); + } catch { + return null; + } + if (!parsed || typeof parsed !== "object") return null; + const asArray = (v) => (Array.isArray(v) ? v : []); + // TLDR — exactly one object (not an array). Required category; + // we accept any well-formed shape and clamp to safe bounds. If + // the LLM omitted it entirely we leave it null so the renderer + // can show "TLDR unavailable" rather than fabricating. + let tldr = null; + if (parsed.tldr && typeof parsed.tldr === "object" && !Array.isArray(parsed.tldr)) { + const summary = typeof parsed.tldr.summary === "string" ? parsed.tldr.summary.trim() : ""; + if (summary) { + tldr = { + summary: summary.slice(0, 800), + primary_speakers: Array.isArray(parsed.tldr.primary_speakers) + ? parsed.tldr.primary_speakers + .filter((x) => typeof x === "string" && /^Speaker_[A-Z]+$/.test(x)) + .slice(0, 5) + : [], + }; + } + } + // Coerce + clamp each category to a sane shape. Drop entries + // that fail validation rather than failing the whole pass. + const decisions = asArray(parsed.decisions) + .map((d) => { + if (!d || typeof d !== "object") return null; + const statement = typeof d.statement === "string" ? d.statement.trim() : ""; + if (!statement) return null; + return { + statement: statement.slice(0, 400), + agreed_by: Array.isArray(d.agreed_by) + ? d.agreed_by.filter((x) => typeof x === "string" && /^Speaker_[A-Z]+$/.test(x)).slice(0, 10) + : [], + supporting_offset: Number.isFinite(d.supporting_offset) ? Math.max(0, Math.floor(d.supporting_offset)) : null, + }; + }) + .filter(Boolean) + .slice(0, 20); + const action_items = asArray(parsed.action_items) + .map((a) => { + if (!a || typeof a !== "object") return null; + const description = typeof a.description === "string" ? a.description.trim() : ""; + if (!description) return null; + return { + description: description.slice(0, 400), + owner: typeof a.owner === "string" && /^Speaker_[A-Z]+$/.test(a.owner) ? a.owner : null, + due_hint: typeof a.due_hint === "string" && a.due_hint.trim() ? a.due_hint.trim().slice(0, 80) : null, + supporting_offset: Number.isFinite(a.supporting_offset) ? Math.max(0, Math.floor(a.supporting_offset)) : null, + }; + }) + .filter(Boolean) + .slice(0, 30); + const open_questions = asArray(parsed.open_questions) + .map((q) => { + if (!q || typeof q !== "object") return null; + const question = typeof q.question === "string" ? q.question.trim() : ""; + if (!question) return null; + return { + question: question.slice(0, 400), + raised_by: typeof q.raised_by === "string" && /^Speaker_[A-Z]+$/.test(q.raised_by) ? q.raised_by : null, + answered: q.answered === true, + }; + }) + .filter(Boolean) + .slice(0, 20); + const key_quotes = asArray(parsed.key_quotes) + .map((q) => { + if (!q || typeof q !== "object") return null; + const quote = typeof q.quote === "string" ? q.quote.trim() : ""; + if (!quote) return null; + return { + speaker: typeof q.speaker === "string" && /^Speaker_[A-Z]+$/.test(q.speaker) ? q.speaker : null, + offset: Number.isFinite(q.offset) ? Math.max(0, Math.floor(q.offset)) : null, + quote: quote.slice(0, 400), + why_notable: typeof q.why_notable === "string" ? q.why_notable.trim().slice(0, 200) : "", + }; + }) + .filter(Boolean) + .slice(0, 10); + return { tldr, decisions, action_items, open_questions, key_quotes }; +} + +export async function runMeetingExtras({ + title, + audioSec, + speakers, + speakerNames, + transcriptSegments, + topics, // array of { title, summary, startTime } from analyze-then-polish + promptOverride = "", + // Operator-supplied hints (internal meetings only). participantHints + // is a CSV-ish string of expected attendees; operatorNotes is free- + // form prose describing who-said-what. Both are framed as hints in + // the rendered prompt — the LLM is instructed to use them as soft + // signals and verify against the transcript before quoting or + // attributing. Empty → no OPERATOR HINTS block appears. + participantHints = "", + operatorNotes = "", + backend, + pipelineBackend, + jobId, + installId, + licenseFingerprint = null, + source, + computeCostDetails, +}) { + if (!backend) return null; + if (!Array.isArray(transcriptSegments) || transcriptSegments.length === 0) return null; + + // Build speaker roster — Speaker_A (chip A, 12m 34s, "Matt Hill") + const speakerLetters = Object.keys(speakers || {}) + .filter((k) => /^Speaker_[A-Z]+$/.test(k)) + .sort(); + const speakerRoster = speakerLetters + .map((k) => { + const stats = speakers[k] || {}; + const secs = Math.round(stats.total_speaking_seconds || 0); + const mins = Math.floor(secs / 60); + const rem = secs % 60; + const timeStr = mins > 0 ? `${mins}m ${rem}s` : `${rem}s`; + const letter = k.replace("Speaker_", ""); + const name = speakerNames && speakerNames[k] ? `"${speakerNames[k]}"` : "(unknown)"; + return `- ${k} (chip [${letter}], ${timeStr} speaking, ${stats.turns || 0} turns): ${name}`; + }) + .join("\n"); + + const topicsBlock = Array.isArray(topics) && topics.length + ? topics + .map((t, i) => { + const startSec = t.startTime || 0; + const mm = Math.floor(startSec / 60); + const ss = Math.floor(startSec % 60); + const tStr = `${mm}:${String(ss).padStart(2, "0")}`; + return `${i + 1}. [${tStr}] ${t.title || "(untitled)"} — ${t.summary || ""}`; + }) + .join("\n") + : "(no topics)"; + + const fullTranscript = formatLabeledTranscript(transcriptSegments); + const cappedTranscript = capTranscript(fullTranscript, 25000); + + // Compose the OPERATOR HINTS block — same shape as the name- + // inference pipeline so the LLM gets consistent framing across + // both passes. Empty when no hints supplied. + const hintsParts = []; + if (participantHints && String(participantHints).trim()) { + hintsParts.push( + `Possible participants in this meeting (operator-supplied — may be incomplete):\n${String(participantHints).trim()}`, + ); + } + if (operatorNotes && String(operatorNotes).trim()) { + const trimmed = String(operatorNotes).trim().slice(0, 4000); + hintsParts.push( + `Operator notes (may describe who said what — use as soft context, verify against the transcript before extracting decisions / action items / quotes):\n${trimmed}`, + ); + } + const operatorContextBlock = hintsParts.length + ? `OPERATOR HINTS (treat as suggestions only — verify against the transcript):\n\n${hintsParts.join("\n\n")}\n\n` + : ""; + + const templateSource = + typeof promptOverride === "string" && promptOverride.trim() + ? promptOverride + : DEFAULT_MEETING_EXTRAS_PROMPT_TEMPLATE; + const prompt = fillTemplate(templateSource, { + title: title || "(untitled)", + duration: formatDuration(audioSec), + operatorContext: operatorContextBlock, + speakerRoster: speakerRoster || "(no speakers identified)", + topics: topicsBlock, + transcript: cappedTranscript || "(empty)", + }); + + const t0 = Date.now(); + let r = null; + let parsed = null; + let lastErr = null; + for (let attempt = 0; attempt < EXTRAS_MAX_ATTEMPTS; attempt++) { + try { + r = await backend.analyzeText({ prompt }); + parsed = safeParseExtras(r.text); + if (parsed) { + lastErr = null; + break; + } + lastErr = "invalid JSON in extras response"; + } catch (err) { + lastErr = (err?.message || String(err)).slice(0, 280); + r = null; + } + if (attempt < EXTRAS_MAX_ATTEMPTS - 1) { + console.warn( + `[meeting-extras] attempt ${attempt + 1} failed (${lastErr}) — retrying` + ); + } + } + const dur = Date.now() - t0; + const cost = + parsed && r + ? computeCostDetails(r.model, r.usage) + : { input_tokens: 0, output_tokens: 0, thinking_tokens: 0, cost_usd: 0 }; + await recordCall({ + install_id: installId, + license_fingerprint: licenseFingerprint, + tier: "core", + pipeline: "meeting_extras", + backend: pipelineBackend, + model: r?.model || null, + status: parsed ? "success" : "error", + duration_ms: dur, + audio_seconds: 0, + job_id: jobId, + batch_id: null, + source, + media_url: null, + error: parsed ? null : lastErr || "extras analysis failed", + ...cost, + }); + if (!parsed) { + console.warn( + `[meeting-extras] all ${EXTRAS_MAX_ATTEMPTS} attempts failed (${lastErr}) — extras unavailable` + ); + return null; + } + console.log( + `[meeting-extras] extracted ${parsed.tldr ? "tldr + " : "(no tldr) + "}${parsed.decisions.length} decision(s), ${parsed.action_items.length} action(s), ${parsed.open_questions.length} question(s), ${parsed.key_quotes.length} quote(s) in ${(dur / 1000).toFixed(1)}s` + ); + return parsed; +} diff --git a/server/meeting-speaker-edits.js b/server/meeting-speaker-edits.js new file mode 100644 index 0000000..0d446f8 --- /dev/null +++ b/server/meeting-speaker-edits.js @@ -0,0 +1,359 @@ +// Post-hoc speaker edits for saved internal meetings. +// +// Two operator tools that mutate a saved meeting record in place, +// without re-uploading audio or hitting Spark Control: +// +// mergeSpeakersInRecord — fold one or more clusters that diarization +// mistakenly split apart into a single speaker. +// reclusterMeetingRecord — re-run the cross-chunk voice clustering at a +// new strictness threshold to separate two +// people who were over-merged into one cluster. +// Pure offline re-clustering off the persisted +// per-chunk fingerprints (rec.diarization). +// +// Both must keep the FOUR places a speaker label lives in sync: +// 1. rec.transcript_segments[].speaker +// 2. rec.chunks[].entries[].speaker (+ .speaker_override) +// 3. rec.speakers (per-cluster stats map) +// 4. rec.extras (tldr.primary_speakers, decisions.agreed_by, +// action_items.owner, key_quotes.speaker) +// plus rec.speaker_names (display-name map). + +import { + clusterSpeakers, + assignSpeakersToSegments, +} from "./speaker-clustering.js"; + +// ─── Entry speaker backfill ───────────────────────────────────────── +// Re-derive each chunk entry's speaker from rec.transcript_segments by +// timestamp. Used (a) on load to repair pre-diarization records and +// (b) after a re-cluster re-stamps the segments. By default it only +// fills entries that LACK a speaker (the load-path use); pass +// { force: true } to re-stamp every entry (the re-cluster use, after +// the old labels have been cleared). +// +// Matching mirrors the pipeline's original offset→segment logic +// (internal-meetings.js build path): exact floored-start, then a +// containing segment within ±0.5s, then nearest preceding within 5s. +export function backfillEntrySpeakers(rec, { force = false } = {}) { + if (!rec || !Array.isArray(rec.chunks) || !Array.isArray(rec.transcript_segments)) { + return; + } + if (!force) { + const needsBackfill = rec.chunks.some((c) => + Array.isArray(c.entries) && c.entries.some((e) => !e || !e.speaker) + ); + if (!needsBackfill) return; + } + + const segs = rec.transcript_segments + .slice() + .sort((a, b) => (a.start || 0) - (b.start || 0)); + const byFlooredStart = new Map(); + for (const seg of segs) { + const k = Math.floor(seg.start || 0); + if (!byFlooredStart.has(k)) byFlooredStart.set(k, seg); + } + const pickSpeaker = (t) => { + let found = byFlooredStart.get(t); + if (found && found.speaker) return found; + for (const seg of segs) { + if ((seg.start || 0) > t + 5) break; + if ((seg.start || 0) - 0.5 <= t && t <= (seg.end || 0) + 0.5) { + if (seg.speaker) return seg; + } + } + let bestPrev = null; + let bestDist = Infinity; + for (const seg of segs) { + if ((seg.start || 0) > t) break; + const dist = t - (seg.start || 0); + if (dist < bestDist && seg.speaker) { + bestDist = dist; + bestPrev = seg; + } + } + if (bestPrev && bestDist <= 5) return bestPrev; + return null; + }; + for (const chunk of rec.chunks) { + if (!Array.isArray(chunk.entries)) continue; + for (const entry of chunk.entries) { + if (!force && entry.speaker) continue; + const t = entry.offset || 0; + const found = pickSpeaker(t); + if (found && found.speaker) { + entry.speaker = found.speaker; + entry.speaker_confidence = found.speaker_confidence ?? null; + entry.speaker_uncertain = !!found.speaker_uncertain; + } + } + } +} + +// ─── Merge speakers ───────────────────────────────────────────────── +// Fold each cluster in `absorbed` into `survivor`. Rewrites every label +// reference, sums the stats, inherits the absorbed display name only +// when the survivor has none, and rewrites extras attributions. +// Remaining letters are intentionally NOT renumbered — that would +// cascade through speaker_names + per-line overrides for no real gain. +// +// Returns { changed, speakers, speaker_names }. Throws on invalid input. +export function mergeSpeakersInRecord(rec, survivor, absorbed) { + if (!rec || typeof rec !== "object") { + throw badRequest("record required"); + } + const speakers = rec.speakers && typeof rec.speakers === "object" ? rec.speakers : {}; + const absorbList = Array.isArray(absorbed) ? [...new Set(absorbed)] : []; + + if (typeof survivor !== "string" || !speakers[survivor]) { + throw badRequest("survivor must be an existing speaker id"); + } + if (absorbList.length === 0) { + throw badRequest("absorbed must list at least one speaker id"); + } + for (const x of absorbList) { + if (x === survivor) throw badRequest("cannot merge a speaker into itself"); + if (!speakers[x]) throw badRequest(`unknown speaker id: ${x}`); + } + // Refuse if the merge would leave no named-able speakers — i.e. it + // collapses everything into one is fine, but survivor must remain. + const remaining = Object.keys(speakers).filter((id) => !absorbList.includes(id)); + if (!remaining.includes(survivor)) { + throw badRequest("survivor cannot be in the absorbed set"); + } + + const absorbedSet = new Set(absorbList); + let changed = 0; + + // 1. transcript_segments + for (const seg of rec.transcript_segments || []) { + if (seg && absorbedSet.has(seg.speaker)) { + seg.speaker = survivor; + changed += 1; + } + } + + // 2. chunk entries (+ per-line overrides) + for (const chunk of rec.chunks || []) { + for (const entry of chunk.entries || []) { + if (!entry) continue; + if (absorbedSet.has(entry.speaker)) { + entry.speaker = survivor; + changed += 1; + } + if (absorbedSet.has(entry.speaker_override)) { + entry.speaker_override = survivor; + changed += 1; + } + } + } + + // 3. stats + display name + rec.speaker_names = rec.speaker_names && typeof rec.speaker_names === "object" + ? rec.speaker_names + : {}; + for (const x of absorbList) { + mergeStats(speakers[survivor], speakers[x]); + delete speakers[x]; + // Survivor inherits the absorbed name only if it has none of its own. + if (!rec.speaker_names[survivor] && rec.speaker_names[x]) { + rec.speaker_names[survivor] = rec.speaker_names[x]; + } + if (x in rec.speaker_names) delete rec.speaker_names[x]; + } + + // 4. extras attributions + remapExtrasSpeakers(rec.extras, (id) => (absorbedSet.has(id) ? survivor : id)); + + rec.meta = rec.meta || {}; + rec.meta.speakers_merged_at = Date.now(); + + return { changed, speakers: rec.speakers, speaker_names: rec.speaker_names }; +} + +// ─── Re-cluster (re-run diarization) ──────────────────────────────── +// Re-run cross-chunk clustering off the persisted per-chunk +// fingerprints at a new threshold (+ optional suppression knobs), +// re-stamp every segment + entry, then RESET the now-stale attribution +// data (inferred names, per-line overrides, extras speaker tags) so the +// operator re-labels from a clean slate. No LLM calls. +// +// Returns { speakers, clusterCount, threshold }. Throws a NO_FINGERPRINTS +// error (code on err) when the record has no usable fingerprint data. +export function reclusterMeetingRecord(rec, opts = {}) { + if (!rec || typeof rec !== "object") throw badRequest("record required"); + + const diar = Array.isArray(rec.diarization) ? rec.diarization : []; + const totalFps = diar.reduce( + (n, d) => n + (d && d.ok ? Object.keys(d.fingerprints || {}).length : 0), + 0 + ); + if (totalFps === 0) { + const err = new Error( + "this meeting has no saved voice fingerprints — it predates fingerprint capture or was processed with diarization off, so it can't be re-clustered" + ); + err.code = "NO_FINGERPRINTS"; + throw err; + } + + const threshold = opts.threshold; + const { globalMap, uncertaintyMap, speakers, clusterCount } = clusterSpeakers( + diar, + threshold, + { + anchorMinSpeakingSec: opts.anchorMinSpeakingSec, + smallClusterMaxSpeakingSec: opts.smallClusterMaxSpeakingSec, + uncertainMarginPct: opts.uncertainMarginPct, + } + ); + + // Re-stamp the flat transcript segments off the new clustering... + if (Array.isArray(rec.transcript_segments)) { + assignSpeakersToSegments(rec.transcript_segments, diar, globalMap, uncertaintyMap); + } + // ...then clear + re-derive each chunk entry's speaker from them. + for (const chunk of rec.chunks || []) { + for (const entry of chunk.entries || []) { + if (!entry) continue; + entry.speaker = null; + entry.speaker_confidence = null; + entry.speaker_uncertain = false; + if ("speaker_override" in entry) delete entry.speaker_override; + } + } + backfillEntrySpeakers(rec, { force: true }); + + // New roster; stale name/attribution data reset. + rec.speakers = speakers; + rec.speaker_names = {}; + resetExtrasSpeakers(rec.extras); + + rec.meta = rec.meta || {}; + rec.meta.reclustered_at = Date.now(); + rec.meta.recluster_threshold = clampPct(threshold); + rec.meta.polish_done = false; + + return { speakers, clusterCount, threshold: rec.meta.recluster_threshold }; +} + +// ─── Apply re-polished summaries ──────────────────────────────────── +// After a re-polish pass (runSummaryPolish with the operator's corrected +// names), write the new section summaries back into the saved record: +// - rec.analysis.sections — the canonical section store +// - rec.chunks[].summary — the rendered topic cards +// Chunk summaries are matched to sections BY TITLE (polish never changes +// titles), consumed in section order so duplicate titles still line up. +// Chunk ENTRIES and any per-line speaker_override are left untouched — +// only the summary text changes. Returns the count of chunk summaries +// actually changed. +export function applyPolishedSummaries(rec, polishedSections) { + if (!rec || typeof rec !== "object" || !Array.isArray(polishedSections)) return 0; + + if (rec.analysis && typeof rec.analysis === "object") { + rec.analysis.sections = polishedSections; + } else { + rec.analysis = { sections: polishedSections }; + } + + // title → queue of summaries, in section order. + const byTitle = new Map(); + for (const s of polishedSections) { + const key = s && typeof s.title === "string" ? s.title : ""; + if (!byTitle.has(key)) byTitle.set(key, []); + byTitle.get(key).push(s && typeof s.summary === "string" ? s.summary : ""); + } + + const used = new Map(); + let changed = 0; + for (const chunk of rec.chunks || []) { + if (!chunk) continue; + const key = typeof chunk.title === "string" ? chunk.title : ""; + const list = byTitle.get(key); + if (!list || !list.length) continue; + const i = used.get(key) || 0; + const summary = i < list.length ? list[i] : list[list.length - 1]; + used.set(key, i + 1); + if (typeof summary === "string" && summary && chunk.summary !== summary) { + chunk.summary = summary; + changed += 1; + } + } + return changed; +} + +// ─── helpers ──────────────────────────────────────────────────────── + +function badRequest(message) { + const err = new Error(message); + err.code = "BAD_REQUEST"; + return err; +} + +function clampPct(v) { + const n = Number(v); + if (!Number.isFinite(n)) return 70; + return Math.max(50, Math.min(95, Math.round(n))); +} + +// Merge stats of `from` into `into` in place. turns / speaking-time / +// fingerprint-count sum; mean_confidence is turn-weighted across the +// clusters that have one; chunks_appeared_in uses max as a safe +// approximation (the raw per-cluster chunk sets aren't retained). +function mergeStats(into, from) { + if (!into || !from) return; + const t1 = into.turns || 0; + const t2 = from.turns || 0; + const c1 = typeof into.mean_confidence === "number" ? into.mean_confidence : null; + const c2 = typeof from.mean_confidence === "number" ? from.mean_confidence : null; + let mean = null; + if (c1 != null && c2 != null) { + const w = t1 + t2; + mean = w > 0 ? (c1 * t1 + c2 * t2) / w : (c1 + c2) / 2; + } else if (c1 != null) { + mean = c1; + } else if (c2 != null) { + mean = c2; + } + into.turns = t1 + t2; + into.total_speaking_seconds = + Math.round(((into.total_speaking_seconds || 0) + (from.total_speaking_seconds || 0)) * 10) / 10; + into.fingerprint_count = (into.fingerprint_count || 0) + (from.fingerprint_count || 0); + into.chunks_appeared_in = Math.max(into.chunks_appeared_in || 0, from.chunks_appeared_in || 0); + into.mean_confidence = mean; +} + +// Rewrite every speaker id in the extras block through `map`. +function remapExtrasSpeakers(extras, map) { + if (!extras || typeof extras !== "object") return; + if (extras.tldr && Array.isArray(extras.tldr.primary_speakers)) { + extras.tldr.primary_speakers = dedupe(extras.tldr.primary_speakers.map(map)); + } + for (const d of arr(extras.decisions)) { + if (Array.isArray(d.agreed_by)) d.agreed_by = dedupe(d.agreed_by.map(map)); + } + for (const a of arr(extras.action_items)) { + if (a.owner) a.owner = map(a.owner); + } + for (const q of arr(extras.key_quotes)) { + if (q.speaker) q.speaker = map(q.speaker); + } +} + +// Clear extras speaker attributions (keep the text). Used by re-cluster +// since cluster identities change and old ids would be meaningless. +function resetExtrasSpeakers(extras) { + if (!extras || typeof extras !== "object") return; + if (extras.tldr) extras.tldr.primary_speakers = []; + for (const d of arr(extras.decisions)) d.agreed_by = []; + for (const a of arr(extras.action_items)) a.owner = null; + for (const q of arr(extras.key_quotes)) q.speaker = null; +} + +function arr(v) { + return Array.isArray(v) ? v : []; +} + +function dedupe(list) { + return [...new Set(list)]; +} diff --git a/server/output-store.js b/server/output-store.js new file mode 100644 index 0000000..1811dfd --- /dev/null +++ b/server/output-store.js @@ -0,0 +1,163 @@ +// Per-job output storage. After a transcribe + analyze cycle +// completes, the worker calls saveJobOutput() to persist the +// transcript + analysis JSON to /data/relay-outputs/.json. +// The operator dashboard surfaces these as a "View" link per job +// that opens a Recap-style two-pane render in a new tab. +// +// Storage policy: +// - Test-run jobs (source = "admin-test") are ALWAYS saved +// - Real-user jobs are saved only when relay_save_user_outputs +// is true in the operator config (default false for privacy) +// +// Storage format (per file): +// { +// job_id: string +// batch_id: string | null +// source: "admin-test" | null +// saved_at: ms-epoch +// transcript: string ("[MM:SS] line\n[MM:SS] line...") +// analysis: { sections: [{title, summary, startIndex, endIndex}, ...] } +// meta: { +// title, media_url, audio_seconds, audio_bytes, +// transcribe_backend, transcribe_model, +// analyze_backend, analyze_model, +// transcribe_ms, analyze_ms, wall_time_ms, +// captions_mode +// } +// } +// +// Files are simple JSON — no index, no DB. Listing scans the dir; +// deletion just rm's the file. Cheap up to thousands of entries; if +// the operator hits scale, swap in a SQLite index without changing +// the on-disk format. + +import fs from "fs/promises"; +import path from "path"; + +let outputDir = "/data/relay-outputs"; + +export async function initOutputStore({ dataDir }) { + outputDir = path.join(dataDir, "relay-outputs"); + try { + await fs.mkdir(outputDir, { recursive: true, mode: 0o700 }); + } catch (err) { + console.warn(`[output-store] mkdir failed: ${err?.message || err}`); + } + console.log(`[output-store] writing to ${outputDir}`); +} + +// Path constructor with light sanitization — job_id is a UUID-style +// string, but filter out anything that could traverse the filesystem +// just in case the upstream ID generator changes. +function pathFor(jobId) { + const safe = String(jobId || "").replace(/[^a-zA-Z0-9_-]/g, ""); + if (!safe) throw new Error("invalid job_id"); + return path.join(outputDir, `${safe}.json`); +} + +// Save a job's transcript + analysis to disk. Best-effort: on write +// failure, log and continue — the audit log remains the source of +// truth for whether the job ran. +export async function saveJobOutput(jobId, payload) { + try { + const filePath = pathFor(jobId); + const body = JSON.stringify( + { job_id: jobId, saved_at: Date.now(), ...payload }, + null, + 2 + ); + await fs.writeFile(filePath, body, { mode: 0o600 }); + } catch (err) { + console.warn( + `[output-store] save failed for ${jobId}: ${err?.message || err}` + ); + } +} + +// Read a single job's stored output. Returns null when missing — +// the route layer should turn that into a 404. +export async function getJobOutput(jobId) { + try { + const filePath = pathFor(jobId); + const raw = await fs.readFile(filePath, "utf8"); + return JSON.parse(raw); + } catch (err) { + if (err.code === "ENOENT") return null; + console.warn( + `[output-store] read failed for ${jobId}: ${err?.message || err}` + ); + return null; + } +} + +// Check existence cheaply (stat) without reading the file body — +// the Jobs table only needs a has_output boolean per row, not the +// full payload, and scanning thousands of stats is much cheaper +// than reading thousands of files into memory. +export async function listJobOutputIds() { + try { + const files = await fs.readdir(outputDir); + return files + .filter((f) => f.endsWith(".json")) + .map((f) => f.replace(/\.json$/, "")); + } catch (err) { + if (err.code === "ENOENT") return []; + console.warn( + `[output-store] list failed: ${err?.message || err}` + ); + return []; + } +} + +// Delete one job's output. Returns true on success, false when +// the file didn't exist. +export async function deleteJobOutput(jobId) { + try { + await fs.unlink(pathFor(jobId)); + return true; + } catch (err) { + if (err.code === "ENOENT") return false; + throw err; + } +} + +// Bulk delete. Accepts either an array of job_ids or { all: true }. +// Returns { deleted, missing } for caller reporting. +export async function bulkDeleteOutputs({ jobIds, all }) { + let deleted = 0; + let missing = 0; + if (all) { + const ids = await listJobOutputIds(); + for (const id of ids) { + const ok = await deleteJobOutput(id).catch(() => false); + if (ok) deleted++; + } + return { deleted, missing }; + } + if (!Array.isArray(jobIds)) return { deleted: 0, missing: 0 }; + for (const id of jobIds) { + const ok = await deleteJobOutput(id).catch(() => false); + if (ok) deleted++; + else missing++; + } + return { deleted, missing }; +} + +// Aggregate stats for the dashboard "Stored outputs" mini-panel. +export async function getStoredOutputsSummary() { + try { + const files = await fs.readdir(outputDir); + const jsonFiles = files.filter((f) => f.endsWith(".json")); + let totalBytes = 0; + for (const f of jsonFiles) { + try { + const s = await fs.stat(path.join(outputDir, f)); + totalBytes += s.size; + } catch {} + } + return { count: jsonFiles.length, total_bytes: totalBytes }; + } catch (err) { + if (err.code === "ENOENT") return { count: 0, total_bytes: 0 }; + return { count: 0, total_bytes: 0, error: err?.message }; + } +} diff --git a/server/post-cluster-polish.js b/server/post-cluster-polish.js new file mode 100644 index 0000000..40329d3 --- /dev/null +++ b/server/post-cluster-polish.js @@ -0,0 +1,655 @@ +// Post-cluster polish pass: after transcribe + diarize + clustering +// have produced a speaker-labeled transcript, AND after pipelined +// analyze has produced section objects (titles + summaries), run a +// two-stage LLM pass that: +// +// Stage 1 — Global name inference. One LLM call with the +// speaker-labeled transcript + episode metadata (channel name, +// title, description) → JSON map { Speaker_A: "Matt Hill", +// Speaker_B: "Sarah Jones", Speaker_C: null }. The "_C: null" +// case is essential: when the LLM can't confidently identify a +// speaker, it must return null instead of guessing. +// +// Stage 2 — Per-window summary polish. N parallel LLM calls, one +// per analyze window. Each call sees that window's sections +// (original summaries) + that window's transcript with speaker +// labels + the global name map from Stage 1, and rewrites each +// section's SUMMARY to attribute statements to specific +// speakers ("Matt Hill explains..." vs "the discussion +// centers..."). Section TITLES and start/end indices are kept +// unchanged — polish only touches summary text. +// +// Why two stages: name inference benefits from the FULL transcript +// view (name introductions like "welcome Matt" tend to appear in +// window 1 but Matt keeps speaking throughout); per-window polish +// benefits from parallelism (matches the existing analyze pattern). +// Running them as one batched call would either lose parallelism +// or send the full transcript N times. +// +// Failure modes: +// - Stage 1 returns invalid JSON → all names default to null; +// Stage 2 still runs and produces "Speaker A explains..." etc. +// - Stage 2 fails for a particular window → keep the original +// analyze summary for that window's sections. Per-window +// failure shouldn't kill the whole polish. +// - Both stages fail → fall back to the unpolished analyzeResult. +// The caller sees the same output as a polish-disabled run. +// +// Cost: Stage 1 ~5-10s; Stage 2 ~10-15s (parallel); total ~15-25s +// added to end of pipeline. On a 200s pipelined pipeline that's a +// ~10% slowdown for the speaker-attribution UX win. + +import { recordCall } from "./audit-log.js"; + +const STAGE_1_MAX_ATTEMPTS = 3; +const STAGE_2_MAX_ATTEMPTS = 3; + +// ─── Default prompts (operator-editable via Settings tab) ─────────── +// +// Same three-layer override pattern as the analyze + transcribe +// prompts: per-session operator override → operator-promoted default +// → these hardcoded defaults. Both are validated on save — +// `DEFAULT_NAME_INFERENCE_PROMPT_TEMPLATE` must contain {{transcript}} +// and JSON output instructions; `DEFAULT_SUMMARY_POLISH_PROMPT_TEMPLATE` +// must contain {{sections}} and JSON output instructions. Template +// variables (interpolated at request time): +// +// Name inference prompt: +// {{channel}} — operator-supplied or yt-dlp-extracted channel name +// {{title}} — episode/video title +// {{description}} — episode description (capped at 800 chars) +// {{speakerStats}} — pre-formatted block listing each speaker's +// chip letter, total speaking time, turn count +// {{transcript}} — speaker-labeled bracketed transcript, capped +// at 25k chars (middle truncated when over) +// {{speakerKeys}} — JSON-schema-friendly key list for the +// response shape (one line per Speaker_X) +// +// Summary polish prompt: +// {{speakerRoster}} — pre-formatted block listing each speaker +// with their inferred name (or "(unknown)") +// and stats +// {{transcript}} — this window's slice of the labeled transcript +// {{sections}} — pre-formatted block listing each section +// with title + original summary + time range +export const DEFAULT_NAME_INFERENCE_PROMPT_TEMPLATE = `You are identifying real-world speaker names in an interview/podcast/meeting transcript. The transcript below has been pre-tagged with speaker labels like [A], [B], [C] — these are anonymous labels assigned by voice clustering. Your job: infer the real names of each speaker from contextual clues in the transcript. + +EPISODE METADATA: +- Channel/show: {{channel}} +- Episode title: {{title}} +- Description: {{description}} + +{{operatorContext}}SPEAKER STATISTICS (cluster output): +{{speakerStats}} + +TRANSCRIPT (each line is "[ ] text"): +{{transcript}} + +INSTRUCTIONS: +1. For each Speaker_X in the speaker statistics, infer the real name from contextual clues: + - Direct introductions ("welcome Matt", "I'm joined by Sarah") + - Self-introductions ("my name is", "I'm Sarah, founder of...") + - References between speakers ("what do you think Matt?", "as Sarah was saying") + - Channel name or episode title hints + - Operator hints in the OPERATOR HINTS section above, IF PRESENT — but see rule 6 for how to weight those. +2. Use the speaker statistics to help — the host typically speaks more turns; guests speak less. +3. Use first + last name if confidently identifiable. Use first name only if that's all you have. +4. RETURN null IF YOU CANNOT CONFIDENTLY IDENTIFY THE SPEAKER. Do not guess. A null is better than a wrong name. +5. For brief speakers (under 30s of speaking time, e.g. an intro music VO or a passing comment) it's expected that you'll often return null. +6. WEIGHTING OPERATOR HINTS: When an OPERATOR HINTS section appears above, treat it as informed suggestion, NOT authoritative truth. The operator may have listed people who turned out not to speak, omitted people who did, or guessed wrong on who matches which voice. ALWAYS verify hints against the transcript. Specifically: + - A name in the hints is only a candidate; if the transcript provides no signal that THIS Speaker_X is that person, return null instead of guessing. + - If the transcript clearly identifies a speaker as someone NOT in the hints, use the transcript's name. + - If the hints describe what each named person did ("Steve gave the update, John asked questions"), use that as a soft signal for mapping names to chip letters, but still verify with the transcript before committing. + - It is better to leave a speaker as null than to confidently map a hint to the wrong chip letter. + +Respond with ONLY valid JSON in this exact format, no other text: +{ + "speakers": { +{{speakerKeys}} + } +}`; + +export const DEFAULT_SUMMARY_POLISH_PROMPT_TEMPLATE = `You wrote section summaries for a podcast/interview transcript window. We've now identified the speakers via voice clustering and (where possible) inferred their real names. Your job: rewrite each section's SUMMARY to attribute statements to specific speakers where it improves clarity, naturalness, and information density. + +SPEAKERS (from voice clustering across the full episode): +{{speakerRoster}} + +WINDOW TRANSCRIPT (this window's slice; each line is labeled with the speaker's name, or a chip letter when their name is unknown): +{{transcript}} + +ORIGINAL SECTIONS IN THIS WINDOW (re-write the summary of each): +{{sections}} + +INSTRUCTIONS: +1. The SPEAKERS roster and the WINDOW TRANSCRIPT are the AUTHORITATIVE source of who said what. The ORIGINAL summaries were written in an earlier pass and may attribute statements to OUTDATED or WRONG speaker names — your job includes CORRECTING those. +2. Rewrite each section's SUMMARY so every speaker attribution matches the transcript + roster. If an original summary credits a statement to a person who, per the transcript, was actually said by someone else, REPLACE the name with the correct one. Never keep a name that does not appear in the roster. +3. Use real names when available ("Matt Hill explains..."); fall back to a chip letter only for a speaker who has no name ("Speaker A explains..."). +4. Keep summaries 1-3 sentences — same length range as the original. +5. KEEP THE TITLE EXACTLY AS GIVEN. Do not rewrite titles. +6. Return the sections in the SAME ORDER as given, with the SAME INDEX numbers in the array. +7. If a section is primarily one speaker, lean into their name ("Matt explains..."). If it is back-and-forth, name both ("Matt and Sarah debate..."). +8. If the transcript for a section genuinely has no speaker signal, keep the original summary's wording — but still fix or drop any name in it that conflicts with the roster. DO NOT invent attribution that the transcript does not support. + +Respond with ONLY valid JSON in this exact format: +{ + "sections": [ + { "index": 0, "summary": "Polished summary text..." }, + { "index": 1, "summary": "..." } + ] +} + +Return only the sections in this window. Use the same indices as the input ([0], [1], ...). Only the summary field — title and indices stay as given.`; + +// Substitute {{key}} placeholders in a template. Unknown keys are +// left as the literal {{key}} so an operator's edit that drops a +// variable doesn't crash the run — the model just sees the placeholder. +function fillTemplate(template, vars) { + return String(template || "").replace(/\{\{\s*(\w+)\s*\}\}/g, (_match, key) => { + return key in vars ? String(vars[key]) : `{{${key}}}`; + }); +} + +// Build a transcript representation with speaker labels prefixed. +// Each line: `[A 0:08] So Matt, tell us how you got started…` +// - The bracketed prefix is `[