Files

903 lines
36 KiB
JavaScript

// POST /admin/test-run — operator-side benchmarking flow.
//
// Same end-to-end pipeline as /relay/transcribe-url, but with two
// key differences:
// 1. The operator can OVERRIDE backend + model per call, bypassing
// planBackend's tier/preference logic. Used by the dashboard's
// benchmark suite to test specific permutations.
// 2. The audit row is tagged with batch_id + source="admin-test"
// so test runs are clearly distinguishable from real user
// traffic in the Jobs tab (and filterable / hideable from view).
//
// Request body (admin-auth-gated by virtue of being under /admin/*):
// {
// media_url: string, required
// type?: "youtube" | "podcast"
// title?: string
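// transcribe_backend: "gemini" | "hardware", required unless captions_mode is "use"
// transcribe_model?: string (gemini model id; ignored when hardware)
// analyze_backend: "gemini" | "hardware", required
// analyze_model?: string
// batch_id?: string — groups multiple test runs into one suite (a fresh UUID is minted when omitted)
// captions_mode?: "use" — analyze YouTube captions instead of transcribing audio (YouTube URLs only)
// }
//
// Response (immediate; job runs in background):
// { result: { job_id, status: "queued", batch_id, kind: "admin-test-run" } }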
// Poll GET /admin/jobs/:id (existing) for status; final transcript +
// analyze result lands in the Jobs table once complete.
import express from "express";
import fs from "fs/promises";
import os from "os";
import path from "path";
import { randomUUID } from "crypto";
import { getConfigSnapshot } from "../config.js";
import { createGeminiBackend } from "../backends/gemini.js";
import { createHardwareBackend } from "../backends/hardware.js";
import { resolveHardwareConfig } from "../hardware-config.js";
import { recordCall } from "../audit-log.js";
import { calcGeminiCost } from "../pricing.js";
import { getAudioDurationSeconds } from "../audio-meta.js";
import {
createJob,
markRunning,
setProgress,
markComplete,
markFailed,
} from "../jobs.js";
import {
looksLikeYouTube,
downloadDirect,
downloadYouTube,
} from "./transcribe-url.js";
import { fetchYouTubeCaptions } from "../youtube-captions.js";
import { saveJobOutput } from "../output-store.js";
import { runChunkedAnalysis } from "../chunked-analyze.js";
// Every job and audit row minted by the test-run endpoints carries this
// synthetic install id instead of a real user's. That keeps benchmark
// traffic out of per-user aggregations, and the Jobs tab can isolate it
// with its existing install-id filter.
const TEST_INSTALL_ID = "admin-test";
// ── TX-sharing cache ────────────────────────────────────────────
// The benchmark suite has paired permutations that use the SAME
// transcribe config but differ in their analyze backend:
// pair 1+6: TX = gemini-3.1-flash-lite (then AN gemini vs hardware)
// pair 4+5: TX = hardware (then AN hardware vs gemini)
// pair 7+8: TX = captions (then AN gemini vs hardware)
// Without sharing, running both members of a pair re-transcribes
// the same audio twice — wasteful (cost + wall time).
//
// Implementation: an in-memory Map keyed on (mediaUrl, txConfig)
// whose values are PROMISES for the transcript. The first request
// in the pair inserts a pending Promise; subsequent requests with
// the same key await that Promise. Completed entries linger in the
// cache for ~10 minutes so a "rerun last" benchmark within that
// window also dedupes. Cache entries auto-expire to bound memory.
//
// The cache is process-local (single relay process); a relay
// restart clears it. That's fine — benchmark suites are operator-
// initiated and short-lived.
// How long a successfully completed transcript stays reusable after it
// SETTLES. An in-flight entry is bounded by the same TTL measured from
// when it started, so a hung compute cannot pin a key forever.
const TX_CACHE_TTL_MS = 10 * 60 * 1000;
const txCache = new Map(); // key → { promise, expiresAt, startedAt }

// Cache key for one transcript source. Captions depend only on the URL.
// Audio transcripts also depend on backend + model; an unset model is
// normalized to "(default)".
function txCacheKey({ mediaUrl, captionsMode, txBackend, txModel }) {
  if (captionsMode === "use") return `captions:${mediaUrl}`;
  return `tx:${txBackend}:${txModel || "(default)"}:${mediaUrl}`;
}

/**
 * Return the shared transcript promise for `key`. `computeFn` is called
 * only when no live entry exists.
 *
 * @param {string} key - Cache key, normally built with txCacheKey().
 * @param {() => Promise<any>} computeFn - Starts the underlying work.
 *   A synchronous throw propagates to the caller, and nothing is cached.
 * @returns {{ promise: Promise<any>, cached: boolean, startedAt: number }}
 *   `cached` is true when an existing entry was reused. `startedAt` is
 *   when the underlying compute began; callers use it to attribute the
 *   shared wall time to paired permutations.
 */
function getOrComputeTx(key, computeFn) {
  const now = Date.now();
  // Evict expired entries opportunistically to bound memory.
  for (const [k, v] of txCache) {
    if (v.expiresAt < now) txCache.delete(k);
  }
  const existing = txCache.get(key);
  if (existing && existing.expiresAt > now) {
    return {
      promise: existing.promise,
      cached: true,
      startedAt: existing.startedAt,
    };
  }
  const startedAt = Date.now();
  const promise = computeFn();
  const entry = { promise, expiresAt: now + TX_CACHE_TTL_MS, startedAt };
  txCache.set(key, entry);
  // Both handlers check identity first. If this entry already expired and
  // a newer compute replaced it under the same key, a late settlement of
  // THIS promise must not extend or evict the newer entry.
  promise.then(
    () => {
      // Restart the TTL at completion, so a finished transcript lingers
      // the full window for "rerun last" even when the compute was slow.
      if (txCache.get(key) === entry) entry.expiresAt = Date.now() + TX_CACHE_TTL_MS;
    },
    () => {
      // Don't cache failures: the next attempt gets a fresh try.
      if (txCache.get(key) === entry) txCache.delete(key);
    }
  );
  return { promise, cached: false, startedAt };
}
// Parse an analyze response into the { sections: [...] } shape that
// Recap's renderer expects. If the text contains a ``` / ```json fenced
// block, only the first fence's contents are parsed. Returns null for
// non-string or empty input, invalid JSON, or JSON without a `sections`
// array. Callers can then keep the raw text for forensic review.
function safeParseSections(text) {
  if (typeof text !== "string" || !text) return null;
  const trimmed = text.trim();
  const fence = /```(?:json)?\s*([\s\S]*?)```/.exec(trimmed);
  const candidate = fence ? fence[1].trim() : trimmed;
  let parsed;
  try {
    parsed = JSON.parse(candidate);
  } catch {
    return null;
  }
  if (!parsed || !Array.isArray(parsed.sections)) return null;
  return parsed;
}
export function adminTestRunRouter() {
  const router = express.Router();

  const VALID_BACKENDS = ["gemini", "hardware"];
  const isValidBackend = (name) => VALID_BACKENDS.includes(name);
  const shortId = (id) => id.slice(0, 8);

  // Job metadata shared by both endpoints. `extra` appends
  // endpoint-specific fields (e.g. suite_position) after the common ones.
  const testRunMetadata = (ctx, extra) => ({
    media_url: ctx.mediaUrl,
    title: ctx.title,
    transcribe_backend: ctx.useCaptions ? "captions" : ctx.txBackend,
    analyze_backend: ctx.anBackend,
    batch_id: ctx.batchId,
    captions_mode: ctx.captionsMode || null,
    ...extra,
  });

  // Per-permutation validation for the suite endpoint. Returns an error
  // string, or null when the permutation is runnable.
  const permutationError = (ctx, position) => {
    if (!ctx.useCaptions && !isValidBackend(ctx.txBackend)) {
      return `permutation ${position}: transcribe_backend must be 'gemini' or 'hardware'`;
    }
    if (!isValidBackend(ctx.anBackend)) {
      return `permutation ${position}: analyze_backend must be 'gemini' or 'hardware'`;
    }
    return null;
  };

  // Run one suite item. Items that failed validation are marked failed
  // and audited without touching any backend. Worker crashes are caught
  // here, so one bad permutation never aborts its phase.
  async function runSuiteItem({ job, ctx }, batchId) {
    if (ctx._validationError) {
      markFailed(job.id, ctx._validationError);
      await recordCall({
        install_id: TEST_INSTALL_ID,
        tier: "core",
        pipeline: "transcribe",
        backend: null,
        model: null,
        status: "error",
        duration_ms: 0,
        cost_usd: 0,
        job_id: job.id,
        batch_id: batchId,
        source: "admin-test",
        media_url: ctx.mediaUrl,
        title: ctx.title,
        error: ctx._validationError,
      });
      return;
    }
    try {
      await executeTestRunWorker(job, ctx);
    } catch (err) {
      markFailed(job.id, `worker_crashed: ${err?.message || String(err)}`);
      console.error(`[admin/test-run-suite ${shortId(job.id)}] worker crashed:`, err);
    }
  }

  // Background phase runner. Items are grouped by TX fingerprint. Items
  // within a phase fire concurrently (their TX dedupes via the cache);
  // phases run one after another so the transcribe backends are not
  // overloaded. Never rejects: runner-level errors are only logged.
  async function runSuitePhases(batchId, items) {
    const tag = `batch=${shortId(batchId)}`;
    try {
      const phases = groupItemsByTxFingerprint(items);
      console.log(`[admin/test-run-suite] ${tag} ${items.length} perms in ${phases.length} phases`);
      for (const [index, phase] of phases.entries()) {
        const plural = phase.length === 1 ? "" : "s";
        console.log(
          `[admin/test-run-suite] ${tag} phase ${index + 1}/${phases.length}: firing ${phase.length} perm${plural}`
        );
        await Promise.allSettled(phase.map((item) => runSuiteItem(item, batchId)));
      }
      console.log(`[admin/test-run-suite] ${tag} complete`);
    } catch (err) {
      console.error(`[admin/test-run-suite] ${tag} runner crashed:`, err);
    }
  }

  // ── POST /admin/test-run ──────────────────────────────────────
  // Single permutation. Validate, mint a job, reply with its id right
  // away, and run the worker in the background. The dashboard polls.
  router.post("/test-run", express.json({ limit: "1mb" }), async (req, res) => {
    const body = req.body || {};
    const mediaUrl = body.media_url;
    if (!mediaUrl || typeof mediaUrl !== "string") {
      return res.status(400).json({ error: "missing or non-string media_url" });
    }
    // captions_mode === "use": fetch YouTube captions via yt-dlp instead
    // of downloading + transcribing audio; transcribe backend/model are
    // then ignored. YouTube URLs only (podcast enclosures have no captions).
    const useCaptions = body.captions_mode === "use";
    if (!useCaptions && !isValidBackend(body.transcribe_backend)) {
      return res.status(400).json({ error: "transcribe_backend must be 'gemini' or 'hardware' (unless captions_mode='use')" });
    }
    if (!isValidBackend(body.analyze_backend)) {
      return res.status(400).json({ error: "analyze_backend must be 'gemini' or 'hardware'" });
    }

    // Worker inputs. The same shape is built per permutation by
    // /test-run-suite, so both endpoints share executeTestRunWorker.
    const ctx = {
      mediaUrl,
      type: body.type,
      title: body.title,
      txBackend: body.transcribe_backend,
      txModel: body.transcribe_model,
      anBackend: body.analyze_backend,
      anModel: body.analyze_model,
      batchId: body.batch_id || randomUUID(),
      captionsMode: body.captions_mode,
      useCaptions,
    };
    const job = createJob({
      kind: "admin-test-run",
      installId: TEST_INSTALL_ID,
      metadata: testRunMetadata(ctx),
    });

    res.json({
      result: {
        job_id: job.id,
        status: "queued",
        batch_id: ctx.batchId,
        kind: "admin-test-run",
      },
    });

    executeTestRunWorker(job, ctx).catch((err) => {
      markFailed(job.id, `worker_crashed: ${err?.message || String(err)}`);
      console.error(`[admin/test-run ${shortId(job.id)}] worker crashed:`, err);
    });
  });

  // ── POST /admin/test-run-suite ──────────────────────────────
  // Server-side benchmark runner. Accepts an ARRAY of permutations and
  // mints a job for every one of them upfront, so the client can show
  // the table immediately. Execution then happens in phases on the relay
  // process, so the suite KEEPS RUNNING even if the operator's browser
  // closes, the phone sleeps, or the tab refreshes. Paired permutations
  // (1+6, 4+5, 7+8) share a phase and therefore share TX through the
  // in-memory inflight-promise cache.
  router.post("/test-run-suite", express.json({ limit: "10mb" }), async (req, res) => {
    const { media_url: mediaUrl, permutations } = req.body || {};
    if (!mediaUrl || typeof mediaUrl !== "string") {
      return res.status(400).json({ error: "missing or non-string media_url" });
    }
    if (!Array.isArray(permutations) || permutations.length === 0) {
      return res.status(400).json({ error: "permutations must be a non-empty array" });
    }

    const batchId = randomUUID();
    const items = Array.from(permutations, (raw, i) => {
      const perm = raw || {};
      const position = i + 1;
      const ctx = {
        mediaUrl,
        type: perm.type,
        title: perm.title || `permutation ${position}`,
        txBackend: perm.transcribe_backend,
        txModel: perm.transcribe_model,
        anBackend: perm.analyze_backend,
        anModel: perm.analyze_model,
        batchId,
        captionsMode: perm.captions_mode,
        useCaptions: perm.captions_mode === "use",
      };
      // An invalid permutation must not poison the whole batch. It still
      // gets a job, and the runner records the error against that job.
      const validationError = permutationError(ctx, position);
      if (validationError) ctx._validationError = validationError;
      const job = createJob({
        kind: "admin-test-run",
        installId: TEST_INSTALL_ID,
        metadata: testRunMetadata(ctx, { suite_position: position }),
      });
      return { job, ctx };
    });

    // Reply with the planned ids so the dashboard can poll
    // /admin/jobs-history?batch_id=<batchId> without waiting on the work.
    res.json({
      result: {
        batch_id: batchId,
        status: "queued",
        job_ids: items.map(({ job }) => job.id),
        total: items.length,
        kind: "admin-test-run-suite",
      },
    });

    setImmediate(() => {
      void runSuitePhases(batchId, items); // never rejects; errors are logged inside
    });
  });

  return router;
}
// Split { job, ctx } items into phases keyed by TX fingerprint: captions
// runs are keyed by URL; audio runs by backend + model + URL. Items that
// share a fingerprint land in the same phase, so they hit the TX-share
// cache together. Phases appear in order of each fingerprint's first
// occurrence, and items keep their input order within a phase.
function groupItemsByTxFingerprint(items) {
  const byFingerprint = new Map(); // Map iteration follows insertion order
  for (const item of items) {
    const { useCaptions, mediaUrl, txBackend, txModel } = item.ctx;
    const fingerprint = useCaptions
      ? `captions:${mediaUrl}`
      : `tx:${txBackend}:${txModel || ""}:${mediaUrl}`;
    const bucket = byFingerprint.get(fingerprint);
    if (bucket) {
      bucket.push(item);
    } else {
      byFingerprint.set(fingerprint, [item]);
    }
  }
  return [...byFingerprint.values()];
}
// Worker for one permutation: runs the full download / transcribe /
// analyze pipeline. Used by /admin/test-run (one permutation) and by
// /admin/test-run-suite (many permutations, orchestrated server-side in
// phases).
//
// Flow:
//   1. markRunning, then create a per-job temp dir.
//   2. captions_mode="use": fetch YouTube captions (shared through the
//      TX cache) and analyze them. Otherwise: download the audio,
//      transcribe it (shared through the TX cache), then analyze.
//   3. Persist transcript + analysis with saveJobOutput, then markComplete.
//
// Expected failures are audited with recordCall and end in markFailed.
// These are: a non-YouTube URL in captions mode, a captions fetch
// failure, a download failure, and a transcribe failure. Analyze
// failures are logged, and the job still completes. Anything else that
// throws propagates to the caller, which marks the job "worker_crashed".
// The temp dir is removed in `finally`, so unexpected throws no longer
// leak it.
async function executeTestRunWorker(job, ctx) {
  const workerT0 = Date.now();
  markRunning(job.id);
  const tmpDir = await fs.mkdtemp(path.join(os.tmpdir(), "admin-tr-"));
  try {
    const isYT = ctx.type === "youtube" || (!ctx.type && looksLikeYouTube(ctx.mediaUrl));
    if (ctx.useCaptions) {
      await runCaptionsTestRun(job, ctx, { tmpDir, isYT });
    } else {
      await runAudioTestRun(job, ctx, { tmpDir, isYT, workerT0 });
    }
  } finally {
    // Best-effort cleanup: a failed rm must not mask the job's real outcome.
    try { await fs.rm(tmpDir, { recursive: true, force: true }); } catch {}
  }
}

// Captions fast path (captions_mode="use"). Fetches the YouTube caption
// track instead of downloading + transcribing audio, audits it as a
// backend="captions" transcribe row, then analyzes the caption text.
// Captions exist only for YouTube, so any other URL fails immediately.
async function runCaptionsTestRun(job, ctx, { tmpDir, isYT }) {
  const { mediaUrl, title, anBackend, anModel, batchId } = ctx;
  if (!isYT) {
    await recordCall({
      install_id: TEST_INSTALL_ID,
      tier: "core",
      pipeline: "transcribe",
      backend: "captions",
      model: null,
      status: "error",
      duration_ms: 0,
      cost_usd: 0,
      job_id: job.id,
      batch_id: batchId,
      source: "admin-test",
      media_url: mediaUrl,
      title: title || null,
      error: "captions_mode='use' requires a YouTube URL (no captions for podcast audio)",
    });
    markFailed(job.id, "captions_mode requires YouTube URL");
    return;
  }

  setProgress(job.id, "fetching captions…");
  const capStart = Date.now();
  let cap;
  let capFromCache = false;
  let capSharedStartedAt = capStart;
  const capKey = txCacheKey({ mediaUrl, captionsMode: "use" });
  try {
    const { promise, cached, startedAt } = getOrComputeTx(capKey, () =>
      fetchYouTubeCaptions({ url: mediaUrl, tmpDir })
    );
    capFromCache = cached;
    capSharedStartedAt = startedAt || capStart;
    if (cached) setProgress(job.id, "reusing shared captions from paired permutation…");
    cap = await promise;
  } catch (err) {
    await recordCall({
      install_id: TEST_INSTALL_ID,
      tier: "core",
      pipeline: "transcribe",
      backend: "captions",
      model: null,
      status: "error",
      duration_ms: Date.now() - capStart,
      audio_seconds: null,
      cost_usd: 0,
      job_id: job.id,
      batch_id: batchId,
      source: "admin-test",
      media_url: mediaUrl,
      title: title || null,
      error: (err?.message || String(err)).slice(0, 300),
    });
    markFailed(job.id, "captions_fetch_failed: " + (err?.message || err));
    return;
  }

  const captionsModel = cap.captions_source === "auto" ? "youtube-auto" : "youtube-manual";
  // Record the captions "transcribe" row with backend="captions", so the
  // dashboard can show it distinctly. When a paired sibling's fetch was
  // reused, the attributed wall time is that underlying fetch's (from the
  // cache entry's startedAt). Both paired rows then show real TX-rate
  // numbers. The "admin-test-shared-tx" source lets analytics dedupe.
  await recordCall({
    install_id: TEST_INSTALL_ID,
    tier: "core",
    pipeline: "transcribe",
    backend: "captions",
    model: captionsModel,
    status: "success",
    duration_ms: Date.now() - capSharedStartedAt,
    audio_seconds: cap.duration_seconds || null,
    audio_bytes: null, // no audio downloaded
    download_ms: null, // n/a
    chunk_count: 1,
    cost_usd: 0,
    job_id: job.id,
    batch_id: batchId,
    source: capFromCache ? "admin-test-shared-tx" : "admin-test",
    media_url: mediaUrl,
    title: title || null,
  });

  setProgress(job.id, "analyzing topics…");
  const cfg = await getConfigSnapshot();
  const hw = await resolveHardwareConfig(cfg);
  let anResult = null;
  try {
    anResult = await runAnalyzeForTestRun({
      transcriptText: cap.text || "",
      anBackend,
      anModel,
      cfg,
      hw,
      jobId: job.id,
      batchId,
      mediaUrl,
      title,
      audioSeconds: cap.duration_seconds || null,
      audioBytes: null,
    });
  } catch (err) {
    console.warn(`[admin/test-run ${job.id.slice(0, 8)}] analyze failed (captions): ${err?.message || err}`);
  }

  // Test-runs always persist output, regardless of the save-user-outputs flag.
  await saveJobOutput(job.id, {
    batch_id: batchId,
    source: "admin-test",
    transcript: cap.text || "",
    analysis: anResult ? safeParseSections(anResult.text) : null,
    analysis_raw_text: anResult?.text || null,
    meta: {
      title: title || null,
      media_url: mediaUrl,
      audio_seconds: cap.duration_seconds || null,
      audio_bytes: null,
      captions_mode: "use",
      captions_source: cap.captions_source || null,
      transcribe_backend: "captions",
      transcribe_model: captionsModel,
      analyze_backend: anBackend,
      analyze_model: anResult?.model || null,
    },
  });
  markComplete(job.id, {
    result: { transcribe_model: "captions", batch_id: batchId },
  });
}

// Audio path. Downloads the media, transcribes it with the operator's
// chosen backend, then analyzes the transcript. Transcription goes
// through the TX-sharing cache: paired permutations (e.g. 1+6, both
// transcribing with gemini-3.1-flash-lite) invoke the backend only
// ONCE, and the second awaits the first's in-flight promise. Entries
// linger ~10 min, so a fast "Rerun last" also dedupes.
async function runAudioTestRun(job, ctx, { tmpDir, isYT, workerT0 }) {
  const { mediaUrl, title, txBackend, txModel, anBackend, anModel, batchId } = ctx;

  setProgress(job.id, "downloading media…");
  let audio;
  let downloadMs = 0;
  try {
    const dlStart = Date.now();
    audio = isYT
      ? await downloadYouTube(mediaUrl, tmpDir)
      : await downloadDirect(mediaUrl, tmpDir);
    downloadMs = Date.now() - dlStart;
    audio.seconds = await getAudioDurationSeconds(audio.filePath);
    setProgress(job.id, `transcribing ${Math.round((audio.seconds || 0) / 60)} min audio…`);
  } catch (err) {
    const msg = (err?.message || String(err)).slice(0, 300);
    await recordCall({
      install_id: TEST_INSTALL_ID,
      tier: "core",
      pipeline: "transcribe",
      backend: txBackend,
      model: null,
      status: "error",
      credit_charged: 0,
      duration_ms: Date.now() - workerT0,
      download_ms: Date.now() - workerT0,
      audio_seconds: null,
      cost_usd: 0,
      job_id: job.id,
      batch_id: batchId,
      source: "admin-test",
      media_url: mediaUrl,
      title: title || null,
      error: "download_failed: " + msg,
    });
    markFailed(job.id, "download_failed: " + msg);
    return;
  }

  const cfg = await getConfigSnapshot();
  const hw = await resolveHardwareConfig(cfg);
  let txResult;
  let txFromCache = false;
  const txStartedAt = Date.now();
  let txSharedStartedAt = txStartedAt;
  const cacheKey = txCacheKey({ mediaUrl, captionsMode: null, txBackend, txModel });
  try {
    // The audio file is read inside the compute function, so a cache hit
    // never loads a (possibly very large) file it will not use.
    const { promise, cached, startedAt } = getOrComputeTx(cacheKey, () =>
      transcribeAudioForTestRun({ cfg, hw, audio, txBackend, txModel, title })
    );
    txFromCache = cached;
    txSharedStartedAt = startedAt || txStartedAt;
    if (cached) {
      setProgress(job.id, "reusing shared TX from paired permutation…");
    }
    txResult = await promise;
  } catch (err) {
    const msg = (err?.message || String(err)).slice(0, 400);
    await recordCall({
      install_id: TEST_INSTALL_ID,
      tier: "core",
      pipeline: "transcribe",
      backend: txBackend,
      model: txBackend === "gemini" ? (txModel || cfg.relay_gemini_transcription_model) : (hw.transcribe.model || "(auto)"),
      status: "error",
      credit_charged: 0,
      duration_ms: Date.now() - workerT0,
      download_ms: downloadMs,
      audio_seconds: audio?.seconds || null,
      audio_bytes: audio?.bytes || null,
      cost_usd: 0,
      job_id: job.id,
      batch_id: batchId,
      source: "admin-test",
      media_url: mediaUrl,
      title: title || null,
      error: msg,
    });
    markFailed(job.id, "transcribe_failed: " + msg);
    return;
  }

  // Audit the successful transcribe.
  const txCostDetails =
    txBackend === "gemini" && txResult.usage
      ? calcGeminiCost(txResult.model, txResult.usage)
      : { input_tokens: 0, output_tokens: 0, thinking_tokens: 0, cost_usd: 0 };
  // Truncation detection, same as the production routes. If the backend
  // reports any truncated chunk, the benchmark row is marked partial so a
  // truncated TX run is not compared against a clean one.
  const txTruncatedChunks = Array.isArray(txResult?.truncated_chunks)
    ? txResult.truncated_chunks
    : [];
  const txWasTruncated = txTruncatedChunks.length > 0;
  const txTruncationError = txWasTruncated
    ? `transcribe: ${txTruncatedChunks.length} chunk(s) truncated — missing ~${txTruncatedChunks.reduce((s, c) => s + (c.missingSec || 0), 0)}s of speech (model: ${txResult.model || "unknown"})`
    : null;
  await recordCall({
    install_id: TEST_INSTALL_ID,
    tier: "core",
    pipeline: "transcribe",
    backend: txBackend,
    model: txResult.model || null,
    status: txWasTruncated ? "partial" : "success",
    credit_charged: 0,
    truncated_chunks: txWasTruncated ? txTruncatedChunks : null,
    error: txTruncationError,
    // When a paired sibling's TX was reused, the attributed duration is
    // the wall time of the UNDERLYING TX. It runs from when the
    // originating permutation started it until this await resolved, so
    // both paired rows show real TX-rate numbers. Cost is zero on the
    // sibling (only the originator pays); the "admin-test-shared-tx"
    // source lets aggregate analytics dedupe across pairs.
    duration_ms: Date.now() - txSharedStartedAt,
    download_ms: downloadMs,
    audio_bytes: audio.bytes,
    audio_seconds: audio.seconds || null,
    job_id: job.id,
    batch_id: batchId,
    source: txFromCache ? "admin-test-shared-tx" : "admin-test",
    media_url: mediaUrl,
    title: title || null,
    attempts: txResult.attempts || null,
    chunk_count: txResult.chunk_count ?? null,
    // Per-chunk wall times (ms). The aggregator sums these into
    // transcribe_ms_sum, so the Jobs table shows both wall time (from
    // duration_ms) and total backend compute (from the sum).
    chunk_durations_ms: txResult.chunk_durations_ms || null,
    ...(txFromCache
      ? { input_tokens: 0, output_tokens: 0, thinking_tokens: 0, cost_usd: 0 }
      : txCostDetails),
  });

  // Analyze with the operator's chosen backend. Failures are audited
  // inside runAnalyzeForTestRun; the job still completes, because
  // transcribe succeeded.
  setProgress(job.id, "analyzing topics…");
  let anResult = null;
  try {
    anResult = await runAnalyzeForTestRun({
      transcriptText: txResult.text || "",
      anBackend,
      anModel,
      cfg,
      hw,
      jobId: job.id,
      batchId,
      mediaUrl,
      title,
      audioSeconds: audio.seconds || null,
      audioBytes: audio.bytes,
    });
  } catch (err) {
    console.warn(`[admin/test-run ${job.id.slice(0, 8)}] analyze failed: ${err?.message || err}`);
  }

  // Save transcript + analysis for the dashboard's "View output" feature.
  // Test-run jobs always persist, regardless of the save-user-outputs flag.
  await saveJobOutput(job.id, {
    batch_id: batchId,
    source: "admin-test",
    transcript: txResult.text || "",
    analysis: anResult ? safeParseSections(anResult.text) : null,
    analysis_raw_text: anResult?.text || null,
    meta: {
      title: title || null,
      media_url: mediaUrl,
      audio_seconds: audio.seconds || null,
      audio_bytes: audio.bytes,
      captions_mode: null,
      transcribe_backend: txBackend,
      transcribe_model: txResult.model || null,
      analyze_backend: anBackend,
      analyze_model: anResult?.model || null,
    },
  });
  markComplete(job.id, {
    result: {
      transcribe_model: txResult.model,
      batch_id: batchId,
    },
  });
}

// Read the downloaded audio and transcribe it with the requested backend
// ("gemini", or hardware otherwise). Resolves with the backend's
// transcribe result; rejects if the read fails, the hardware transcribe
// URL is not configured, or the backend call fails.
async function transcribeAudioForTestRun({ cfg, hw, audio, txBackend, txModel, title }) {
  const audioBuf = await fs.readFile(audio.filePath);
  const mimeType = audio.mimeType || "audio/mpeg";
  if (txBackend === "gemini") {
    const backend = createGeminiBackend({
      apiKey: cfg.relay_gemini_api_key,
      transcriptionModel: txModel || cfg.relay_gemini_transcription_model,
      analysisModel: cfg.relay_gemini_analysis_model,
      txChunkSeconds: (cfg.relay_gemini_tx_chunk_minutes || 30) * 60,
      txConcurrency: cfg.relay_gemini_tx_concurrency || 12,
      transcribePromptOverride: cfg.relay_transcribe_prompt || "",
    });
    return await backend.transcribeAudio({
      audio: audioBuf,
      mimeType,
      title: title || "",
      offsetSeconds: 0,
    });
  }
  if (!hw.transcribe.url) {
    throw new Error("hardware transcribe URL not configured");
  }
  const backend = createHardwareBackend({
    parakeetBaseURL: hw.transcribe.url,
    gemmaBaseURL: hw.analyze.url || "",
    sparkControlBaseURL: hw.sparkBase || "",
    parakeetModel: hw.transcribe.model || "",
    gemmaModel: hw.analyze.model || "",
    txChunkSeconds: (cfg.relay_hardware_tx_chunk_minutes || 5) * 60,
    txChunkOverlapSeconds: cfg.relay_hardware_tx_chunk_overlap_seconds ?? 30,
    diarizationEnabled: !!cfg.relay_hardware_diarization_enabled,
    clusterThresholdPct: cfg.relay_hardware_voice_clustering_threshold ?? 70,
    anchorMinSpeakingSec: cfg.relay_hardware_anchor_min_speaking_sec ?? 30,
    smallClusterMaxSpeakingSec: cfg.relay_hardware_small_cluster_max_speaking_sec ?? 15,
    uncertainMarginPct: cfg.relay_hardware_uncertain_margin_pct ?? 10,
    txConcurrency: cfg.relay_hardware_tx_concurrency || 4,
  });
  return await backend.transcribeAudio({
    audio: audioBuf,
    mimeType,
    offsetSeconds: 0,
  });
}
// Run chunked analyze over a just-produced transcript, using the same
// windowing strategy as Recap's client. Window body, overlap and
// concurrency come from config, defaulting to 18 min / 2 min / 12
// (gemini) or 8 (hardware). runChunkedAnalysis writes one audit row per
// window, so the Jobs table sees:
//   - analyze_windows_total = N
//   - analyze_ms = sum of per-window duration_ms (total backend work)
//   - wall_time_ms = elapsed from first window start → last window end
//     (computed by job-stats.js from row timestamps)
// Per-window `audio_seconds` is the window body length (not total audio),
// so per-row rate columns (s/audio-min) divide by the right denominator.
//
// Params (single object):
//   transcriptText - text to analyze.
//   anBackend      - "gemini"; any other value builds the hardware backend.
//   anModel        - gemini analysis model override (gemini only).
//   cfg, hw        - config snapshot and resolved hardware config.
//   jobId, batchId, mediaUrl, title - audit-row tagging.
//   audioSeconds   - total audio length. Forwarded as totalAudioSec so the
//                    section-count target bucket matches production.
//   audioBytes     - unused; accepted for call-site compatibility.
// Returns { text, model, attempts }.
// Throws: backend construction errors, after auditing a single failed
// analyze row. Presumably also anything runChunkedAnalysis throws.
async function runAnalyzeForTestRun({
  transcriptText,
  anBackend,
  anModel,
  cfg,
  hw,
  jobId,
  batchId,
  mediaUrl,
  title,
  audioSeconds, // total audio length → totalAudioSec (section-count targets)
  audioBytes, // unused
}) {
  // Build the right backend, then hand it to runChunkedAnalysis. That
  // call handles per-window prompt building, parallelism, audit logging
  // and stitching. Construction errors (missing apiKey, missing hardware
  // URL) are audited here as one failed analyze row, because
  // runChunkedAnalysis only writes rows once it has a backend to call.
  const isGemini = anBackend === "gemini";
  let backend;
  let resolvedModel;
  let computeCostDetails;
  try {
    if (isGemini) {
      backend = createGeminiBackend({
        apiKey: cfg.relay_gemini_api_key,
        transcriptionModel: cfg.relay_gemini_transcription_model,
        analysisModel: anModel || cfg.relay_gemini_analysis_model,
        // TX knobs are unused on the analyze path, but the factory
        // accepts them; pass them for consistency.
        txChunkSeconds: (cfg.relay_gemini_tx_chunk_minutes || 30) * 60,
        txConcurrency: cfg.relay_gemini_tx_concurrency || 12,
      });
      resolvedModel = anModel || cfg.relay_gemini_analysis_model;
      computeCostDetails = (model, usage) =>
        usage ? calcGeminiCost(model, usage) : {
          input_tokens: 0, output_tokens: 0, thinking_tokens: 0, cost_usd: 0,
        };
    } else {
      if (!hw.analyze.url) {
        throw new Error("hardware analyze URL not configured");
      }
      backend = createHardwareBackend({
        parakeetBaseURL: hw.transcribe.url || "",
        gemmaBaseURL: hw.analyze.url,
        sparkControlBaseURL: hw.sparkBase || "",
        parakeetModel: hw.transcribe.model || "",
        gemmaModel: hw.analyze.model || "",
        txChunkSeconds: (cfg.relay_hardware_tx_chunk_minutes || 5) * 60,
        txChunkOverlapSeconds: cfg.relay_hardware_tx_chunk_overlap_seconds ?? 30,
        diarizationEnabled: !!cfg.relay_hardware_diarization_enabled,
        clusterThresholdPct: cfg.relay_hardware_voice_clustering_threshold ?? 70,
        anchorMinSpeakingSec: cfg.relay_hardware_anchor_min_speaking_sec ?? 30,
        smallClusterMaxSpeakingSec: cfg.relay_hardware_small_cluster_max_speaking_sec ?? 15,
        uncertainMarginPct: cfg.relay_hardware_uncertain_margin_pct ?? 10,
        txConcurrency: cfg.relay_hardware_tx_concurrency || 4,
      });
      resolvedModel = hw.analyze.model || null;
      computeCostDetails = () => ({
        input_tokens: 0, output_tokens: 0, thinking_tokens: 0, cost_usd: 0,
      });
    }
  } catch (err) {
    await recordCall({
      install_id: TEST_INSTALL_ID,
      tier: "core",
      pipeline: "analyze",
      backend: anBackend,
      // Optional chaining: the error being reported may itself be a
      // missing hw.analyze. Dereferencing it unguarded here would throw a
      // new TypeError, mask the real error, and skip this audit row.
      model: isGemini
        ? (anModel || cfg.relay_gemini_analysis_model)
        : (hw?.analyze?.model || "(auto)"),
      status: "error",
      duration_ms: 0,
      audio_seconds: 0,
      cost_usd: 0,
      job_id: jobId,
      batch_id: batchId,
      source: "admin-test",
      media_url: mediaUrl,
      title: title || null,
      error: (err?.message || String(err)).slice(0, 400),
      window_idx: 0,
      window_count: 1,
    });
    throw err;
  }

  // Windowing tunables from config (Settings tab), per backend.
  const bodyMin = isGemini
    ? (cfg.relay_gemini_analyze_window_minutes || 18)
    : (cfg.relay_hardware_analyze_window_minutes || 18);
  const overlapMin = isGemini
    ? (cfg.relay_gemini_analyze_overlap_minutes || 2)
    : (cfg.relay_hardware_analyze_overlap_minutes || 2);
  const concurrency = isGemini
    ? (cfg.relay_gemini_analyze_concurrency || 12)
    : (cfg.relay_hardware_analyze_concurrency || 8);
  const cutoffMin = cfg.relay_analyze_cutoff_minutes || 25;

  const result = await runChunkedAnalysis({
    transcriptText,
    backend,
    pipelineBackend: anBackend,
    jobId,
    batchId,
    mediaUrl,
    title,
    installId: TEST_INSTALL_ID,
    source: "admin-test",
    computeCostDetails,
    bodySeconds: bodyMin * 60,
    overlapSeconds: overlapMin * 60,
    concurrency,
    cutoffSeconds: cutoffMin * 60,
    analyzePromptOverride: cfg.relay_analyze_prompt || "",
    // Section-count target wiring (matches the summarize-url path).
    // Without these, buildWindowPrompt falls back to "1 section". That
    // works defensively, but test-run benchmarks would then not reflect
    // production segmentation density.
    totalAudioSec: audioSeconds || 0,
    targetTotalsByBucket: {
      under_30: cfg.relay_analyze_total_sections_under_30,
      "30_60": cfg.relay_analyze_total_sections_30_60,
      "60_90": cfg.relay_analyze_total_sections_60_90,
      "90_120": cfg.relay_analyze_total_sections_90_120,
      "120_150": cfg.relay_analyze_total_sections_120_150,
      "150_180": cfg.relay_analyze_total_sections_150_180,
      over_180: cfg.relay_analyze_total_sections_over_180,
    },
  });
  return {
    text: result.text || "",
    model: result.model || resolvedModel,
    attempts: result.attempts,
  };
}