324 lines
13 KiB
JavaScript
324 lines
13 KiB
JavaScript
// Per-job aggregation over the relay's audit log. The audit log
// records ONE row per relay call (transcribe or analyze); a single
// summary job typically produces 1 transcribe row + N analyze rows
// (one per chunked-analyze window). This module groups those rows by
// X-Recap-Job-Id and computes per-video stats that the dashboard renders
// as a sortable / filterable table.
//
// The aggregation is computed on the fly from the in-memory entries
// array — no separate persistence. A typical 30-day window has a few
// thousand audit rows; grouping is O(n) and well under 10 ms.
//
// Output row shape (one per job_id, plus synthetic rows with
// job_id: null for orphaned entries that carry no job_id):
// {
//   job_id:                   string | null
//   started_at:               ms-epoch (estimated job start: a row's end
//                             ts minus its duration_ms and download_ms)
//   completed_at:             ms-epoch (latest ts)
//   install_id:               short string
//   tier:                     "core" | "pro" | "max" | null
//   media_url:                string | null  // from the transcribe row
//   title:                    string | null  // ditto
//   batch_id:                 string | null  // first non-empty across rows
//   source:                   string | null  // ditto
//   audio_seconds:            number | null  // from the transcribe row
//   audio_bytes:              number | null  // ditto (bytes downloaded by
//                                            //   the relay for transcribe-url)
//   transcribe_status:        "success" | "partial" | "error" | "refused"
//                             | "missing" (no transcribe row)
//   transcribe_backend:       "gemini" | "hardware" | null
//   transcribe_model:         string | null
//   transcribe_ms:            number | null  // outer wall time of the TX phase
//   transcribe_ms_sum:        number | null  // sum of per-chunk durations
//   download_ms:              number | null
//   chunk_count:              number | null  // transcribe-side audio chunks
//   analyze_windows_total:    number         // count of analyze rows
//   analyze_windows_success:  number
//   analyze_windows_failed:   number
//   analyze_backend:          string | null  // dominant backend across
//                                            //   successful analyze rows
//   analyze_model:            string | null  // dominant model, ditto
//   analyze_ms:               number         // sum of analyze duration_ms
//   analyze_wall_time_ms:     number | null  // first window start → last end
//   overall_status:           "success" | "partial" | "failed"
//   wall_time_ms:             completed_at - started_at
//   cost_usd:                 number (sum across all rows)
//   errors:                   string[]       // "<pipeline>: <error>", each
//                                            //   error cut to 160 chars
//   has_output:               boolean        // job has a stored output JSON
//   // Derived metrics — pre-computed so the UI can sort by them
//   // (null when an input is unavailable):
//   transcribe_ms_per_min:    number | null  // transcribe_ms / (audio_seconds / 60)
//   transcribe_ms_per_mb:     number | null  // transcribe_ms / (audio_bytes / 1 MB)
//   download_ms_per_mb:       number | null
//   analyze_ms_per_min:       number | null
//   analyze_wall_ms_per_min:  number | null
//   analyze_ms_per_mb:        number | null
// }

// Bytes per mebibyte (2^20). Converts audio_bytes into MB for the
// per-MB rate metrics.
const MB = 2 ** 20;

// Group relay audit rows by X-Recap-Job-Id and aggregate each group
// into one dashboard row, newest first.
//
//   entries - iterable of audit rows; each may carry job_id and ts.
//   opts.outputIdSet - optional Set of job_ids that have stored output
//     JSONs. The route layer passes it in so this module never touches
//     the filesystem. It drives the has_output flag the dashboard reads
//     to show/hide the "View" link.
//
// Returns the aggregated rows sorted by started_at, descending.
export function aggregateJobs(entries, opts = {}) {
  // Rows without a job_id become singleton groups so orphan calls still
  // appear in the table (helpful for debugging). A running sequence
  // number is part of the key. Keying on ts alone merged distinct orphan
  // calls that happened to share a timestamp. The `_orphan_` prefix is
  // what aggregateOne() uses to report job_id: null.
  const groups = new Map();
  let orphanSeq = 0;
  for (const e of entries) {
    const key = e.job_id || `_orphan_${e.ts}_${orphanSeq++}`;
    let group = groups.get(key);
    if (!group) {
      group = [];
      groups.set(key, group);
    }
    group.push(e);
  }

  const outputIdSet = opts?.outputIdSet instanceof Set ? opts.outputIdSet : null;

  const out = [];
  for (const [key, rows] of groups) {
    const row = aggregateOne(key, rows);
    row.has_output = outputIdSet ? outputIdSet.has(row.job_id) : false;
    out.push(row);
  }

  // Newest first by started_at.
  out.sort((a, b) => b.started_at - a.started_at);
  return out;
}

