Files

324 lines
13 KiB
JavaScript
Raw Permalink Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
// Per-job aggregation over the relay's audit log. The audit log
// records ONE row per relay call (transcribe or analyze); a single
// summary job typically produces 1 transcribe row + N analyze rows
// (one per chunked-analyze window). This module groups those rows by
// X-Recap-Job-Id and computes per-video stats the dashboard renders
// as a sortable / filterable table.
//
// The aggregation is computed on the fly from the in-memory entries
// array — no separate persistence. A typical 30-day window has a few
// thousand audit rows; grouping is O(n) and well under 10 ms.
//
// Output row shape (one per job_id, plus a synthetic row for
// orphaned entries with no job_id):
// {
// job_id: string | null
// started_at: ms-epoch (earliest ts across the job's rows)
// completed_at: ms-epoch (latest ts)
// install_id: short string
// tier: "core" | "pro" | "max" | null
// media_url: string | null // from the transcribe row
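// title: string | null // ditto
// batch_id: first truthy batch_id across the job's rows, or null
// source: first truthy source across the job's rows, or null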
// audio_seconds: number | null // from transcribe row
// audio_bytes: number | null // ditto (bytes downloaded
// by the relay for transcribe-url)
// transcribe_status: "success" | "partial" | "error" | "refused" | "missing"
// transcribe_backend: "gemini" | "hardware" | null
// transcribe_model: string | null
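// transcribe_ms: number | null // outer wall-time of the transcribe phase
// transcribe_ms_sum: number | null // sum of per-chunk durations; falls back to transcribe_ms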
// download_ms: number | null
// chunk_count: number | null // transcribe-side audio chunks
// analyze_windows_total: number // count of analyze rows
// analyze_windows_success: number
// analyze_windows_failed: number
// analyze_backend: string | null // dominant backend across analyze rows
// analyze_model: string | null // dominant model across analyze rows
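// analyze_ms: number // sum of analyze duration_ms
// analyze_wall_time_ms: number | null // earliest analyze start to latest analyze end; null without analyze rows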
// overall_status: "success" | "partial" | "failed"
// wall_time_ms: completed_at - started_at
// cost_usd: number (sum across all rows)
// has_output: boolean // set by aggregateJobs from opts.outputIdSet
// errors: string[] // concatenated short error strings
// // Derived metrics — pre-computed so the UI can sort by them:
// transcribe_ms_per_min: number | null // transcribe_ms / (audio_seconds/60)
// transcribe_ms_per_mb: number | null // transcribe_ms / (audio_bytes / 1MB)
// download_ms_per_mb: number | null
// analyze_ms_per_min: number | null
// analyze_wall_ms_per_min: number | null // analyze_wall_time_ms / (audio_seconds/60)
// analyze_ms_per_mb: number | null
// }
// Bytes per mebibyte; the divisor behind every per-MB rate metric.
const MB = 2 ** 20;
/**
 * Group audit-log entries into jobs and aggregate each group into one row.
 *
 * Entries sharing a truthy `job_id` form one group. Every entry without a
 * `job_id` becomes its own single-entry group so orphan calls still appear
 * in the table (helpful for debugging).
 *
 * @param {Array<object>} entries - Audit rows; `job_id` and `ts` are read here.
 * @param {object} [opts]
 * @param {Set} [opts.outputIdSet] - job_ids that have stored output JSONs,
 *   supplied by the route layer so this module never touches the filesystem.
 *   Anything that is not a Set is ignored.
 * @returns {Array<object>} One aggregated row per group, newest `started_at`
 *   first, each carrying a boolean `has_output` flag.
 */
export function aggregateJobs(entries, opts = {}) {
  const groups = new Map();
  let orphanCount = 0;
  for (const entry of entries) {
    let key = entry.job_id;
    if (!key) {
      // Keying on ts alone would merge two orphans that share a timestamp
      // (or that both lack one) into a single bogus "job". The running
      // counter keeps each orphan a singleton while preserving the
      // "_orphan_" prefix that aggregateOne uses to null out job_id.
      key = `_orphan_${entry.ts}_${orphanCount}`;
      orphanCount += 1;
    }
    const bucket = groups.get(key);
    if (bucket) {
      bucket.push(entry);
    } else {
      groups.set(key, [entry]);
    }
  }

  // Drives the has_output flag the dashboard reads to show/hide the
  // "View" link. `?.` tolerates an explicit null opts.
  const outputIdSet = opts?.outputIdSet instanceof Set ? opts.outputIdSet : null;

  const out = [];
  for (const [key, rows] of groups) {
    const row = aggregateOne(key, rows);
    row.has_output = outputIdSet ? outputIdSet.has(row.job_id) : false;
    out.push(row);
  }

  // Newest first by started_at.
  out.sort((a, b) => b.started_at - a.started_at);
  return out;
}
/**
 * Fold all audit rows of one job into a single dashboard row.
 *
 * @param {string} key - Group key. Keys starting with "_orphan_" produce
 *   `job_id: null`; any other key is used as the job_id.
 * @param {Array<object>} rows - Non-empty list of audit rows for the job.
 *   The array is sorted in place by `ts`.
 * @returns {object} The aggregated row (timestamps, transcribe/analyze
 *   stats, overall status, cost, errors and derived rate metrics).
 */
function aggregateOne(key, rows) {
  rows.sort((a, b) => a.ts - b.ts);
  const firstRow = rows[0];
  const lastRow = rows[rows.length - 1];

  // A row's ts is stamped when recordCall() fires, i.e. when that row's
  // work COMPLETED. For a transcribe row, duration_ms covers only the
  // transcription work and download_ms covers the download, so the
  // user-visible start of the job is recovered by walking back from the
  // first row's ts by both amounts. Leaving download_ms out would make
  // wall_time miss the download phase (30-60s on a long YouTube fetch);
  // with it, WALL ≈ DL + TX + AN_wall.
  const startedAt =
    firstRow.ts -
    (Number(firstRow.duration_ms) || 0) -
    (Number(firstRow.download_ms) || 0);
  const completedAt = lastRow.ts;

  const tx = rows.find((r) => r.pipeline === "transcribe");
  const txStatus = tx ? tx.status : "missing";

  // One pass over the (ts-ordered) rows collects everything that is a
  // per-row accumulation: cost, error strings and the analyze statistics.
  //
  // analyzeMs is the SUM of window durations. The analyze wall time is the
  // measured span from the earliest window start (ts - duration_ms) to the
  // latest window end (ts): for one parallel batch that is roughly the
  // slowest window; for several sequential batches it spans all of them,
  // gaps included.
  const analyzeSucceeded = [];
  const errors = [];
  let analyzeTotal = 0;
  let analyzeMs = 0;
  let earliestWindowStart = Infinity;
  let latestWindowEnd = -Infinity;
  let cost = 0;
  for (const r of rows) {
    cost += Number(r.cost_usd) || 0;
    if (r.error) {
      errors.push(`${r.pipeline}: ${String(r.error).slice(0, 160)}`);
    }
    if (r.pipeline !== "analyze") continue;

    analyzeTotal += 1;
    if (r.status === "success") analyzeSucceeded.push(r);

    const finishedAt = Number(r.ts) || 0;
    const took = Number(r.duration_ms) || 0;
    const begunAt = finishedAt - took;
    analyzeMs += took;
    if (begunAt < earliestWindowStart) earliestWindowStart = begunAt;
    if (finishedAt > latestWindowEnd) latestWindowEnd = finishedAt;
  }
  const analyzeWallMs =
    analyzeTotal > 0 ? latestWindowEnd - earliestWindowStart : null;

  // Backend/model attribution looks at successful windows only.
  const analyzeBackend = dominant(analyzeSucceeded.map((r) => r.backend));
  const analyzeModel = dominant(analyzeSucceeded.map((r) => r.model));

  const overall = (() => {
    if (txStatus === "error" || txStatus === "refused" || txStatus === "missing") {
      return "failed";
    }
    // A truncated transcript (chunks hit the output-token cap) makes the
    // whole job partial no matter how analyze went: the analysis ran on
    // incomplete input.
    if (txStatus === "partial") return "partial";
    // Transcribe fine and no analyze rows: either still in flight or the
    // client analyzes locally and never called /relay/analyze.
    if (analyzeTotal === 0) return "success";
    if (analyzeSucceeded.length === analyzeTotal) return "success";
    return analyzeSucceeded.length > 0 ? "partial" : "failed";
  })();

  // `??` rather than `||` on purpose: a genuine 0 (e.g. duration_ms=0 on
  // old cache-hit rows still present in the NDJSON) must survive as 0
  // instead of being collapsed into "missing".
  const audioSec = tx?.audio_seconds ?? null;
  const audioBytes = tx?.audio_bytes ?? null;
  const txMs = tx?.duration_ms ?? null;
  const downloadMs = tx?.download_ms ?? null;

  // Total backend compute for transcription = sum of per-chunk durations,
  // as opposed to txMs, the outer wall-time of the parallel fan-out.
  // Rows written before chunk_durations_ms existed fall back to txMs.
  let txMsSum = txMs;
  if (Array.isArray(tx?.chunk_durations_ms)) {
    txMsSum = 0;
    for (const d of tx.chunk_durations_ms) {
      txMsSum += Number(d) || 0;
    }
  }

  const audioMinutes = audioSec ? audioSec / 60 : null;
  const audioMb = audioBytes ? audioBytes / MB : null;

  // A rate is only reported when both the duration and the denominator
  // are truthy; otherwise it is null.
  const rate = (ms, units) => (units && ms ? ms / units : null);

  // batch_id / source are stamped per row by the test-run path; take the
  // first truthy one so filtering does not depend on row order.
  const batchId = rows.find((r) => r.batch_id)?.batch_id || null;
  const source = rows.find((r) => r.source)?.source || null;

  return {
    job_id: key.startsWith("_orphan_") ? null : key,
    started_at: startedAt,
    completed_at: completedAt,
    install_id: tx?.install_id || firstRow.install_id || null,
    tier: tx?.tier || firstRow.tier || null,
    media_url: tx?.media_url || null,
    title: tx?.title || null,
    batch_id: batchId,
    source,
    audio_seconds: audioSec,
    audio_bytes: audioBytes,
    transcribe_status: txStatus,
    transcribe_backend: tx?.backend || null,
    transcribe_model: tx?.model || null,
    // transcribe_ms: wall-time of the whole transcribe phase as the
    // operator perceives it. transcribe_ms_sum: backend compute summed
    // over chunks (what drives cost).
    transcribe_ms: txMs,
    transcribe_ms_sum: txMsSum,
    download_ms: downloadMs,
    chunk_count: tx?.chunk_count ?? null,
    analyze_windows_total: analyzeTotal,
    analyze_windows_success: analyzeSucceeded.length,
    analyze_windows_failed: analyzeTotal - analyzeSucceeded.length,
    analyze_backend: analyzeBackend,
    analyze_model: analyzeModel,
    // analyze_ms: summed window durations (compute). analyze_wall_time_ms:
    // elapsed time the user actually waits. They diverge whenever windows
    // run in parallel.
    analyze_ms: analyzeMs,
    analyze_wall_time_ms: analyzeWallMs,
    overall_status: overall,
    wall_time_ms: completedAt - startedAt,
    cost_usd: cost,
    errors,
    // Derived rate metrics, pre-computed so the UI can sort by them.
    transcribe_ms_per_min: rate(txMs, audioMinutes),
    transcribe_ms_per_mb: rate(txMs, audioMb),
    download_ms_per_mb: rate(downloadMs, audioMb),
    analyze_ms_per_min: rate(analyzeMs, audioMinutes),
    analyze_wall_ms_per_min: rate(analyzeWallMs, audioMinutes),
    analyze_ms_per_mb: rate(analyzeMs, audioMb),
  };
}
/**
 * Return the value that occurs most often in `values`, ignoring falsy
 * entries. When several values share the top count, the one that appeared
 * first wins. Used to attribute a single backend/model to a job whose rows
 * may disagree (e.g. some analyze windows served by gemini while the
 * fallback chain reached hardware for others).
 *
 * @param {Iterable<*>} values - Candidate values; falsy ones are skipped.
 * @returns {*} The most frequent truthy value, or null when there is none.
 */
function dominant(values) {
  // Phase 1: tally. Map iteration follows first-insertion order, which is
  // what makes the tie-break in phase 2 favour the earliest value.
  const tally = [...values]
    .filter(Boolean)
    .reduce((acc, v) => acc.set(v, (acc.get(v) ?? 0) + 1), new Map());

  // Phase 2: strict ">" keeps the earlier entry on equal counts.
  let winner = null;
  let winnerCount = 0;
  tally.forEach((count, value) => {
    if (count > winnerCount) {
      winner = value;
      winnerCount = count;
    }
  });
  return winner;
}
/**
 * Build the headline numbers for the dashboard's top-of-page cards from a
 * list of aggregated job rows.
 *
 * @param {Array<object>} jobs - Aggregated rows; reads `overall_status`,
 *   `cost_usd`, `audio_seconds`, `wall_time_ms`, `transcribe_ms_per_min`
 *   and `analyze_ms_per_min`.
 * @returns {object} Counts per status, `success_rate` (success and partial
 *   both count; 1 for an empty list), medians over finite values only
 *   (null when none), total cost and total audio hours.
 */
export function summarizeJobs(jobs) {
  const total = jobs.length;

  // Nothing to summarise: fixed neutral summary, no medians computed.
  if (total === 0) {
    return {
      total: 0,
      success: 0,
      partial: 0,
      failed: 0,
      success_rate: 1,
      median_wall_time_ms: null,
      median_transcribe_ms_per_min: null,
      median_analyze_ms_per_min: null,
      total_cost_usd: 0,
      total_audio_hours: 0,
    };
  }

  let success = 0;
  let partial = 0;
  let failed = 0;
  let totalCost = 0;
  let totalAudioSec = 0;
  for (const job of jobs) {
    const status = job.overall_status;
    if (status === "success") {
      success += 1;
    } else if (status === "partial") {
      partial += 1;
    } else if (status === "failed") {
      failed += 1;
    }
    totalCost += job.cost_usd || 0;
    totalAudioSec += job.audio_seconds || 0;
  }

  // Median of one numeric field, skipping null/NaN/non-number entries.
  const medianOf = (field) =>
    median(jobs.map((job) => job[field]).filter((v) => Number.isFinite(v)));

  return {
    total,
    success,
    partial,
    failed,
    success_rate: (success + partial) / total,
    median_wall_time_ms: medianOf("wall_time_ms"),
    median_transcribe_ms_per_min: medianOf("transcribe_ms_per_min"),
    median_analyze_ms_per_min: medianOf("analyze_ms_per_min"),
    total_cost_usd: totalCost,
    total_audio_hours: totalAudioSec / 3600,
  };
}
/**
 * Median of a list of numbers.
 *
 * @param {number[]} arr - Values to summarise; left unmodified.
 * @returns {number|null} The middle value (mean of the two middle values
 *   for an even count), or null when `arr` is empty.
 */
function median(arr) {
  const count = arr.length;
  if (!count) return null;

  // Sort a copy numerically so the caller's array keeps its order.
  const ordered = [...arr];
  ordered.sort((x, y) => x - y);

  const upper = Math.floor(count / 2);
  if (count % 2 === 0) {
    return (ordered[upper - 1] + ordered[upper]) / 2;
  }
  return ordered[upper];
}