// Per-job aggregation over the relay's audit log. The audit log
// records ONE row per relay call (transcribe or analyze); a single
// summary job typically produces 1 transcribe row + N analyze rows
// (one per chunked-analyze window). This module groups those rows by
// X-Recap-Job-Id and computes per-video stats the dashboard renders
// as a sortable / filterable table.
//
// The aggregation is computed on the fly from the in-memory entries
// array — no separate persistence. A typical 30-day window has a few
// thousand audit rows; grouping is O(n) and well under 10 ms.
//
// Output row shape (one per job_id, plus one synthetic row for each
// orphaned entry with no job_id):
//   {
//     job_id:            string | null   // null for orphaned entries
//     started_at:        ms-epoch (first row's ts minus its duration_ms
//                        and download_ms)
//     completed_at:      ms-epoch (latest ts)
//     install_id:        short string
//     tier:              "core" | "pro" | "max" | null
//     media_url:         string | null   // from the transcribe row
//     title:             string | null   // ditto
//     batch_id:          string | null   // first non-empty across rows
//     source:            string | null   // first non-empty across rows
//     audio_seconds:     number | null   // from transcribe row
//     audio_bytes:       number | null   // ditto (bytes downloaded
//                                          by the relay for transcribe-url)
//     transcribe_status: "success" | "partial" | "error" | "refused" | "missing"
//     transcribe_backend: "gemini" | "hardware" | null
//     transcribe_model:  string | null
//     transcribe_ms:     number | null   // outer wall-time of transcribe
//     transcribe_ms_sum: number | null   // sum of per-chunk durations
//     download_ms:       number | null
//     chunk_count:       number | null   // transcribe-side audio chunks
//     analyze_windows_total:   number    // count of analyze rows
//     analyze_windows_success: number
//     analyze_windows_failed:  number
//     analyze_backend:   string | null   // dominant backend across
//                                          successful analyze rows
//     analyze_model:     string | null   // dominant model across
//                                          successful analyze rows
//     analyze_ms:        number          // sum of analyze duration_ms
//     analyze_wall_time_ms: number | null // first window start → last end
//     overall_status:    "success" | "partial" | "failed"
//     wall_time_ms:      completed_at - started_at
//     cost_usd:          number (sum across all rows)
//     errors:            string[]        // concatenated short error strings
//     has_output:        boolean         // job_id is in opts.outputIdSet
//
//     Derived metrics — pre-computed so the UI can sort by them:
//     transcribe_ms_per_min:   number | null  // transcribe_ms / (audio_seconds/60)
//     transcribe_ms_per_mb:    number | null  // transcribe_ms / (audio_bytes / 1MB)
//     download_ms_per_mb:      number | null
//     analyze_ms_per_min:      number | null
//     analyze_wall_ms_per_min: number | null
//     analyze_ms_per_mb:       number | null
//   }

const MB = 1024 * 1024;

// Group audit entries into jobs and compute one stats row per job.
//
//   entries: array of audit rows (each with ts, pipeline, status, ...).
//   opts.outputIdSet: optional Set of job_ids that have stored output.
//
// Returns the aggregated rows sorted newest-first by started_at.
export function aggregateJobs(entries, opts = {}) {
  // Group by job_id. Entries without a job_id each become their own
  // singleton group so they still appear in the table (helpful for
  // debugging orphan calls). Orphan-ness is tracked explicitly rather
  // than encoded in a string key, so two orphans that share the same
  // ts are never merged and a real job_id can never be mistaken for
  // an orphan marker. `groups` preserves first-appearance order.
  const groups = [];
  const groupByJobId = new Map();
  for (const e of entries) {
    if (!e.job_id) {
      groups.push({ jobId: null, rows: [e] });
      continue;
    }
    let group = groupByJobId.get(e.job_id);
    if (!group) {
      group = { jobId: e.job_id, rows: [] };
      groupByJobId.set(e.job_id, group);
      groups.push(group);
    }
    group.rows.push(e);
  }

  // Optional set of job_ids that have stored output JSONs — passed
  // in from the route layer so the aggregator doesn't have to hit
  // the filesystem itself. Used to set the has_output flag the
  // dashboard reads to show/hide the "View" link.
  const outputIdSet =
    opts.outputIdSet instanceof Set ? opts.outputIdSet : null;

  const out = [];
  for (const { jobId, rows } of groups) {
    const row = aggregateOne(jobId, rows);
    row.has_output = outputIdSet ? outputIdSet.has(row.job_id) : false;
    out.push(row);
  }

  // Newest first by started_at.
  out.sort((a, b) => b.started_at - a.started_at);
  return out;
}