// Collapse every audit row that shares one group key into a single
// per-job dashboard row (shape documented in the file header).
//
//   key  - the job_id, or an `_orphan_…` synthetic key for rows that
//          carried no job_id (reported as job_id: null).
//   rows - the group's audit rows (at least one). The array is not
//          modified; a ts-ordered copy is used internally.
function aggregateOne(key, rows) {
  const ordered = [...rows].sort((a, b) => a.ts - b.ts);

  // Audit-row timing model: ts is stamped when recordCall() fired, i.e.
  // when that row's work COMPLETED, not when it started. On a transcribe
  // row, duration_ms covers only the transcribe work and download_ms the
  // download that preceded it, so
  //   row start = ts - duration_ms - download_ms.
  // The job start is the earliest such start across all rows. Normally
  // that is the transcribe row. Taking the minimum also keeps
  // wall_time_ms >= analyze_wall_time_ms when rows arrive out of phase.
  // Counting the download phase (30-60 s on a long YouTube fetch) makes
  // wall time match the operator's formula WALL ≈ DL + TX + AN_wall.
  const startedAt = ordered.reduce((min, r) => Math.min(min, rowStartMs(r)), Infinity);
  const completedAt = ordered[ordered.length - 1].ts;

  const tx = ordered.find((r) => r.pipeline === "transcribe");
  const analyzeRows = ordered.filter((r) => r.pipeline === "analyze");
  const analyzeSuccess = analyzeRows.filter((r) => r.status === "success");
  const hasAnalyze = analyzeRows.length > 0;

  // analyze_ms = SUM of per-window durations (total backend compute,
  // useful for cost). analyze_wall_time_ms = ELAPSED time from the first
  // window's start to the last window's end (what the user actually
  // waits). They diverge when windows run in parallel. For example,
  // 10 windows of 100 s each give analyze_ms = 1000 s, but the wall time
  // is ≈100 s in one concurrent batch, or ≈200 s as two sequential
  // batches at concurrency 5.
  const analyzeMs = analyzeRows.reduce(
    (sum, r) => sum + (Number(r.duration_ms) || 0),
    0
  );
  const analyzeWallMs = analyzeWallTimeMs(analyzeRows);

  const errors = ordered
    .filter((r) => r.error)
    .map((r) => `${r.pipeline}: ${String(r.error).slice(0, 160)}`);

  const txStatus = tx ? tx.status : "missing";
  const cost = ordered.reduce((sum, r) => sum + (Number(r.cost_usd) || 0), 0);

  // `??` rather than `||`, so that a legitimate 0 is not treated as
  // missing data. Older test-run audit rows recorded duration_ms=0 on
  // cache-hit siblings; those rows are still in the NDJSON and must
  // render as 0, not collapse to "—".
  const audioSec = tx?.audio_seconds ?? null;
  const audioBytes = tx?.audio_bytes ?? null;
  const txMs = tx?.duration_ms ?? null;
  const downloadMs = tx?.download_ms ?? null;

  // transcribe_ms is the outer wall time of the parallel chunk fan-out.
  // transcribe_ms_sum is the total backend compute across chunks (drives
  // cost):
  //   single chunk:             sum ≈ wall
  //   N chunks at concurrency C: sum ≈ N × per-chunk,
  //                              wall ≈ ⌈N/C⌉ × per-chunk
  // e.g. Gemini at concurrency 12 over 3 chunks: wall ≈ 60 s, sum ≈ 180 s.
  // Falls back to the wall time when per-chunk durations are absent
  // (audit rows from before v0.2.41) or the list is empty.
  const chunkDurations =
    Array.isArray(tx?.chunk_durations_ms) && tx.chunk_durations_ms.length > 0
      ? tx.chunk_durations_ms
      : null;
  const txMsSum = chunkDurations
    ? chunkDurations.reduce((sum, d) => sum + (Number(d) || 0), 0)
    : txMs;

  // Zero/absent audio size yields null so rates never divide by zero.
  const audioMinutes = audioSec ? audioSec / 60 : null;
  const audioMb = audioBytes ? audioBytes / MB : null;

  // batch_id and source are stamped per row by the test-run path. Take
  // the first non-empty value so dashboard filters work no matter which
  // row carries it.
  const batchId = ordered.find((r) => r.batch_id)?.batch_id || null;
  const source = ordered.find((r) => r.source)?.source || null;

  // A transcribe-only job has analyzeMs = 0 by construction. Report its
  // analyze rates as null, not a misleading 0.
  const analyzeMsForRates = hasAnalyze ? analyzeMs : null;

  return {
    job_id: key.startsWith("_orphan_") ? null : key,
    started_at: startedAt,
    completed_at: completedAt,
    install_id: tx?.install_id || ordered[0].install_id || null,
    tier: tx?.tier || ordered[0].tier || null,
    media_url: tx?.media_url || null,
    title: tx?.title || null,
    batch_id: batchId,
    source: source,
    audio_seconds: audioSec,
    audio_bytes: audioBytes,
    transcribe_status: txStatus,
    transcribe_backend: tx?.backend || null,
    transcribe_model: tx?.model || null,
    transcribe_ms: txMs,
    transcribe_ms_sum: txMsSum,
    download_ms: downloadMs,
    chunk_count: tx?.chunk_count ?? null,
    analyze_windows_total: analyzeRows.length,
    analyze_windows_success: analyzeSuccess.length,
    analyze_windows_failed: analyzeRows.length - analyzeSuccess.length,
    // Attributed from successful windows only; the fallback chain may
    // have walked different windows to different backends.
    analyze_backend: dominant(analyzeSuccess.map((r) => r.backend)),
    analyze_model: dominant(analyzeSuccess.map((r) => r.model)),
    analyze_ms: analyzeMs,
    analyze_wall_time_ms: analyzeWallMs,
    overall_status: overallStatusFor(txStatus, analyzeRows.length, analyzeSuccess.length),
    wall_time_ms: completedAt - startedAt,
    cost_usd: cost,
    errors,
    // Derived rate metrics:
    transcribe_ms_per_min: ratePer(txMs, audioMinutes),
    transcribe_ms_per_mb: ratePer(txMs, audioMb),
    download_ms_per_mb: ratePer(downloadMs, audioMb),
    analyze_ms_per_min: ratePer(analyzeMsForRates, audioMinutes),
    analyze_wall_ms_per_min: ratePer(analyzeWallMs, audioMinutes),
    analyze_ms_per_mb: ratePer(analyzeMsForRates, audioMb),
  };
}

