Add internal-meetings pipeline and post-hoc speaker tools

This commit is contained in:
Keysat
2026-06-13 13:35:53 -05:00
parent 9a2dbf69df
commit 705807e286
15 changed files with 7375 additions and 0 deletions
+323
View File
@@ -0,0 +1,323 @@
// Per-job aggregation over the relay's audit log. The audit log
// records ONE row per relay call (transcribe or analyze); a single
// summary job typically produces 1 transcribe row + N analyze rows
// (one per chunked-analyze window). This module groups those rows by
// X-Recap-Job-Id and computes per-video stats the dashboard renders
// as a sortable / filterable table.
//
// The aggregation is computed on the fly from the in-memory entries
// array — no separate persistence. A typical 30-day window has a few
// thousand audit rows; grouping is O(n) and well under 10 ms.
//
// Output row shape (one per job_id, plus a synthetic row for
// orphaned entries with no job_id):
// {
// job_id: string | null
// started_at: ms-epoch (earliest ts across the job's rows)
// completed_at: ms-epoch (latest ts)
// install_id: short string
// tier: "core" | "pro" | "max" | null
// media_url: string | null // from the transcribe row
// title: string | null // ditto
// audio_seconds: number | null // from transcribe row
// audio_bytes: number | null // ditto (bytes downloaded
// by the relay for transcribe-url)
// transcribe_status: "success" | "error" | "refused" | "missing"
// transcribe_backend: "gemini" | "hardware" | null
// transcribe_model: string | null
// transcribe_ms: number | null
// download_ms: number | null
// chunk_count: number | null // transcribe-side audio chunks
// analyze_windows_total: number // count of analyze rows
// analyze_windows_success: number
// analyze_windows_failed: number
// analyze_backend: string | null // dominant backend across analyze rows
// analyze_model: string | null // dominant model across analyze rows
// analyze_ms: number // sum of analyze duration_ms
// overall_status: "success" | "partial" | "failed"
// wall_time_ms: completed_at - started_at
// cost_usd: number (sum across all rows)
// errors: string[] // concatenated short error strings
// // Derived metrics — pre-computed so the UI can sort by them:
// transcribe_ms_per_min: number | null // transcribe_ms / (audio_seconds/60)
// transcribe_ms_per_mb: number | null // transcribe_ms / (audio_bytes / 1MB)
// download_ms_per_mb: number | null
// analyze_ms_per_min: number | null
// analyze_ms_per_mb: number | null
// }
const MB = 1024 * 1024;
export function aggregateJobs(entries, opts = {}) {
// Group by job_id. Entries without job_id become singleton groups
// keyed by their ts so they still appear in the table (helpful for
// debugging orphan calls).
const groups = new Map();
for (const e of entries) {
const key = e.job_id || `_orphan_${e.ts}`;
if (!groups.has(key)) groups.set(key, []);
groups.get(key).push(e);
}
// Optional set of job_ids that have stored output JSONs — passed
// in from the route layer so the aggregator doesn't have to hit
// the filesystem itself. Used to set the has_output flag the
// dashboard reads to show/hide the "View" link.
const outputIdSet = opts.outputIdSet instanceof Set ? opts.outputIdSet : null;
const out = [];
for (const [key, rows] of groups) {
const row = aggregateOne(key, rows);
row.has_output = outputIdSet ? outputIdSet.has(row.job_id) : false;
out.push(row);
}
// Newest first by started_at.
out.sort((a, b) => b.started_at - a.started_at);
return out;
}
function aggregateOne(key, rows) {
rows.sort((a, b) => a.ts - b.ts);
// ts in each audit row is when recordCall() fired — i.e., when the
// work for that row COMPLETED, not when it started. To recover the
// user-POV "job start" timestamp we work backwards from the first
// row's end-time using its duration_ms AND download_ms fields.
//
// TX row layout:
// ts = download_end + tx_work_end
// duration_ms = tx_work_duration (NOT including download)
// download_ms = download_duration
//
// So: job_start = ts - duration_ms - download_ms.
//
// Without including download_ms here, wall_time misses the
// download phase (which can be 30-60s on a long YouTube fetch).
// Including it makes wall_time match the operator's intuitive
// formula: WALL ≈ DL + TX + AN_wall.
const firstRowDur = Number(rows[0].duration_ms) || 0;
const firstRowDownload = Number(rows[0].download_ms) || 0;
const startedAt = rows[0].ts - firstRowDur - firstRowDownload;
const completedAt = rows[rows.length - 1].ts;
const tx = rows.find((r) => r.pipeline === "transcribe");
const analyzeRows = rows.filter((r) => r.pipeline === "analyze");
const analyzeSuccess = analyzeRows.filter((r) => r.status === "success");
const analyzeFailed = analyzeRows.filter((r) => r.status !== "success");
const analyzeMs = analyzeRows.reduce(
(s, r) => s + (Number(r.duration_ms) || 0),
0
);
// Analyze wall time: elapsed clock time from when the FIRST window
// started to when the LAST window finished. For a 1-batch parallel
// analyze (all N windows fire concurrently), this ≈ the slowest
// single window's duration. For multi-batch (N > concurrency, e.g.
// 10 windows at concurrency 8 → 2 sequential batches), this spans
// both batches including any gap. Computed from end-ts minus
// start-ts (where start-ts = row.ts - row.duration_ms) so it's
// an accurate measured value, not a predicted one.
let analyzeWallMs = null;
if (analyzeRows.length > 0) {
let minStart = Infinity;
let maxEnd = -Infinity;
for (const r of analyzeRows) {
const end = Number(r.ts) || 0;
const dur = Number(r.duration_ms) || 0;
const start = end - dur;
if (start < minStart) minStart = start;
if (end > maxEnd) maxEnd = end;
}
analyzeWallMs = maxEnd - minStart;
}
const analyzeDominantBackend = dominant(
analyzeSuccess.map((r) => r.backend)
);
const analyzeDominantModel = dominant(analyzeSuccess.map((r) => r.model));
const errors = rows
.filter((r) => r.error)
.map((r) => `${r.pipeline}: ${String(r.error).slice(0, 160)}`);
const txStatus = tx ? tx.status : "missing";
let overall;
if (txStatus === "error" || txStatus === "refused" || txStatus === "missing") {
overall = "failed";
} else if (txStatus === "partial") {
// TX produced a truncated transcript (chunks hit the output-token
// cap). Mark the whole job partial regardless of analyze status —
// the analysis was performed against incomplete input, so even
// a "success" on analyze rows is misleading.
overall = "partial";
} else if (analyzeRows.length === 0) {
// Transcribe succeeded but no analyze rows — could be in flight,
// or the client never called /relay/analyze (uses local model).
overall = "success";
} else if (analyzeSuccess.length === analyzeRows.length) {
overall = "success";
} else if (analyzeSuccess.length > 0) {
overall = "partial";
} else {
overall = "failed";
}
const cost = rows.reduce((s, r) => s + (Number(r.cost_usd) || 0), 0);
// Use `??` (nullish-coalesce) — NOT `||` — so a legitimate 0 isn't
// treated as missing data. The test-run worker writes duration_ms=0
// historically (pre-fix) on cache-hit siblings; even though the new
// worker writes a non-zero shared wall-time, old audit rows from
// earlier benchmark batches still live in the NDJSON and we want
// those rendered correctly rather than collapsed to "—".
const audioSec = tx?.audio_seconds ?? null;
const audioBytes = tx?.audio_bytes ?? null;
const txMs = tx?.duration_ms ?? null;
const downloadMs = tx?.download_ms ?? null;
// TX backend compute time = sum of per-chunk wall-times. Distinct
// from txMs which is the outer parallel-fan-out wall-time.
// single-chunk: txMsSum ≈ txMs (one chunk, one duration)
// N-chunks at concurrency C: txMsSum ≈ N × per-chunk-duration
// txMs ≈ ⌈N/C⌉ × per-chunk-duration
// Falls back to txMs (the wall-time) when chunk_durations_ms is
// absent — old audit rows from before v0.2.41 don't have it.
const chunkDurationsArr = Array.isArray(tx?.chunk_durations_ms) ? tx.chunk_durations_ms : null;
const txMsSum = chunkDurationsArr
? chunkDurationsArr.reduce((s, d) => s + (Number(d) || 0), 0)
: txMs;
const audioMinutes = audioSec ? audioSec / 60 : null;
const audioMb = audioBytes ? audioBytes / MB : null;
// batch_id and source are stamped per audit row by the test-run
// path; use the first non-null we see so dashboard filters work
// regardless of which row gets read first in a multi-row job.
const batchId = rows.find((r) => r.batch_id)?.batch_id || null;
const source = rows.find((r) => r.source)?.source || null;
return {
job_id: key.startsWith("_orphan_") ? null : key,
started_at: startedAt,
completed_at: completedAt,
install_id: tx?.install_id || rows[0].install_id || null,
tier: tx?.tier || rows[0].tier || null,
media_url: tx?.media_url || null,
title: tx?.title || null,
batch_id: batchId,
source: source,
audio_seconds: audioSec,
audio_bytes: audioBytes,
transcribe_status: txStatus,
transcribe_backend: tx?.backend || null,
transcribe_model: tx?.model || null,
// transcribe_ms = outer wall-time of the whole TX phase (the
// value the operator perceives as "how long did transcribe
// take"). transcribe_ms_sum = total backend compute across all
// chunks (drives cost; equals N × wall when N chunks run truly
// sequentially, equals wall when single-chunk). For Gemini at
// concurrency 12 over 3 chunks: wall ≈ 60s, sum ≈ 180s.
transcribe_ms: txMs,
transcribe_ms_sum: txMsSum,
download_ms: downloadMs,
chunk_count: tx?.chunk_count ?? null,
analyze_windows_total: analyzeRows.length,
analyze_windows_success: analyzeSuccess.length,
analyze_windows_failed: analyzeFailed.length,
analyze_backend: analyzeDominantBackend,
analyze_model: analyzeDominantModel,
// analyze_ms = SUM of per-window durations (total backend compute,
// useful for cost). analyze_wall_time_ms = ELAPSED time from
// first window start to last window end (the time a user actually
// waits for the analyze phase). The two diverge when N windows
// run in parallel: a 10-window 100s-per-window job has analyze_ms
// = 1000s but analyze_wall_time_ms ≈ 100s (single batch) or
// ≈ 200s (two sequential batches at concurrency 5).
analyze_ms: analyzeMs,
analyze_wall_time_ms: analyzeWallMs,
overall_status: overall,
wall_time_ms: completedAt - startedAt,
cost_usd: cost,
errors,
// Derived rate metrics:
transcribe_ms_per_min: audioMinutes && txMs ? txMs / audioMinutes : null,
transcribe_ms_per_mb: audioMb && txMs ? txMs / audioMb : null,
download_ms_per_mb: audioMb && downloadMs ? downloadMs / audioMb : null,
analyze_ms_per_min: audioMinutes && analyzeMs ? analyzeMs / audioMinutes : null,
analyze_wall_ms_per_min: audioMinutes && analyzeWallMs ? analyzeWallMs / audioMinutes : null,
analyze_ms_per_mb: audioMb && analyzeMs ? analyzeMs / audioMb : null,
};
}
// Pick the most frequent string in a list (ties broken by first
// occurrence). Used to attribute a backend/model to a job when its
// rows might disagree (e.g. some analyze windows hit gemini and
// fallback chain walked to hardware on others).
function dominant(values) {
const counts = new Map();
for (const v of values) {
if (!v) continue;
counts.set(v, (counts.get(v) || 0) + 1);
}
let best = null;
let bestCount = 0;
for (const [v, c] of counts) {
if (c > bestCount) {
best = v;
bestCount = c;
}
}
return best;
}
// Compute summary statistics across all aggregated jobs. Returned to
// the dashboard's top-of-page cards: success rate, total processing
// time, average wall-time per video, etc.
export function summarizeJobs(jobs) {
const total = jobs.length;
if (total === 0) {
return {
total: 0,
success: 0,
partial: 0,
failed: 0,
success_rate: 1,
median_wall_time_ms: null,
median_transcribe_ms_per_min: null,
median_analyze_ms_per_min: null,
total_cost_usd: 0,
total_audio_hours: 0,
};
}
const success = jobs.filter((j) => j.overall_status === "success").length;
const partial = jobs.filter((j) => j.overall_status === "partial").length;
const failed = jobs.filter((j) => j.overall_status === "failed").length;
const totalCost = jobs.reduce((s, j) => s + (j.cost_usd || 0), 0);
const totalAudioSec = jobs.reduce(
(s, j) => s + (j.audio_seconds || 0),
0
);
return {
total,
success,
partial,
failed,
success_rate: (success + partial) / total,
median_wall_time_ms: median(jobs.map((j) => j.wall_time_ms).filter(Number.isFinite)),
median_transcribe_ms_per_min: median(
jobs.map((j) => j.transcribe_ms_per_min).filter(Number.isFinite)
),
median_analyze_ms_per_min: median(
jobs.map((j) => j.analyze_ms_per_min).filter(Number.isFinite)
),
total_cost_usd: totalCost,
total_audio_hours: totalAudioSec / 3600,
};
}
function median(arr) {
if (!arr.length) return null;
const sorted = [...arr].sort((a, b) => a - b);
const mid = Math.floor(sorted.length / 2);
return sorted.length % 2 === 0
? (sorted[mid - 1] + sorted[mid]) / 2
: sorted[mid];
}