// Build the aggregated stats row for a single job.
//
//   jobId: the job's id, or null for an orphaned entry.
//   rows:  non-empty array of that job's audit rows (sorted in place
//          by ts).
function aggregateOne(jobId, rows) {
  rows.sort((a, b) => a.ts - b.ts);

  // ts in each audit row is when recordCall() fired — i.e., when the
  // work for that row COMPLETED, not when it started. To recover the
  // user-POV "job start" timestamp we work backwards from the first
  // row's end-time using its duration_ms AND download_ms fields.
  //
  // TX row layout:
  //   ts          = download_end + tx_work_end
  //   duration_ms = tx_work_duration (NOT including download)
  //   download_ms = download_duration
  //
  // So: job_start = ts - duration_ms - download_ms.
  //
  // Without including download_ms here, wall_time misses the
  // download phase (which can be 30-60s on a long YouTube fetch).
  // Including it makes wall_time match the operator's intuitive
  // formula: WALL ≈ DL + TX + AN_wall.
  const firstRowDur = Number(rows[0].duration_ms) || 0;
  const firstRowDownload = Number(rows[0].download_ms) || 0;
  const startedAt = rows[0].ts - firstRowDur - firstRowDownload;
  const completedAt = rows[rows.length - 1].ts;

  const tx = rows.find((r) => r.pipeline === "transcribe");
  const analyzeRows = rows.filter((r) => r.pipeline === "analyze");
  const analyzeSuccess = analyzeRows.filter((r) => r.status === "success");
  const analyzeFailed = analyzeRows.filter((r) => r.status !== "success");
  const analyzeMs = analyzeRows.reduce(
    (s, r) => s + (Number(r.duration_ms) || 0),
    0
  );

  // Analyze wall time: elapsed clock time from when the FIRST window
  // started to when the LAST window finished. For a 1-batch parallel
  // analyze (all N windows fire concurrently), this ≈ the slowest
  // single window's duration. For multi-batch (N > concurrency, e.g.
  // 10 windows at concurrency 8 → 2 sequential batches), this spans
  // both batches including any gap. Computed from end-ts minus
  // start-ts (where start-ts = row.ts - row.duration_ms) so it's
  // an accurate measured value, not a predicted one.
  let analyzeWallMs = null;
  if (analyzeRows.length > 0) {
    let minStart = Infinity;
    let maxEnd = -Infinity;
    for (const r of analyzeRows) {
      const end = Number(r.ts) || 0;
      const dur = Number(r.duration_ms) || 0;
      const start = end - dur;
      if (start < minStart) minStart = start;
      if (end > maxEnd) maxEnd = end;
    }
    analyzeWallMs = maxEnd - minStart;
  }

  const analyzeDominantBackend = dominant(
    analyzeSuccess.map((r) => r.backend)
  );
  const analyzeDominantModel = dominant(analyzeSuccess.map((r) => r.model));

  const errors = rows
    .filter((r) => r.error)
    .map((r) => `${r.pipeline}: ${String(r.error).slice(0, 160)}`);

  const txStatus = tx ? tx.status : "missing";
  let overall;
  if (
    txStatus === "error" ||
    txStatus === "refused" ||
    txStatus === "missing"
  ) {
    overall = "failed";
  } else if (txStatus === "partial") {
    // TX produced a truncated transcript (chunks hit the output-token
    // cap). Mark the whole job partial regardless of analyze status —
    // the analysis was performed against incomplete input, so even
    // a "success" on analyze rows is misleading.
    overall = "partial";
  } else if (analyzeRows.length === 0) {
    // Transcribe succeeded but no analyze rows — could be in flight,
    // or the client never called /relay/analyze (uses local model).
    overall = "success";
  } else if (analyzeSuccess.length === analyzeRows.length) {
    overall = "success";
  } else if (analyzeSuccess.length > 0) {
    overall = "partial";
  } else {
    overall = "failed";
  }

  const cost = rows.reduce((s, r) => s + (Number(r.cost_usd) || 0), 0);

  // Use `??` (nullish-coalesce) — NOT `||` — so a legitimate 0 isn't
  // treated as missing data. The test-run worker writes duration_ms=0
  // historically (pre-fix) on cache-hit siblings; even though the new
  // worker writes a non-zero shared wall-time, old audit rows from
  // earlier benchmark batches still live in the NDJSON and we want
  // those rendered correctly rather than collapsed to "—".
  const audioSec = tx?.audio_seconds ?? null;
  const audioBytes = tx?.audio_bytes ?? null;
  const txMs = tx?.duration_ms ?? null;
  const downloadMs = tx?.download_ms ?? null;

  // TX backend compute time = sum of per-chunk wall-times. Distinct
  // from txMs which is the outer parallel-fan-out wall-time.
  //   single-chunk: txMsSum ≈ txMs (one chunk, one duration)
  //   N-chunks at concurrency C: txMsSum ≈ N × per-chunk-duration
  //                              txMs    ≈ ⌈N/C⌉ × per-chunk-duration
  // Falls back to txMs (the wall-time) when chunk_durations_ms is
  // absent — old audit rows from before v0.2.41 don't have it.
  const chunkDurationsArr = Array.isArray(tx?.chunk_durations_ms)
    ? tx.chunk_durations_ms
    : null;
  const txMsSum = chunkDurationsArr
    ? chunkDurationsArr.reduce((s, d) => s + (Number(d) || 0), 0)
    : txMs;

  const audioMinutes = audioSec ? audioSec / 60 : null;
  const audioMb = audioBytes ? audioBytes / MB : null;

  // batch_id and source are stamped per audit row by the test-run
  // path; use the first non-null we see so dashboard filters work
  // regardless of which row gets read first in a multi-row job.
  const batchId = rows.find((r) => r.batch_id)?.batch_id || null;
  const source = rows.find((r) => r.source)?.source || null;

  return {
    job_id: jobId,
    started_at: startedAt,
    completed_at: completedAt,
    install_id: tx?.install_id || rows[0].install_id || null,
    tier: tx?.tier || rows[0].tier || null,
    media_url: tx?.media_url || null,
    title: tx?.title || null,
    batch_id: batchId,
    source: source,
    audio_seconds: audioSec,
    audio_bytes: audioBytes,
    transcribe_status: txStatus,
    transcribe_backend: tx?.backend || null,
    transcribe_model: tx?.model || null,
    // transcribe_ms = outer wall-time of the whole TX phase (the
    // value the operator perceives as "how long did transcribe
    // take"). transcribe_ms_sum = total backend compute across all
    // chunks (drives cost; equals N × wall when N chunks run truly
    // sequentially, equals wall when single-chunk). For Gemini at
    // concurrency 12 over 3 chunks: wall ≈ 60s, sum ≈ 180s.
    transcribe_ms: txMs,
    transcribe_ms_sum: txMsSum,
    download_ms: downloadMs,
    chunk_count: tx?.chunk_count ?? null,
    analyze_windows_total: analyzeRows.length,
    analyze_windows_success: analyzeSuccess.length,
    analyze_windows_failed: analyzeFailed.length,
    analyze_backend: analyzeDominantBackend,
    analyze_model: analyzeDominantModel,
    // analyze_ms = SUM of per-window durations (total backend compute,
    // useful for cost). analyze_wall_time_ms = ELAPSED time from
    // first window start to last window end (the time a user actually
    // waits for the analyze phase). The two diverge when N windows
    // run in parallel: a 10-window 100s-per-window job has analyze_ms
    // = 1000s but analyze_wall_time_ms ≈ 100s (single batch) or
    // ≈ 200s (two sequential batches at concurrency 5).
    analyze_ms: analyzeMs,
    analyze_wall_time_ms: analyzeWallMs,
    overall_status: overall,
    wall_time_ms: completedAt - startedAt,
    cost_usd: cost,
    errors,
    // Derived rate metrics:
    transcribe_ms_per_min: audioMinutes && txMs ? txMs / audioMinutes : null,
    transcribe_ms_per_mb: audioMb && txMs ? txMs / audioMb : null,
    download_ms_per_mb: audioMb && downloadMs ? downloadMs / audioMb : null,
    analyze_ms_per_min:
      audioMinutes && analyzeMs ? analyzeMs / audioMinutes : null,
    analyze_wall_ms_per_min:
      audioMinutes && analyzeWallMs ? analyzeWallMs / audioMinutes : null,
    analyze_ms_per_mb: audioMb && analyzeMs ? analyzeMs / audioMb : null,
  };
}