// Estimated start time of one audit row: its completion ts minus the
// work (duration_ms) and download (download_ms) it covered.
function rowStartMs(r) {
  return r.ts - (Number(r.duration_ms) || 0) - (Number(r.download_ms) || 0);
}

// Elapsed clock time from the earliest analyze-window start
// (ts - duration_ms) to the latest analyze-window end (ts), or null
// when the job has no analyze rows. This is a measured span, not a
// prediction: for a multi-batch run it includes any gap between batches.
function analyzeWallTimeMs(analyzeRows) {
  if (analyzeRows.length === 0) return null;
  let minStart = Infinity;
  let maxEnd = -Infinity;
  for (const r of analyzeRows) {
    const end = Number(r.ts) || 0;
    const start = end - (Number(r.duration_ms) || 0);
    if (start < minStart) minStart = start;
    if (end > maxEnd) maxEnd = end;
  }
  return maxEnd - minStart;
}

// Roll the transcribe status and analyze-window outcomes into one
// job-level status: "success" | "partial" | "failed".
function overallStatusFor(txStatus, analyzeTotal, analyzeOk) {
  if (txStatus === "error" || txStatus === "refused" || txStatus === "missing") {
    return "failed";
  }
  // A truncated transcript (chunks hit the output-token cap) marks the
  // whole job partial. The analysis ran on incomplete input, so even
  // "success" analyze rows are misleading.
  if (txStatus === "partial") return "partial";
  // No analyze rows: either still in flight, or the client never called
  // /relay/analyze (it used a local model).
  if (analyzeTotal === 0) return "success";
  if (analyzeOk === analyzeTotal) return "success";
  return analyzeOk > 0 ? "partial" : "failed";
}

