// Process-global FIFO queue for hardware-backed jobs.
//
// Why this exists: Spark Control + the operator's GPU box can really
// only handle one full pipeline at a time. Before this queue existed,
// two concurrent /relay/v1/summarize-url requests would each spin up
// their own async worker — both would fire 2-parallel transcribe
// chunks + 2 parallel diarize calls (and analyze on top), totalling 8+
// simultaneous calls into Spark Control. The GPU thrashes and
// either both jobs slow down massively or chunks start failing.
//
// The fix is a single-slot semaphore around any code path that
// drives the hardware backend. Gemini paths bypass it entirely — the
// Google API handles arbitrary concurrency at its end and we have
// no local bottleneck. Intra-job concurrency (the operator's
// configured 2-parallel chunk worker) stays — the operator's GPU
// can handle 2 simultaneous transcribe POSTs per the SC dev's
// guidance; the queue serializes whole jobs, not individual calls
// within a job.
//
// Use:
//   const release = await acquireHardwareSlot({
//     jobId,
//     onWait: (info) => { ... emit "queued" SSE event ... },
//   });
//   try {
//     // ... do hardware-backed pipeline ...
//   } finally {
//     release();
//   }
//
// onWait fires AT MOST ONCE, and only when the slot is already taken;
// it is called before this caller starts waiting.
// `info.position` is 1-indexed (1 = next up after the current
// active job, 2 = one slot back, etc). Callers use this to push
// a "queued" event to the SSE client so the UI can show "Queued
// — N jobs ahead of you".

// Tail of the promise chain. Each acquirer waits on the value it finds
// here, then replaces it with a promise that also waits for its own
// release — so this settles once every job enqueued so far has released.
let queueHead = Promise.resolve();

// Number of jobs that have asked for the slot and not yet released it.
// Includes the currently-active job.
let pendingCount = 0;

// Id of the job currently holding the slot; null when no job is active
// or the active job supplied no id.
let currentJobId = null;

// Bounded ring buffer of recent queue events for /admin/queue visibility.
let queueLog = [];

// Maximum number of events retained in queueLog.
const MAX_LOG_ENTRIES = 50;
// Append an event to the in-memory queue log and keep the log bounded.
//
// event: plain object of event fields (e.g. kind, jobId). It is spread
//        after `at`, so an `at` property on the event overrides the
//        timestamp added here.
//
// Once the log holds more than MAX_LOG_ENTRIES entries, the oldest ones
// are dropped so only the most recent MAX_LOG_ENTRIES remain.
function logQueueEvent(event) {
  queueLog.push({ at: Date.now(), ...event });
  const overflow = queueLog.length - MAX_LOG_ENTRIES;
  if (overflow > 0) {
    // Trim in place from the front: the oldest entries go first.
    queueLog.splice(0, overflow);
  }
}
// Wait for exclusive use of the hardware backend. Resolves, in FIFO
// order, to a release function once this caller holds the slot.
//
// Options (single object, all optional):
//   jobId:  identifier used in log lines and exposed as the active job.
//           Falsy values are recorded as null.
//   onWait: called at most once, and only when the slot is already
//           taken, before this caller starts waiting. Receives
//           { position, activeJobId }; position is 1-indexed
//           (1 = next up after the active job). Anything it throws, or
//           a rejection from an async onWait, is logged and ignored.
//
// Resolves to release(): frees the slot for the next waiter. Calling it
// more than once is a no-op. Call it from a `finally` — if it is never
// called, every later caller waits forever.
export async function acquireHardwareSlot({ jobId, onWait } = {}) {
  const ownJobId = jobId || null;

  // String() keeps log formatting safe for non-string ids (e.g. numbers).
  // An exception thrown after the slot is chained below would leave the
  // slot unresolved and block every later caller.
  const shortId = (id, fallback) => (id ? String(id).slice(0, 8) : fallback);

  const reportOnWaitFailure = (err) => {
    console.warn("[hardware-queue] onWait callback failed:", err);
  };

  pendingCount += 1;
  const positionInQueue = pendingCount - 1; // 0 = active immediately, 1+ = waiting
  const prev = queueHead;
  let releaseFn;
  const slot = new Promise((resolve) => {
    releaseFn = resolve;
  });
  // Later callers must wait for everything ahead of us AND our own release.
  queueHead = prev.then(() => slot);

  if (positionInQueue > 0) {
    const activeJobId = currentJobId;
    logQueueEvent({
      kind: "enqueued",
      jobId: ownJobId,
      position: positionInQueue,
      activeJobId,
    });
    if (typeof onWait === "function") {
      // Best-effort notification: a failing callback must never cost the
      // caller its place in the queue.
      try {
        Promise.resolve(onWait({ position: positionInQueue, activeJobId })).catch(
          reportOnWaitFailure
        );
      } catch (err) {
        reportOnWaitFailure(err);
      }
    }
    console.log(
      `[hardware-queue] job ${shortId(jobId, "(no-id)")} queued at position ${positionInQueue} (active: ${shortId(activeJobId, "none")})`
    );
  }

  await prev;
  currentJobId = ownJobId;
  logQueueEvent({ kind: "started", jobId: ownJobId });
  console.log(
    `[hardware-queue] job ${shortId(jobId, "(no-id)")} started — ${pendingCount - 1} waiting`
  );

  let released = false;
  return () => {
    if (released) return;
    released = true;
    currentJobId = null;
    pendingCount -= 1;
    try {
      logQueueEvent({ kind: "released", jobId: ownJobId });
      console.log(
        `[hardware-queue] job ${shortId(ownJobId, "(no-id)")} released — ${pendingCount} still in queue`
      );
    } finally {
      // Hand the slot over even if logging failed.
      releaseFn();
    }
  };
}
// Read-only snapshot for the operator dashboard. Returns:
//   pendingCount: total jobs in the queue (active + waiting)
//   currentJobId: the job currently holding the slot, or null
//   log: recent queue events (most recent last; bounded to 50)
//
// The log array and each entry in it are copies, so a caller that
// mutates the snapshot cannot alter the live queue log.
export function getHardwareQueueStatus() {
  return {
    pendingCount,
    currentJobId,
    log: queueLog.map((entry) => ({ ...entry })),
  };
}