// Pick the most frequent string in a list (ties broken by first
// occurrence). Used to attribute a backend/model to a job when its
// rows might disagree (e.g. some analyze windows hit gemini and
// fallback chain walked to hardware on others). Falsy values are
// skipped; returns null when nothing is left to count.
function dominant(values) {
  const counts = new Map();
  for (const v of values) {
    if (!v) continue;
    counts.set(v, (counts.get(v) || 0) + 1);
  }
  let best = null;
  let bestCount = 0;
  // Map iterates in insertion order and the comparison is strict, so
  // the first value to reach the top count wins a tie.
  for (const [v, c] of counts) {
    if (c > bestCount) {
      best = v;
      bestCount = c;
    }
  }
  return best;
}

// Compute summary statistics across all aggregated jobs. Returned to
// the dashboard's top-of-page cards: success rate, total processing
// time, average wall-time per video, etc.
export function summarizeJobs(jobs) {
  // Empty input: zeroed counters, null medians, success_rate reported as 1.
  if (jobs.length === 0) {
    return {
      total: 0,
      success: 0,
      partial: 0,
      failed: 0,
      success_rate: 1,
      median_wall_time_ms: null,
      median_transcribe_ms_per_min: null,
      median_analyze_ms_per_min: null,
      total_cost_usd: 0,
      total_audio_hours: 0,
    };
  }

  // Single pass: tally statuses and accumulate cost / audio seconds.
  // Jobs whose overall_status is none of the three known values are
  // counted in `total` only.
  const tally = { success: 0, partial: 0, failed: 0 };
  let costSum = 0;
  let audioSecondsSum = 0;
  for (const job of jobs) {
    switch (job.overall_status) {
      case "success":
        tally.success += 1;
        break;
      case "partial":
        tally.partial += 1;
        break;
      case "failed":
        tally.failed += 1;
        break;
      default:
        break;
    }
    costSum += job.cost_usd || 0;
    audioSecondsSum += job.audio_seconds || 0;
  }

  // Median of one numeric field, ignoring null / NaN / infinite values.
  const finiteMedian = (pick) =>
    median(jobs.map(pick).filter(Number.isFinite));

  return {
    total: jobs.length,
    success: tally.success,
    partial: tally.partial,
    failed: tally.failed,
    // Partial jobs count toward the success rate.
    success_rate: (tally.success + tally.partial) / jobs.length,
    median_wall_time_ms: finiteMedian((job) => job.wall_time_ms),
    median_transcribe_ms_per_min: finiteMedian(
      (job) => job.transcribe_ms_per_min
    ),
    median_analyze_ms_per_min: finiteMedian((job) => job.analyze_ms_per_min),
    total_cost_usd: costSum,
    total_audio_hours: audioSecondsSum / 3600,
  };
}

// Median of a numeric array, or null when it is empty. The input is
// copied before sorting, so the caller's array is left untouched.
function median(arr) {
  const count = arr.length;
  if (!count) return null;

  const ascending = arr.slice().sort((x, y) => x - y);
  const upper = Math.floor(count / 2);
  if (count % 2 !== 0) {
    return ascending[upper];
  }
  // Even length: average the two middle values.
  return (ascending[upper - 1] + ascending[upper]) / 2;
}