// value / per, or null when the value is missing or non-numeric, or the
// denominator is missing or zero. A measured value of 0 yields a rate of
// 0 (not null), consistent with the `??` handling of durations.
function ratePer(value, per) {
  if (value == null || !per) return null;
  const v = Number(value);
  return Number.isFinite(v) ? v / per : null;
}

// Return the most frequent truthy value in `values`, or null when there
// is none. Ties go to the value that appeared first in the list. Used to
// attribute a backend/model to a job whose rows may disagree (e.g. some
// analyze windows hit gemini while the fallback chain walked to hardware
// on others).
function dominant(values) {
  const tally = new Map();
  for (const value of values) {
    if (value) tally.set(value, (tally.get(value) ?? 0) + 1);
  }
  // Map iterates in first-seen order, and only a strictly larger count
  // replaces the leader, so the earliest value wins a tie.
  const [winner] = [...tally].reduce(
    (leader, entry) => (entry[1] > leader[1] ? entry : leader),
    [null, 0]
  );
  return winner;
}

// Roll aggregated job rows up into the dashboard's top-of-page cards:
// status counts, success rate, median timings, total cost and total
// audio hours.
//
// success_rate counts "partial" jobs together with "success" ones and is
// 1 for an empty list. Each median ignores non-finite values and is null
// when no finite sample exists.
export function summarizeJobs(jobs) {
  const total = jobs.length;
  if (total === 0) {
    return {
      total: 0,
      success: 0,
      partial: 0,
      failed: 0,
      success_rate: 1,
      median_wall_time_ms: null,
      median_transcribe_ms_per_min: null,
      median_analyze_ms_per_min: null,
      total_cost_usd: 0,
      total_audio_hours: 0,
    };
  }

  let success = 0;
  let partial = 0;
  let failed = 0;
  let totalCost = 0;
  let totalAudioSec = 0;
  jobs.forEach((job) => {
    switch (job.overall_status) {
      case "success":
        success += 1;
        break;
      case "partial":
        partial += 1;
        break;
      case "failed":
        failed += 1;
        break;
      default:
        break;
    }
    totalCost += job.cost_usd || 0;
    totalAudioSec += job.audio_seconds || 0;
  });

  // Finite values of one numeric field across all jobs.
  const finiteSamples = (field) =>
    jobs.map((job) => job[field]).filter(Number.isFinite);

  return {
    total,
    success,
    partial,
    failed,
    success_rate: (success + partial) / total,
    median_wall_time_ms: median(finiteSamples("wall_time_ms")),
    median_transcribe_ms_per_min: median(finiteSamples("transcribe_ms_per_min")),
    median_analyze_ms_per_min: median(finiteSamples("analyze_ms_per_min")),
    total_cost_usd: totalCost,
    total_audio_hours: totalAudioSec / 3600,
  };
}

// Median of a numeric array: the middle value for odd lengths, the mean
// of the two middle values for even lengths, or null when empty. Sorts
// numerically on a copy, so the input array is left untouched.
function median(arr) {
  const count = arr.length;
  if (!count) return null;
  const ordered = Array.from(arr).sort((x, y) => x - y);
  const half = Math.floor(count / 2);
  if (count % 2 !== 0) return ordered[half];
  return (ordered[half - 1] + ordered[half]) / 2;
}